Kafka 生产者代码解读
问题引入
尽管Kafka官方提供了生产者代码案例,我还是觉得有必要对代码进行一次解读,并加入个人的理解。
📌 这里有一篇写的很好的博客,建议阅读c语言使用librdkafka库实现kafka的生产和消费实例(转)
发布流程
- 第1步:创建kafka客户端配置信息对象并初始化
rd_kafka_conf_t *conf; /* Temporary configuration object */
conf = rd_kafka_conf_new();
- 第2步:kafka客户端配置信息对象赋值
char errstr[512]; /* librdkafka API error reporting buffer */
const char *brokers = "192.168.1.100:9092:"; /* Argument: broker list */
const char *groupid = "pub_test";
/* 配置broker,broker可以是集群,例如:ip1:9092,ip2:9092,ip3:9092 */
if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
fprintf(stderr, "%s\n", errstr);
return 1;
}
/* 配置groupid */
if (rd_kafka_conf_set(conf, "group.id", groupid, errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
{
fprintf(stderr, "%s\n", errstr);
rd_kafka_conf_destroy(conf);
return 1;
}
/* 配置其他属性,可以查看CONFIGURATION.md文档 */
...
- 第3步:设置消息分发(成功 or 失败)报告回调函数
static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *rkmessage, void *opaque)
{
if (rkmessage->err)
{
fprintf(stderr, "%% Message delivery failed: %s\n", rd_kafka_err2str(rkmessage->err));
}
else
{
fprintf(stderr,"%% Message delivered (%zd bytes, partition %d)\n",
rkmessage->len, rkmessage->partition);
}
/* The rkmessage is destroyed automatically by librdkafka */
}
/* dr_msg_cb()函数只能被rd_kafka_poll() and rd_kafka_flush()触发 */
rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
- 第4步:创建生产者实例和主题句柄
rd_kafka_t *rk; /* Producer instance handle */
/* 函数的第一个参数有两个选择:RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER */
rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
if (!rk)
{
fprintf(stderr, "%% Failed to create new producer: %s\n",
errstr);
return 1;
}
rd_kafka_topic_t *rkt;
rkt = rd_kafka_topic_new(rk, topic, NULL);
if (!rkt)
{
fprintf(stderr, "%% Failed to create topic object: %s\n",
rd_kafka_err2str(rd_kafka_last_error()));
rd_kafka_destroy(rk);
return 1;
}
- 第5步:生产消息
while(run)
{
rd_kafka_resp_err_t err;
const char *topic = "data-time"; /* Argument: topic to produce to */
char buf[512]; /* Message value temporary buffer */
int len;
lib_system_datetime_string_get(buf); // 自己编写的获取系统日期时间函数
len = strlen(buf);
retry:
//err = rd_kafka_producev(rk, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
// RD_KAFKA_V_VALUE(buf, len), RD_KAFKA_V_OPAQUE(NULL), RD_KAFKA_V_END);
err = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buf, len, NULL, 0, NULL);
if (err)
{
fprintf(stderr, "%% Failed to produce to topic %s: %s\n", rd_kafka_topic_name(rkt), rd_kafka_err2str(rd_kafka_last_error()));
if (err == RD_KAFKA_RESP_ERR__QUEUE_FULL) // 消息队列空间已满
{
rd_kafka_poll(rk, 1000); // block for max 1000ms
goto retry; // 重新再发送消息
}
}
else
{
fprintf(stderr,"%% Enqueued message (%zd bytes) for topic %s\n", len, rd_kafka_topic_name(rkt));
}
rd_kafka_poll(rk, 0); // non-blocking
}
- 第6步:刷新队列并销毁内存
/* 刷新队列 */
rd_kafka_flush(rk, 10 * 1000 /* wait for max 10 seconds */);
/* 检查生产者后台队列是否还有消息未发送完成 */
if (rd_kafka_outq_len(rk) > 0)
{
fprintf(stderr, "%% %d message(s) were not delivered\n", rd_kafka_outq_len(rk));
}
/* 销毁内存 */
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(rk);
反思总结
kafka producer.c代码中,对于很多结构体的定义都是不可见的,结构体中的成员都是被封装的,仅给出少量的结构体和枚举的定义。
在kafka server默认配置下,允许producer在生产消息时,自动创建topic 和 partition。(经过测试)
参考引用
Kafka Examples Producer.c