Hadoop(二十一)Kafka工作原理
基本概念
- Kafka是一个分布式实时数据流平台,可独立部署在单台服务器上,也可部署在多台服务器上构成集群。它提供了发布与订阅功能,用户可以发送数据到Kafka集群中,也可以从Kafka集群中读取数据
1、代理(Broker)
- 在Kafka集群中,一个Kafka进程(Kafka进程又称为 Kafka实例)被称为一个代理(Broker)节点,代理节点是消息队列中的一个常用概念
- 通常,在部署分布式Kafka集群时,一台服务器上部署一个Kafka实例
2、生产者(Producer)
- 在Kafka系统中,生产者通常被称为Producer
- Producer将消息记录发送到Kafka集群指定的主题(Topic)中进行存储,同时生产者(Producer)也能通过自定义算法决定将消息记录发送到哪个分区(Partition)
3、消费者(Consumer)
- 消费者(Consumer)从Kafka集群指定的主题(Topic)中读取消息记录
- 在读取主题数据时,需要设置消费组名(Groupd),如果不设置,则Kafka消费者会默认生成一个消费组名称
4、消费者组(Consumer Group)
- 消费者程序在读取Kafka系统主题(Topic)中的数据时,通常会使用多个线程来执行
- 一个消费者组可以包含一个或多个消费者程序,使用多分区和多线程模式可以极大提高读取数据的效率
5、主题(Topic)
- Kafka系统通过主题来区分不同业务类型的消息记录
- 例如,用户登录数据存储在主题A中,用户充值记录存储在主题B中,则如果应用程序只订阅了主题A,而没有订阅主题B,那该应用程序只能读取主题A中的数据
6、分区(Partition)
- 每一个主题(Topic)中可以有一个或者多个分区(Partition)
- 在Kafka系统的设计思想中,分区是基于物理层面上的,不同的分区对应着不同的数据文件
- 每个分区(Partition)内部的消息记录是有序的,每个消息都有一个连续的偏移量序号(Offset),一个分区只对应一个代理节点(Broker),一个代理节点可以管理多个分区
7、副本(Replication)
- 在Kafka系统中,每个主题(Topic)在创建时会要求指定它的副本数,默认是1
- 通过副本(Replication)机制来保证Kafka分布式集群数据的高可用性
8、记录(Record)
- 被实际写入到Kafka集群并且可以被消费者应用程序读取的数据
- 每条记录包含一个键(Key)、值(Value)和时间戳(Timestamp)
工作机制
- Kafka作为一个消息队列系统,其核心机制就是生产消息和消费消息
- 在Kafka基本结构中,生产者(Producer)组件和消费者(Consumer)组件互不影响,但又是必须存在的
- 生产者(Producer)负责写入消息数据。将审计日志、服务日志、数据库、移动App日志,以及其他类型的日志主动推送到Kafka集群进行存储
- 消费者(Consumer)负责读取消息数据。例如,通过Hadoop的应用接口、Spark的应用接口、Storm的应用接口、ElasticSearch的应用接口,以及其他自定义服务的应用接口,主动拉取Kafka集群中的消息数据
Kafka常用Shell命令
- 查看Topic列表
./bin/kafka-topics.sh --list --zookeeper cdh-worker-1:2181/kafka
- 查看Topic详细信息
./bin/kafka-topics.sh --describe --zookeeper cdh-worker-1:2181/kafka
- 创建Topic
./bin/kafka-topics.sh --create --zookeeper cdh-worker-1:2181/kafka --replication-factor 3 --partitions 1 --topic test-topic
- 删除Topic
./bin/kafka-topics.sh --delete --zookeeper cdh-worker-1:2181/kafka --topic topic-demo
- 生产数据
./bin/kafka-console-producer.sh --broker-list kafka-1:9092 --topic test-topic
- 消费数据
./bin/kafka-console-consumer.sh --bootstrap-server kafka-1:9092 --topic test-topic --from-beginning
JavaAPI操作
- 生产者示例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
}
});
producer.close();
- 消费者示例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
consumer.close();