在Docker环境下的kafka部署之一:三种基本部署

8,884 阅读3分钟

在单独container中部署使用

首先,kafka依赖zookeeper,即使只是单机使用,也得按集群的方式来配置……

所以,先下载两个官方images:

docker pull confluentinc/cp-zookeeper
docker pull confluentinc/cp-kafka

然后创建一个compose(感谢令狐提供帮助):

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    container_name: zookeeper
    mem_limit: 1024M
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka
    container_name: kafka
    mem_limit: 1024M
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_NO: 1
      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_HEAP_OPTS: "-Xmx512M -Xms16M"

因为这里是直接在服务器上测试的,为安全起见暂不绑定相关端口。

启动compose:

docker-compose up -d

现在打开两个新的终端窗口,分别用以下命令登录container:

docker exec -it kafka /bin/bash

在其中一个窗口里创建topic并运行producer:

kafka-topics --zookeeper zookeeper:2181 --create --replication-factor 1 --partitions 1 --topic kafkatest
kafka-console-producer --broker-list localhost:9092 --topic kafkatest

在另一个窗口里运行consumer:

kafka-console-consumer --bootstrap-server localhost:9092 --topic kafkatest --from-beginning

现在,在producer里输入任何内容,都会在consumer里收到。

跨container的部署

上面的配置只能在单个container里使用,不实用。这是因为kafka advertised配置在localhost上。

需要跨container访问,就需要通过docker的网络访问,要修改这个配置:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    container_name: zookeeper
    mem_limit: 1024M
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka
    container_name: kafka
    mem_limit: 1024M
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_NO: 1
      KAFKA_ADVERTISED_HOST_NAME: domain_name  # 修改
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://domain_name:9092  # 修改
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_HEAP_OPTS: "-Xmx512M -Xms16M"

改完重启docker-compose,另外,因为zookeeper重启后,topic不会被持久保存,所以重启后需要重新创建topic。

然后启动两个新的container模拟网络访问:

docker run -it --rm --link kafka:domain_name --network kafka_default --name consumer confluentinc/cp-kafka /bin/bash
docker run -it --rm --link kafka:domain_name --network kafka_default --name producer confluentinc/cp-kafka /bin/bash

注意,需要指定一下docker网络为kafka_default,这是官方image使用的默认网络。

然后分别在consumer和producer两个container里测试:

kafka-console-consumer --bootstrap-server domain_name:9092 --topic kafkatest --from-beginning
kafka-console-producer --broker-list domain_name:9092 --topic kafkatest

效果与单container一样。

从Docker网络之外访问的部署

如果需要从docker网络之外访问,就需要把端口映射到宿主机了。

同样需要修改配置,增加网络映射等:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    container_name: zookeeper
    mem_limit: 1024M
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka
    container_name: kafka
    mem_limit: 1024M
    depends_on:
      - zookeeper
    ports:  # 增加
      - 9092:9092  # 增加
    environment:
      KAFKA_BROKER_NO: 1
      KAFKA_ADVERTISED_HOST_NAME: domain_name
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://domain_name:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_HEAP_OPTS: "-Xmx512M -Xms16M"

然后启动两个新的container模拟外部网络访问:

docker run -it --rm --add-host=domain_name:172.17.0.1 --name consumer confluentinc/cp-kafka /bin/bash
docker run -it --rm --add-host=domain_name:172.17.0.1 --name producer confluentinc/cp-kafka /bin/bash

其中172.17.0.1为docker宿主机在默认docker网络(注意不是kafka_default)里的IP,具体可以通过以下命令查看:

ip route

然后分别在consumer和producer两个container里测试:

kafka-console-consumer --bootstrap-server domain_name:9092 --topic kafkatest --from-beginning
kafka-console-producer --broker-list domain_name:9092 --topic kafkatest

这里有一个坑需要注意的是:

如果宿主机上有防火墙,需要增加一条规则,允许docker网络访问宿主机的端口,否则会连接失败。比如:

# 取得行号
iptables -L INPUT --line-num
# xx为最后一行DROP的行号,插到它前面
iptables -I INPUT xx -p tcp -m tcp -s 172.17.0.0/16 --dport 9092 -j ACCEPT

效果与前两个例子相同。