kafka基础操作

夏雪冬蝉 / 2023-07-23 / 原文

什么是kafka

  • kafka本身并不是消息队列,而是一份分布式流平台(高并发,低延迟。高吞吐量)。
  • kafka是基于zookeeper的分布式消息系统。
  • kafka具有高吞吐率、高性能、实时及高可靠等特点。

kafka基本概念

  • Topic:一个虚拟的概念,由一个到多个Partitions组成
  • Partition:实际消息存储单位
  • Producer:消息生产者
  • Consumer:消息消费者

kafka环境配置

导入压缩包

虚拟机centos7

新建一个目录/opt/software

然后安装拷贝工具

yum install -y lrzszc

此时输入rz就能从主机拷贝东西进去

拷贝这三个文件夹

 新建一个安装目录/opt/install,然后分别将这三个压缩包解压到该目录,解压命令如下

tar -zxvf jdk-8u181-linux-x64.tar.gz -C ../install/
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C ../install/
tar -zxvf kafka_2.11-2.4.0.tgz -C ../install/

 配置jdk的环境变量

输入

vim /etc/profile

在最下面添加

 一定要刷新

source /etc/profile

Zookeeper的安装启动

里面有配置文件样例

 拷贝一份样例,并修改。基本上是吧dataDid存到磁盘目录比较大的地方,而不是tmp,这里暂时没有修改。

cp zoo_sample.cfg zoo.cfg

可以启动Zookeeper

./zkServer.sh start

 可以查看是否启动成功

./zkCli.sh或
ps -ef|grep zookeeper

 

kafka配置

主要修改配置文件server.properties

vim server.properties

修改以下内容

# 填写本机端口
listeners=PLAINTEXT://192.168.75.136:9092
advertised.listeners=PLAINTEXT://192.168.75.136:9092
log.dirs=/opt/install/kafka_2.11-2.4.0/datas

一些启停命令

1.启动kafka
bin/kafka-server-start.sh config/server.properties &
2.停止kafka
bin/kafka-server-stop.sh
3.创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
4.查看已创建的Topic信息
bin/kafka-topics.sh --list --zookeeper localhost:2181
5.发送消息
bin/kafka-console-producer.sh --broker-list 192.168.75.136:9092 --topic test
6.接受消息
bin/kafka-console-consumer.sh --bootstrap-server 192。168.75.136:9092 --topic test --from-beginning 

测试数据:发送JSON
{"orderId":"001", "price":"60"}

测试