Khi mới bắt đầu đọc tài liệu về Kafka, đọc sơ qua thì log compacted topic có vẻ đơn giản, nhưng theo cách mình hiểu, nó không giải thích rõ ràng về cách Kafka lưu giữ trạng thái của các topics bên trong filesystem như thế nào cả. Đầu tháng này mình có chút thời gian để đọc và tìm hiểu kỹ hơn về tính năng này, và mình muốn chia sẻ cho các bạn biết những kiến thức tâm đắc mà mình đã đọc và hiểu về Kafka.
Too long; Don’t read
Trong bài viết này, mình sẽ trình bày về log compacted topic trong Kafka. Sau đó mình sẽ chỉ cho các bạn biết cách các Kafka giữ trạng thái của các topics bên trong filesystem như thế nào.
Yêu cầu trước khi đọc bài viết này
Mình giả sử rằng bạn đã khá quen thuộc với các khái niệm cơ bản về Apache Kafka như là broker, producer, consumer, zookeeper. Ngoài ra nếu bạn muốn xem thử các câu lệnh được thực hiện trong bài viết này, bạn cần phải chạy Broker và Zookeeper.
Log compacted topic là gì?
Log compaction là cơ chế để lưu giữ thông tin trên mỗi record chi tiết hơn, thay vì chỉ lưu giữ các record dựa trên thời gian. Ý tưởng ở đây là lựa chọn bỏ đi các records mà có nhiều bản update gần đây với cùng một primary key. Bằng cách này, log sẽ được đảm bảo chỉ có ít nhất một trạng thái cuối cùng đối với mỗi key.
Nói một cách đơn giản hơn, Kafka loại bỏ bất kỳ record nào cũ đi khi có một phiên bản mới nhất của nó với cùng key có sẵn trong partition log. Bây giờ, chúng ta sẽ xem xét partition của log compacted topic dưới đây, được gọi là latest-product-price:
Như bạn thấy ở hình đầu tiên, có hai record với key là p3. Nhưng bởi vì nó là log compacted topic, nên Kafka loại bỏ đi các record cũ hơn trong một background thread (mình sẽ nói rõ hơn ở phần tiết theo). Bây giờ, giả sử chúng ta có một producer gửi các record mới tới partition này. Producer tạo ra 3 record mới với key tương ứng là: p6, p5, p5
Một lần nữa, background thread bên trong Kafka broker loại bỏ đi các record cũ hơn với các key là p5 và p6. Lưu ý rằng compacted log bao gồm 2 phần: phần đầu và phần đuôi. Kafka đảm bảo rằng tất cả các record bên trong phần đuôi luôn có unique key bởi vì phần đuôi được quét trong chu kỳ của quá trình dọn dẹp trước đó (cleaning process). Nhưng phần đầu có thể có các key bị trùng lặp. Cho đến bây giờ, chúng ta đã biết được log compacted topic là gì rồi, tiếp theo là cách để tạo ra chúng bằng công cụ kafka-topic.
Hướng dẫn tạo log compacted topic
Tạo ra một log compacted topic (mình sẽ mô tả tất cả cấu hình chi tiết ở dòng lệnh bên dưới)
bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --topic latest-product-price --replication-factor 1 --partitions 1 --config "cleanup.policy=compact" --config "delete.retention.ms=100" --config "segment.ms=100" --config "min.cleanable.dirty.ratio=0.01"
Tạo ra vài record
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic latest-product-price --property parse.key=true --property key.separator=:
>p3:10$
>p5:7$
>p3:11$
>p6:25$
>p6:12$
>p5:14$
>p5:17$
Chú ý: trong lệnh ở trên mình có chia tách key và value ra bởi ký tự “:”. Bây giờ sẽ consume topic.
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic latest-product-price --property print.key=true --property key.separator=: --from-beginning
p3:11$
p6:12$
p5:14$
p5:17$
Như bạn thấy các record với các keys trùng lặp đã bị loại bỏ. Record với giá trị p5:14$ không bị loại bỏ và chúng ta sẽ phân tích lý do tại sao trong phần quá trình dọn dẹp log compaction. Nhưng đầu tiên chúng ta cần phải biết cách Kafka lưu trữ message bên trong như thế nào.
Segments
Partition log là phương pháp được sử dụng để chia tải trên mỗi Kafka broker, cho phép chúng ta consume dễ dàng các message có thứ tự bên trong partition, mà không cần phải lo lắng về cách lưu trữ nội bộ bên trong Kafka ra sao. Tuy nhiên, trong thực tế, partition log được Kafka broker phân chia thành các segments. Segments là những file được lưu trữ bên trong file system (bên trong thư mục data và partition), trong đó tên của những file này kết thúc với phần mở rộng là “.log”. Trong hình ảnh bên dưới, một partition log được chia thành 3 segments như sau:
Như bạn thấy, chúng ta có một partition log – lưu giữ 7 record nằm trong 3 file segments. Offset đầu tiên của segment được gọi là base offset của segment. Tên file segment luôn bằng với giá trị base offset của nó.
Segment cuối cùng trong partition được gọi là active segment. Chỉ active segment của log mới có thể nhận các message được cung cấp mới nhất. Chúng ta sẽ xem cách Kafka hành xử thế nào với active segment trong quá trình dọn dẹp của compacted log.
Quay trở lại với ví dụ, chúng ta có thể thấy các file segment của topic partition bằng câu lệnh sau đây (giả sử rằng thư mục dữ liệu Kafka nằm ở /var/kafka/data)
ls /var/kafka/data/latest-product-price-0/
00000000000000000000.index 00000000000000000006.log
00000000000000000000.log 00000000000000000006.snapshot
00000000000000000000.timeindex 00000000000000000006.timeindex
00000000000000000005.snapshot leader-epoch-checkpoint
00000000000000000006.index
File 00000000000000000000.log và 00000000000000000006.log là những segments của partition này và file 00000000000000000006.log là active segment.
Khi nào Kafka tạo ra một segment mới? Lựa chọn đầu tiên chính là thiết lập cấu hình segment.bytes (mặc định là 1GB) trong quá trình tạo topic. Khi kích thước segment vượt quá giá trị segment.bytes đã được cấu hình, khi đó Kafka sẽ tạo ra một segment mới. Lựa chọn khác bằng cách thiết lập segment.ms như bạn đã thấy trong câu lệnh tạo topic. Với lựa chọn này, khi Kafka nhận một truy vấn tạo message, nó sẽ kiểm tra xem active segment có thời gian sửa đổi gần nhất có lâu hơn thời gian hiện tại có giá trị được thiết lập tại segment.ms hay không. Nếu lâu hơn, nó sẽ tạo ra một segment mới. Trong câu lệnh tạo topic ở trên, mình có thiết lập segment.ms = 100 để đảm bảo rằng sau mỗi 100 ms thì một segment mới sẽ được tạo ra.
Điểm thú vị ở đây là khi bạn thiết lập segment.ms = 100, khi đó bạn sẽ có nhiều segment nhỏ hơn. Sau quá trình dọn dẹp (xem ở phần tiếp theo) Kafka broker, nó sẽ gộp các non-active segment lại với nhau và tạo ra một segment mới từ chúng.
Quá trình dọn dẹp log compaction
Trong quá trình khởi động, Kafka broker tạo ra một số cleaner threads, có trách nhiệm dọn dẹp compacted logs (số lượng những threads này được cấu hình thông qua log.cleaner.threads.config). Các cleaner thread sẽ liên tục cố gắng tìm ra những filthiest log bên trong broker và sau đó dọn dẹp chúng. Đối với mỗi log, nó tính toán dirty ratio với công thức như sau:
dirty ratio = số bytes trong phần đầu / tổng số bytes trong log (đuôi + đầu)
Cleaner thread sau đó chọn ra log có dirty ratio cao nhất. Log này được gọi là filthiest log và nếu giá trị của nó lớn hơn cấu hình min.cleanable.dirty.ratio, nó sẽ được dọn dẹp. Nếu không, cleaner thread sẽ bị blocked trong một khoảng thời gian (ms) (cấu hình bởi log.cleaner.backoff.ms).
Sau khi tìm thấy filthiest log, chúng ta sẽ muốn tìm phần log nào có thể dọn dẹp được. Lưu ý rằng một số phần log không thể dọn dẹp và sẽ không được quét như sau:
- Những record bên trong active segments. Đó là lý do tại sao chúng ta vẫn thấy record bị trùng lặp key (p5:14$) trong consumer.
- Nếu bạn thiết lập cấu hình min.compaction.lag.ms lớn hơn 0, khi đó bất kỳ segment nào có record với timestamp sớm hơn thời gian được cấu hình này, sẽ không được dọn dẹp. Những segments này sẽ không được quét compacted log nữa.
Bây giờ chúng ta đã biết được những record nào sẽ được thu gọn lại. Từ record đầu tiên trong log tới record đầu tiên mà không thể được dọn dẹp. Để trình bày một cách đơn giản hơn, mình giả sử rằng tất cả các record trong phần đầu của partition có thể được dọn dẹp.
Lưu ý: chúng ta đều biết rằng mọi record trong phần đuôi của log đều có unique key, bởi các record trong phần đều đều là các record bị trùng lặp key, nên đã được loại bỏ trong chu kỳ dọn dẹp trước đó. Chúng ta chỉ có thể có các record trong phần đầu với key của chúng không phải là unique trong log. Để tìm ra các bản record có key bị trùng lặp nhanh nhất, Kafka tạo ra một bản đồ chứa các record trong phần đầu của segment. Quay trở lại ví dụ, cấu trúc offset map có dạng tương tự như sau:
Như bạn thấy, Kafka tạo ra một cấu trúc gọi là offset map cho mỗi key trong phần đầu segment, lưu giữ offset tương ứng của nó. Nếu chúng ta có các bản record bị trùng lặp key trong phần đầu segment, Kafka sẽ sử dụng offset mới nhất. Trong hình ảnh ở trên, record với key p6 tại offset 5 và p5 offset mới nhất có giá trị là 7. Bây giờ, cleaner thread kiểm tra mỗi record trong log và loại bỏ chúng nếu như có bất kỳ record nào có cùng key bên trong offset map và offset của nó khác với các mục bên trong bản đồ này.
Trong quá trình dọn dẹp compacted log, không chỉ những message bị trùng lặp key sẽ bị loại bỏ, Kafka cũng loại bỏ luôn những record mà có giá trị bằng null. Những record này được gọi là tombstone. Bạn có thể hoãn quá trình loại bỏ chúng bằng cách thiết lập cấu hình delete.retention.ms. Theo mặc định, Kafka kiểm tra timestamp sửa đổi gần nhất của segment mà chứa bản record này và nếu thời gian sửa đổi nhỏ hơn so với giá trị cấu hình, record sẽ được giữ lại.
Bây giờ log đã trở nên sạch sẽ. Sau quá trình dọn dẹp làm sạch này, chúng ta có phần đuôi và đầu segment mới! Offset cuối cùng được quét để dọn dẹp (trong ví dụ của mình record cuối cùng trong phần đầu segment cũ) là offset cuối cùng của phần đuôi segment mới.
Kafka giữ offset bắt đầu của phần đầu của segment trong một file được đặt tên cleaner-offset-checkpoint trong thư mục gốc của thư mục data. File này được dùng cho chu trình dọn dẹp log tiếp theo. Chúng ta có thể thấy file checkpoint topic có dạng như sau:
cat /var/kafka/data/cleaner-offset-checkpoint
0
1
latest-product-price 0 6
Như bạn thấy bên trên có 3 dòng. Dòng đầu tiên là phiên bản file (có thể được sử dụng cho mục đích tương thích ngươc), dòng thứ hai có giá trị 1 – chỉ ra số dòng tiếp theo sẽ được kiểm tra theo dòng này, và dòng cuối cùng chứa tên của compacted log topic, số partition và offset đầu của partition này.
Kết luận
Trong bài viết này, mình đã chỉ cho bạn biết log compacted topic là gì rồi, cách chúng được lưu trữ và cách Kafka dọn dẹp chúng theo định kỳ. Cuối cùng, mình muốn nói rằng: log compacion là tính năng cực kỳ tuyệt vời cho các tình huống sử dụng caching – nơi mà bạn chỉ muốn lưu giữ những giá trị mới nhất cho mỗi record theo thời gian thực. Giả sử bạn muốn build cache trong phần khởi tạo ứng dụng. Bạn có thể đọc dữ liệu từ compacted topic để build cache – bởi vì Kafka đọc messages một cách tuần tự, nên nó sẽ nhanh hơn nhiều so với việc sử dụng cache từ SQL database.
Xem thêm: