Hadoop(二十一)Kafka工作原理

shihongpin / 2024-09-20 / 原文

基本概念

  • Kafka是一个分布式实时数据流平台,可独立部署在单台服务器上,也可部署在多台服务器上构成集群。它提供了发布与订阅功能,用户可以发送数据到Kafka集群中,也可以从Kafka集群中读取数据

1、代理(Broker)

  • 在Kafka集群中,一个Kafka进程(Kafka进程又称为 Kafka实例)被称为一个代理(Broker)节点,代理节点是消息队列中的一个常用概念
  • 通常,在部署分布式Kafka集群时,一台服务器上部署一个Kafka实例

2、生产者(Producer)

  • 在Kafka系统中,生产者通常被称为Producer
  • Producer将消息记录发送到Kafka集群指定的主题(Topic)中进行存储,同时生产者(Producer)也能通过自定义算法决定将消息记录发送到哪个分区(Partition)

3、消费者(Consumer)

  • 消费者(Consumer)从Kafka集群指定的主题(Topic)中读取消息记录
  • 在读取主题数据时,需要设置消费组名(Groupd),如果不设置,则Kafka消费者会默认生成一个消费组名称

4、消费者组(Consumer Group)

  • 消费者程序在读取Kafka系统主题(Topic)中的数据时,通常会使用多个线程来执行
  • 一个消费者组可以包含一个或多个消费者程序,使用多分区和多线程模式可以极大提高读取数据的效率

5、主题(Topic)

  • Kafka系统通过主题来区分不同业务类型的消息记录
  • 例如,用户登录数据存储在主题A中,用户充值记录存储在主题B中,则如果应用程序只订阅了主题A,而没有订阅主题B,那该应用程序只能读取主题A中的数据

6、分区(Partition)

  • 每一个主题(Topic)中可以有一个或者多个分区(Partition)
  • 在Kafka系统的设计思想中,分区是基于物理层面上的,不同的分区对应着不同的数据文件
  • 每个分区(Partition)内部的消息记录是有序的,每个消息都有一个连续的偏移量序号(Offset),一个分区只对应一个代理节点(Broker),一个代理节点可以管理多个分区

7、副本(Replication)

  • 在Kafka系统中,每个主题(Topic)在创建时会要求指定它的副本数,默认是1
  • 通过副本(Replication)机制来保证Kafka分布式集群数据的高可用性

8、记录(Record)

  • 被实际写入到Kafka集群并且可以被消费者应用程序读取的数据
  • 每条记录包含一个键(Key)、值(Value)和时间戳(Timestamp)

工作机制

  • Kafka作为一个消息队列系统,其核心机制就是生产消息和消费消息
  • 在Kafka基本结构中,生产者(Producer)组件和消费者(Consumer)组件互不影响,但又是必须存在的
  • 生产者(Producer)负责写入消息数据。将审计日志、服务日志、数据库、移动App日志,以及其他类型的日志主动推送到Kafka集群进行存储
  • 消费者(Consumer)负责读取消息数据。例如,通过Hadoop的应用接口、Spark的应用接口、Storm的应用接口、ElasticSearch的应用接口,以及其他自定义服务的应用接口,主动拉取Kafka集群中的消息数据

Kafka常用Shell命令

  • 查看Topic列表
./bin/kafka-topics.sh --list --zookeeper cdh-worker-1:2181/kafka
  • 查看Topic详细信息
./bin/kafka-topics.sh --describe --zookeeper cdh-worker-1:2181/kafka
  • 创建Topic
./bin/kafka-topics.sh --create --zookeeper cdh-worker-1:2181/kafka --replication-factor 3 --partitions 1 --topic test-topic
  • 删除Topic
./bin/kafka-topics.sh --delete --zookeeper cdh-worker-1:2181/kafka --topic topic-demo
  • 生产数据
./bin/kafka-console-producer.sh --broker-list kafka-1:9092 --topic test-topic
  • 消费数据
./bin/kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --topic test-topic --from-beginning

JavaAPI操作

  • 生产者示例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        } else {
            System.out.println("The offset of the record we just sent is: " + metadata.offset());
        }
    }
});
producer.close();
  • 消费者示例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}
consumer.close();