
Python 的 pika 库是一个纯 Python 实现的 RabbitMQ(AMQP 0-9-1)客户端库,用于开发与 RabbitMQ 消息队列服务进行交互的应用程序。通过 pika,开发者可以在 Python 程序中轻松地连接到 RabbitMQ 服务器,发送和接收消息,实现复杂的异步处理流程。
pika 支持同步和异步两种操作方式,提供了高级API来处理消息确认、拒绝和批量操作。
它广泛应用于 Web 开发、任务队列、事件驱动架构和微服务中,帮助构建可扩展和高可用性的分布式系统。使用 pika,开发者可以高效地实现消息的发布/订阅模式和请求/应答模式,是 RabbitMQ 与 Python 应用程序之间的桥梁。
import pikaimport jsoncredentials = pika.PlainCredentials('user', '123456') # mq用户名和密码## 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1',port = 5672,virtual_host = '/',credentials = credentials))channel=connection.channel()## 声明消息队列,消息将在这个队列传递,如不存在,则创建result = channel.queue_declare(queue = 'python-test')for i in range(10):message=json.dumps({'OrderId':"1000%s"%i})# 向队列插入数值 routing_key是队列名channel.basic_publish(exchange = '',routing_key = 'python-test',body = message)print(message)connection.close()
import pikacredentials = pika.PlainCredentials('user', '123456')connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1',port = 5672,virtual_host = '/',credentials = credentials))channel = connection.channel()## 申明消息队列,消息在这个队列传递,如果不存在,则创建队列channel.queue_declare(queue = 'python-test', durable = False)## 定义一个回调函数来处理消息队列中的消息,这里是打印出来def callback(ch, method, properties, body):ch.basic_ack(delivery_tag = method.delivery_tag)print(body.decode())## 告诉rabbitmq,用callback来接收消息channel.basic_consume('python-test',callback)## 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理channel.start_consuming()
## 用户名密码 PlainCredentialscredentials = pika.PlainCredentials('user', '123456')
## 连接 BlockingConnectionconnection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1',port = 5672,virtual_host = '/'))connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
## 申明队列channel.queue_declare(queue='test') # 声明队列向其发送消息## durable = True # 代表消息队列持久化存储## durable = False # 非持久化存储(默认)
## 申明队列channel.exchange_declare(exchange = 'test', durable = True, exchange_type='fanout')## durable = True # 代表消息队列持久化存储## durable = False # 非持久化存储(默认)## exchange_type='fanout'## exchange_type='direct'## exchange_type='topic'## exchange_type='headers'
## 发送信息channel.basic_publish(exchange='', routing_key='test', body='Hello World!')## 注意当前未定义 exchange ,routing_key 需和 queue 保持一致connection.close() # 关闭连接## routing_key='test'队列名## properties=pika.BasicProperties(delivery_mode = 2) # 声明消息在队列中持久化## properties=pika.BasicProperties(delivery_mode = 1) # 消息非持久化## body='Hello World!' # 保存的信息字符串
## 处理信息def callback(ch, method, properties, body):print(body)channel.basic_consume(callback, queue='test', no_ack=True)channel.start_consuming() # 开始建监听 接收消息## no_ack 确认标识## False,在调用callback函数时,未收到确认标识,消息会重回队列。## True,无论调用callback成功与否,消息都被消费掉