version: '3.5'services: zookeeper: image: wurstmeister/zookeeper ## 镜像 container_name: zookeeper ports: - "2181:2181" ## 对外暴露的端口号 kafka: image: wurstmeister/kafka ## 镜像 container_name: kafka volumes: - /etc/localtime:/etc/localtime ## 挂载位置(kafka镜像和宿主机器之间时间保持一直) ports: - "9092:9092" environment: KAFKA_ADVERTISED_HOST_NAME: 192.168.3.21 ## 修改:宿主机IP KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 ## kafka运行是基于zookeeper的 KAFKA_ADVERTISED_PORT: 9092 KAFKA_LOG_RETENTION_HOURS: 120 KAFKA_MESSAGE_MAX_BYTES: 10000000 KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000 KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000 KAFKA_NUM_PARTITIONS: 3 KAFKA_DELETE_RETENTION_MS: 1000 kafka-manager: image: sheepkiller/kafka-manager ## 镜像:开源的web管理kafka集群的界面 container_name: kafka-manager environment: ZK_HOSTS: 192.168.3.21 ## 修改:宿主机IP ports: - "9009:9000" ## 暴露端口9000这个端口冲突太多
docker-compose up -d --build
http://localhost:9009
docker run -p 8080:8080 -e KAFKA_BROKERS=192.168.3.21:9092 docker.redpanda.com/redpandadata/console:latest
kafka-topics.sh --create --topic moatkon \--zookeeper zookeeper:2181 --replication-factor 1 \--partitions 3
kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic moatkon
kafka-console-producer.sh --topic=netsurfingzone-topic-1 --broker-list kafka:9092
kafka-console-consumer.sh \--bootstrap-server kafka:9092 \--from-beginning --topic moatkon
/** * p: 指定分区 * d: 消息 */ @GetMapping("/message/{p}/{d}") public String sendMessage(@PathVariable("p")Integer p,@PathVariable("d")String d) { try { kafkaTemplate.send(ApplicationConstant.TOPIC_NAME, p, UUID.randomUUID().toString(),d); } catch (Exception e) { e.printStackTrace(); return e.getMessage(); } return "message send succuessfully"; }
// 监听全部// @KafkaListener(groupId = ApplicationConstant.GROUP_ID_JSON, topics = ApplicationConstant.TOPIC_NAME, containerFactory = ApplicationConstant.KAFKA_LISTENER_CONTAINER_FACTORY)// public void receivedMessage(String message) {// logger.info("message received using Kafka listener:" + message);// } @KafkaListener(groupId = ApplicationConstant.GROUP_ID_JSON, containerFactory = ApplicationConstant.KAFKA_LISTENER_CONTAINER_FACTORY, topicPartitions = {@TopicPartition(topic = ApplicationConstant.TOPIC_NAME, partitions = {"0"})}) public void partition0(String message) { logger.info("message received using Kafka listener p0: " + message); } @KafkaListener(groupId = ApplicationConstant.GROUP_ID_JSON, containerFactory = ApplicationConstant.KAFKA_LISTENER_CONTAINER_FACTORY, topicPartitions = {@TopicPartition(topic = ApplicationConstant.TOPIC_NAME, partitions = {"1"})}) public void partition1(String message) { logger.info("message received using Kafka listener p1: " + message); }
网站当前构建日期: 2024.12.22