Python 的 pika
库是一个纯 Python 实现的 RabbitMQ(AMQP 0-9-1)客户端库,用于开发与 RabbitMQ 消息队列服务进行交互的应用程序。通过 pika
,开发者可以在 Python 程序中轻松地连接到 RabbitMQ 服务器,发送和接收消息,实现复杂的异步处理流程。
pika
支持同步和异步两种操作方式,提供了高级API来处理消息确认、拒绝和批量操作。
它广泛应用于 Web 开发、任务队列、事件驱动架构和微服务中,帮助构建可扩展和高可用性的分布式系统。使用 pika
,开发者可以高效地实现消息的发布/订阅模式和请求/应答模式,是 RabbitMQ 与 Python 应用程序之间的桥梁。
import pika
import json
credentials = pika.PlainCredentials('user', '123456') # mq用户名和密码
## 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
## 声明消息队列,消息将在这个队列传递,如不存在,则创建
result = channel.queue_declare(queue = 'python-test')
for i in range(10):
message=json.dumps({'OrderId':"1000%s"%i})
# 向队列插入数值 routing_key是队列名
channel.basic_publish(exchange = '',routing_key = 'python-test',body = message)
print(message)
connection.close()
import pika
credentials = pika.PlainCredentials('user', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
## 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
channel.queue_declare(queue = 'python-test', durable = False)
## 定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag = method.delivery_tag)
print(body.decode())
## 告诉rabbitmq,用callback来接收消息
channel.basic_consume('python-test',callback)
## 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()
## 用户名密码 PlainCredentials
credentials = pika.PlainCredentials('user', '123456')
## 连接 BlockingConnection
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1',port = 5672,virtual_host = '/'))
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
## 申明队列
channel.queue_declare(queue='test') # 声明队列向其发送消息
## durable = True # 代表消息队列持久化存储
## durable = False # 非持久化存储(默认)
## 申明队列
channel.exchange_declare(exchange = 'test', durable = True, exchange_type='fanout')
## durable = True # 代表消息队列持久化存储
## durable = False # 非持久化存储(默认)
## exchange_type='fanout'
## exchange_type='direct'
## exchange_type='topic'
## exchange_type='headers'
## 发送信息
channel.basic_publish(exchange='', routing_key='test', body='Hello World!')
## 注意当前未定义 exchange ,routing_key 需和 queue 保持一致
connection.close() # 关闭连接
## routing_key='test'队列名
## properties=pika.BasicProperties(delivery_mode = 2) # 声明消息在队列中持久化
## properties=pika.BasicProperties(delivery_mode = 1) # 消息非持久化
## body='Hello World!' # 保存的信息字符串
## 处理信息
def callback(ch, method, properties, body):
print(body)
channel.basic_consume(callback, queue='test', no_ack=True)
channel.start_consuming() # 开始建监听 接收消息
## no_ack 确认标识
## False,在调用callback函数时,未收到确认标识,消息会重回队列。
## True,无论调用callback成功与否,消息都被消费掉