Kafka

林雨禾个人博客 / 2023-05-25 / 原文

安装
docker network create app-tier --driver bridge
docker run -d --name zookeeper-server --network app-tier -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:latest
docker logs -f zookeeper-server #查看zookeeper容器日志
docker run -d --name kafka-server --network app-tier -p 9092:9092 -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181/hyx-kafka -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://172.27.16.1:9092 bitnami/kafka:latest #注意ip:zookeeper用的是容器名(zookeeper-server),对外公布的kafka-server地址(ADVERTISED_LISTENERS)用的是主机ip
docker logs -f kafka-server #查看kafka容器日志

kafka-map图形化管理

docker run -d --name kafka-map --network app-tier -p 9001:8080 -e DEFAULT_USERNAME=admin -e DEFAULT_PASSWORD=admin --restart always dushixiang/kafka-map:latest

账号和密码

admin
admin

zookeeper中有存放kafka信息所以需要先启动zookeeper,但是如果zookeeper本身有相关数据那么需要清空zookeeper中的数据再启动kafka,不然可能无法使用

docker run -d --name kafka-server1 --network app-tier -p 9093:9092 -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181/hyx-kafka -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://172.27.16.1:9093 bitnami/kafka:latest

参数说明

zookeeper存放kafka的信息,其中project name为项目名,使用相同project name创建的kafka就是同一个集群的其中一个broker

KAFKA_CFG_ZOOKEEPER_CONNECT=<zookeeper地址>:<端口>/<project name>

consumer

<?php $conf = new \RdKafka\Conf(); $conf->set('group.id', 'myConsumerGroup'); $rk = new \RdKafka\Consumer($conf); $rk->addBrokers("127.0.0.1"); $topicConf = new \RdKafka\TopicConf(); $topicConf->set('auto.commit.interval.ms', 100); $topicConf->set('offset.store.method', 'broker'); $topicConf->set('auto.offset.reset', 'earliest'); $topic = $rk->newTopic("quickstart-events", $topicConf); $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume(0, 120*10000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } #Producer <?php $conf = new \RdKafka\Conf(); $conf->set('metadata.broker.list', 'localhost:9092'); $producer = new \RdKafka\Producer($conf); $topic = $producer->newTopic("quickstart-events"); for ($i = 0; $i < 10; $i++) { $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i"); $producer->poll(0); } for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) { $result = $producer->flush(10000); if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) { break; } } if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { throw new \RuntimeException('Was unable to flush, messages might be lost!'); } 模式  点对点模式  一个生产者生产的数据只会被对应消费者消费  发布订阅模式  能观察到的都可以进行消费(php设计模式观察者模式) 基础原理 名称说明  Brokers  数据库节点  Topic  相当于数据库表  Partitions  Topic的分区,可以把数据插入到指定分区  一个partitions来自一个broker(服务器),一个partitions只有一个leader(127.0.0.1:9092)有多个Replicas(副本)  Replicas(副本)作用是当一台服务器崩溃,会和redis一样有个哨兵机制的概念  一个Topic的数据在同一个partitions里面是有序的,在不同分区里面是无序的。所以要想有序的数据需要放在同一个区。  Produce  生产者  Consume  消费者 重要命令 kafka-console-producer.sh --batch-size #生产者会把数据序列化生成到一个内存队列里面,当数据大小为batch-size或max-partition-memory-bytes将被同步到中间件组 --request-required-acks #中间件回应生产者,生产者收到答复删除内存队列消息,值0,1,-1(all)分别代表不需要回应,leader收到回应,leader跟备份都收到答复 作用  解耦  中间件做缓冲  异步处理事件 PHP rdkafka拓展 成功回调方法Conf::setDrMsgCb <?php $conf = new \RdKafka\Conf(); $conf->set('metadata.broker.list', 
'172.20.80.1:9092,127.0.0.1:9093'); $conf->setDrMsgCb(function ($kafka, $message) { //produce成功回调方法,message存放分区,发送信息等 if ($message->err) { print_r($kafka); print_r($message); } else { print_r('123'); print_r($kafka); print_r($message); } }); $producer = new \RdKafka\Producer($conf); $topic = $producer->newTopic("quickstart-events"); // 生产主题数据,此时消息在缓冲区中,并没有真正被推送 $topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode(['id' => 10]), null, 'woca'); $producer->poll(0); //轮询为了回调事件如:setDrMsgCb()等 for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) { // 推送消息,如果不调用此函数,消息不会被发送且会丢失 $result = $producer->flush(10000); // 推送1次即可、这边多次只是为了防止没成功 if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) { // 没错误就break break; } } if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { throw new \RuntimeException('Was unable to flush, messages might be lost!'); } 高级消费 <?php $conf = new RdKafka\Conf(); $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: // 成功会输出这个玩意 echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(NULL); break; default: echo 123; throw new \Exception($err); } }); // Configure the group.id. All consumer with the same group.id will consume // different partitions. 
$conf->set('group.id', 'userLoginConsumerGroup'); // Initial list of Kafka brokers $conf->set('metadata.broker.list', '172.20.80.1:9092,172.20.80.1:9093'); //earliest: 如果消费者组是新创建的,从头开始消费,否则从消费者组当前消费位移开始。 //latest:如果消费者组是新创建的,从最新偏移量开始,否则从消费者组当前消费位移开始。 $conf->set('auto.offset.reset', 'earliest'); $conf->set('enable.auto.commit', 'false'); $consumer = new RdKafka\KafkaConsumer($conf); // Subscribe to topic 'userLogin' $consumer->subscribe([$topic_name]); // 对消费者指定分区,注意此方式不能与subscribe一同使用 //$consumer->assign([ // new RdKafka\TopicPartition($topic_name, 2), // new RdKafka\TopicPartition($topic_name, 1), //]); echo "Waiting for partition assignment... (make take some time when quickly re-joining the group after leaving it.)\n"; while (true) { $message = $consumer->consume(120 * 1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: var_dump("成功读取"); var_dump($message); $consumer->commit($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); } } 面试题 https://blog.csdn.net/wanghaiping1993/article/details/125346010