Kafka简单使用

3,311 阅读3分钟
  1. 启动kafka.

    a. 要启动kafka,必须先启动zookeeper:

    # 启动zookeeper
    zkServer.sh start
    # 查看zookeeper启动状态
    zkServer.sh status
    

    b. 启动kafka:

    ./bin/kafka-server-start.sh config/server.properties
    
  2. 在kafka中创建topic:

    ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    

    创建一个分区,且分区里面分配了一个副本,的名叫 test的topic。

  3. 查看kafka中拥有的所有的topic主题:

    ./bin/kafka-topics.sh --list --zookeeper localhost:2181
    

    localhost表示当前的机子节点创建topic,也可以是kafka集群中其它节点的ip地址

  4. 查看topic主题的详细信息:

    ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
    

    如下是topic的详情信息:

    Topic:test		PartitionCount:1	ReplicationFactor:1		Configs:
    Topic: test		Partition: 0		Leader: 0			Replicas: 0		Isr: 0
    
  5. 删除某个topic:

    ./bin/kafka-topics.sh --zookeeper 192.168.241.20:2181 --delete --topic test
    

    如果kafka的配置文件config/server.properties中,delete.topic.enable=false,这个配置为false,那么删除topic 的命令,就只是通过zookeeper对topic进行标记为marked for deletion而已没有真正的删除,如果是true则是真正的删除。

  6. 创建一个Producer来对kafka进行消息发送:

    ./bin/kafka-console-producer.sh --broker-list master:9092 --topic test
    

    在kafka集群列表的master:9092中为kafka集群消息队列创建一个控制台类型的消息生产者producer,消息的主题 类型是test

  7. 创建一个consumer消息消费者来消费kafka队列中的消息:

    ./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test --from-beginning
    

    在zookeeper的master节点中创建一个kafka队列的控制台消息消费者,消费的topic主题类型是test,从topic的 partition头开始消费,即offset为0

    ./bin/kafka-console-consumer.sh --zookeeper master:2181 --topic test --consumer-property 
    group.id=group_test --from-beginning
    

    在zookeeper的master节点中创建一个kafka队列的控制台消息消费者,消费的topic主题类型是test,且有一个 消费组为group_test。

  8. 查看kafka队列中topic的partition的offset信息:

    ./bin/kafka-consumer-offset-checker.sh --zookeeper master:2181 --topic test --group group_test 
    broker-info
    

    通过kafka-consumer-offset-checker.sh脚本命令查看topic主题test的group_test消费分组的kafka broker 节点的offset相关信息

    Group           Topic		Pid 	Offset		logSize		Lag		Owner
    group_hyb_tmp   hyb		0   	15		15		0		group_hyb_tmp_master-1543144080872-eb93ebf0-0
    
    BROKER INFO
    2 -> 192.168.241.22:9092
    

    以上就是查看的offset相关信息:

    a. Group: 消费者的消费分组id

    b. Topic: kafka消息主题

    c. Pid: kafka的消息主题的partition id号

    d. Offset: partition中维护的消息偏移量首地址索引号大小

    e. logSize: kafka队列消息日志的总大小

    f. Lag: 消息数据积压个数,如果Lag长时间存在,说明Producer生产消息到kafka队列中长时间未被消费,说明消费速度慢,数据存在积压问题。

    g. BROKER INFO: kafka的broker节点信息,2表示节点的myid,后面ip为当前处理消息日志的节点

  9. 在zookeeper的客户端中查看offset信息:

    # 启动zookeeper的客户端
    ./bin/zkCli.sh
    

    通过ls /可查看根目录都有哪些文件夹:

    [zk: localhost:2181(CONNECTED) 22] ls /
    [cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, config]
    

    offset信息在consumer中:

    ls /consumer
    [zk: localhost:2181(CONNECTED) 23] ls /consumers
    [group_hyb, group_hyb_tmp]
    

    消费者信息中是消费组

    [zk: localhost:2181(CONNECTED) 24] ls /consumers/group_hyb_tmp
    [ids, owners, offsets]
    [zk: localhost:2181(CONNECTED) 25] ls /consumers/group_hyb_tmp/offsets
    [hyb]
    [zk: localhost:2181(CONNECTED) 26] ls /consumers/group_hyb_tmp/offsets/hyb
    [0]
    [zk: localhost:2181(CONNECTED) 27] ls /consumers/group_hyb_tmp/offsets/hyb/0
    []
    

    上面可以看到,offsets消费组里面,offset里是topic,topic里面是partition,partition里就没文件夹了。 可以通过以下命令来查看具体offset信息: get /consumers/group_hyb_tmp/offsets/hyb/0 具体的信息是:

    16
    cZxid = 0x4000000a4
    ctime = Sun Nov 25 19:09:00 CST 2018
    mZxid = 0x4000002d5
    mtime = Sun Nov 25 19:44:00 CST 2018
    pZxid = 0x4000000a4
    cversion = 0
    dataVersion = 2
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 2
    numChildren = 0
    

    第一行的16,就是offset的大小信息。