python调用RabbitMQ

木先生哦~ / 2023-08-04 / 原文

本文不涉及较难的操作,仅仅提供 常用的生产消息和消费消息的方式。
-- 好像也没啥花里胡哨的操作

1、准备

想要python调用rabbitMQ需要安装pika,所有需要提前安装好pika

# 全局
pip install pika;
# 如果用的anaconda的上面那个装不上可以试试这个?
conda install pika;

2、代码

2.1、生产者

生产者采用direct模式

import pika

USERNAME = 'admin' # 用户名
PASSWROD = 'admin' # 密码
HOST = '127.0.0.1' # rabbitMQ的IP
PORT = 5672 # 端口
WRITE_QUEUE='demo_write.queue' # 队列
WRITE_EXCHANGE='demo.exchange' # 交换机
ROUTING_KEY='demo' # routing-key

if __name__ == '__main__':
	# 创建一个凭证
	credentials=pika.PlainCredentials(username=USERNAME,password=PASSWROD)
	# 创建一个连接
	connection = pika.ConnectionParameters(host=HOST,port=PORT,credentials=credentials)
	# 建立连接并获取一个通道,
	# 此处采用阻塞连接(这个方式最简单了,但是对于生产者没啥区别)
	channel = pika.BlockingConnection(connection).channel()
	# 创建交换机和队列,如果没有就会自动创建
	# 如果已经创建的与当前定义的不一样会**报错**
	# 此处durable表示是是否持久化
	channel.exchange_declare(exchange=WRITE_EXCHANGE,durable=True,exchange_type=ExchangeType.direct)
	channel.queue_declare(queue=WRITE_QUEUE,durable=True)
	# 绑定
	# 如果队列或交换机不存在**报错**
channel.queue_bind(queue=WRITE_QUEUE,exchange=WRITE_EXCHANGE,routing_key=ROUTING_KEY)
	message='{"data":"这里是我的消息"}'
	# 进行生产
	writeConnection.basic_publish(exchange=WRITE_EXCHANGE,routing_key=ROUTING_KEY,body=message)

2.2、消费者

消费者采用basic模式

import pika

USERNAME = 'admin' # 用户名
PASSWROD = 'admin' # 密码
HOST = '127.0.0.1' # rabbitMQ的IP
PORT = 5672 # 端口
READ_QUEUE='demo_read.queue' # 读取任务的队列名称(各个算法需要匹配对应的)

# 此时需要准备一个回调函数,参数不过多解释
def call_back(ch, method, properties, body):
    # 获取一条消息(如果直接获取会是乱码)
    message = str(body.decode('utf-8'))
    # 处理逻辑
    # 阿巴阿巴

    # ack确认(确定接收成功后调用,不然消息会一直存在)
    ch.basic_ack(delivery_tag=method.delivery_tag)

if __name__ == '__main__':
  # 这些已经在上面解释过了
  credentials=pika.PlainCredentials(username=USERNAME,password=PASSWROD)
  connection = pika.ConnectionParameters(host=HOST,port=PORT,credentials=credentials)
  # 此处采用阻塞连接
  # 这个方式最简单了,当程序启动后会进行阻塞,当有消息来的时候就会进行消费,消费完成后在尝试获取下一个
  channel = pika.BlockingConnection(connection).channel()
  channel.queue_declare(queue=READ_QUEUE,durable=True)
  # 消费设置
  # 预读取数量
  readConnection.basic_qos(prefetch_count=1)
  # on_message_callback:回调函数名称
  # auto_ack:是否自动ack
  channel.basic_consume(queue=READ_QUEUE,on_message_callback=call_back,auto_ack=False)
  # 开始消费
  channel.start_consuming()

Other、实际使用可能出现的问题

A、json格式注意!

python与某个J开头的编程语言(java)通过mq交互时,json格式有规定

java这边采用Jackson2JsonMessageConverter模式接收消息

对于py
1、json的格式必须'{"a":"a"}'而不是"{'a':'a'}"
2、如果json格式为\'{\"a\":\"a\"}\'接收json需要调用两次json_loads()才会变为dict类型,仅调用一次只是str类型

B、104异常?

度娘:
消费某条消息太久写回通道以为程序废了就自动关闭了,然后尝试写回就挂了。
解决方案:创建连接的时候加上heartbeat=0让他等多久都不要关闭

  connection = pika.ConnectionParameters(host=HOST,port=PORT,credentials=credentials,heartbeat=0)

感觉不太行,我采用的是消费超时机制和者重启机制
因为重启机制简单所以就没用消费超时机制
1、消费超时机制:消费时通过指定的算法逻辑让消费时间过长时(<断开连接时间)自动停止任务并丢到人工队列去
2、重启机制:就是字面意思重启,有时候程序可能是调度,资源不足等就会导致调用突发性的很慢很慢,此时重启程序可以改善问题。

C、挂起太久不干活了?

当我使用java开启线程去启动mq程序时,执行一段时间后会突然不干活了。原因定位到就是B(上面那个)问题。这个错误报错竟然不会停止任务(不知道是不是我在python端也捕获了异常的原因)
解决方案:java端进行循环启动,程序停止后自动重启。python端采用定时关闭功能。

注意:重启方法必须在python使用os.exit(0),正常通过手动停止mq消息监听关不掉,java端直接结束线程有几率关不掉。

D、开启多个任务实际干活速度没有提上来?

抛开实际的运行瓶颈,资源数量,有可能是没有设置预读取数量。默认预读取数量时全部,这样会导致一个mq任务启动后直接读取全部消息而其他任务无消息可读。

# 就是这句话
readConnection.basic_qos(prefetch_count=1)
# 加在这段代码上面就可以了
# channel.basic_consume()
# channel.start_consuming()