Operator with kafka Topics¶
List existing topics¶
bin/kafka-topics.sh --zookeeper localhost:2181 --list
Describe a topic¶
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic mytopic
Purge a topic¶
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --config retention.ms=1000
… wait a minute …
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic mytopic --delete-config retention.ms
Delete a topic¶
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic mytopic
Get number of messages in a topic¶
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
Get the earliest offset still in a topic¶
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic --time -2
Get the latest offset still in a topic¶
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic --time -1
Consume messages with the console consumer¶
bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic mytopic --from-beginning
Get the consumer offsets for a topic¶
bin/kafka-consumer-offset-checker.sh --zookeeper=localhost:2181 --topic=mytopic --group=my_consumer_group
Read from __consumer_offsets¶
Add the following property to config/consumer.properties:
exclude.internal.topics=false
bin/kafka-console-consumer.sh --consumer.config config/consumer.properties --from-beginning --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"
Kafka Consumer Groups¶
List the consumer groups known to Kafka¶
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list (old api)
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list (new api)
View the details of a consumer group¶
bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group <group name>
kafkacat¶
Getting the last five message of a topic¶
kafkacat -C -b localhost:9092 -t mytopic -p 0 -o -5 -e
Zookeeper¶
Starting the Zookeeper Shell¶
bin/zookeeper-shell.sh localhost:2181
basic command cli zookeeper : http://www.corejavaguru.com/bigdata/zookeeper/cli
Reassign partition to broker¶
Sử dụng kafka-reassign-partitions.sh tool với 3 tùy chọn:
- generate: Đưa vào list topic cần chuyển paritition, trả về danh sách gợi ý (plan) nên đưa partition nào vào broker nào
- execute: Thực hiện reassignment dưa trên plan
- verify: Đưa vào plan, check xem quá trình reassign đã thành công hay thất bại thế nào
Bước 1: Tạo danh sách topic:
cat topics-to-move.json
{"topics": [{"topic": "foo1"},
{"topic": "foo2"}],
"version":1
}
Bước 2: tạo plan:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
{"topic":"foo1","partition":0,"replicas":[3,4]},
{"topic":"foo2","partition":2,"replicas":[1,2]},
{"topic":"foo2","partition":0,"replicas":[3,4]},
{"topic":"foo1","partition":1,"replicas":[2,3]},
{"topic":"foo2","partition":1,"replicas":[2,3]}]
}
Proposed partition reassignment configuration
{"version":1,
"partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
{"topic":"foo1","partition":0,"replicas":[5,6]},
{"topic":"foo2","partition":2,"replicas":[5,6]},
{"topic":"foo2","partition":0,"replicas":[5,6]},
{"topic":"foo1","partition":1,"replicas":[5,6]},
{"topic":"foo2","partition":1,"replicas":[5,6]}]
}
Lưu thông tin từ sau dòng Proposed... vào file expand-cluster-reassignment.json
Bước 3: Thực hiện:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Bước 4: kiểm tra:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully
Tương tự, thao tác tăng số replicate của một topic cũng làm như các bước trên.
cat increase-replication-factor.json
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
"partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
Ref¶
https://www.ibm.com/support/knowledgecenter/en/SSCVHB_1.2.0/admin/tnpi_reassign_partitions.html
https://blog.imaginea.com/how-to-rebalance-topics-in-kafka-cluster/
https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools
https://kafka.apache.org/documentation/#basic_ops
https://gist.github.com/sonhmai/5b2b4455162c808c091b661aeb675625