hand
_1_5_115
4
返回栏目
1k
9k
1k
1k
5k
1k
1k
1k
1k
3k
2k
1k
0.8k
2k
3k
1k
1k
0.7k
0.9k
1k
0.6k
0.4k
0.4k
0.3k
3k
2k
9k
0.4k
0.4k
0.8k
0.5k
3k
5k
1k
2k
2k
3k
5k
1k
1k
0.4k
0.5k
0.4k
0.6k
0.7k
1k
0.4k
0.3k
4k
0.5k
0k
0.3k
0k
0.2k
0.2k
0.3k
0.9k
0.9k
0.1k
0.9k
0.9k
1k
0.5k
6k
0.3k
0.4k
0.7k
0.6k
8k
3k
1k
1k
1k
1k
0k
2k
1k
1k
0.2k
5k
4k
5k
0.4k
0.8k
1k
1k
1k
0.1k
2k
1k
2k
6k
0k
2k
7k
1k
5k
2k
3k
1k
0k
1k
0.9k
0.4k
0.2k
1k
3k
4k
1k
1k
1k
2k
3k
0.7k
0.3k
0.5k
0.6k
1k
0.9k
3k
0.3k
4k
返回python栏目
作者:
贺及楼
成为作者
更新日期:2024-10-28 10:57:27
Python 的 pika
库是一个纯 Python 实现的 RabbitMQ(AMQP 0-9-1)客户端库,用于开发与 RabbitMQ 消息队列服务进行交互的应用程序。通过 pika
,开发者可以在 Python 程序中轻松地连接到 RabbitMQ 服务器,发送和接收消息,实现复杂的异步处理流程。
pika
支持同步和异步两种操作方式,提供了高级API来处理消息确认、拒绝和批量操作。
它广泛应用于 Web 开发、任务队列、事件驱动架构和微服务中,帮助构建可扩展和高可用性的分布式系统。使用 pika
,开发者可以高效地实现消息的发布/订阅模式和请求/应答模式,是 RabbitMQ 与 Python 应用程序之间的桥梁。
import pika
import json
credentials = pika.PlainCredentials('user', '123456') # mq用户名和密码
## 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
## 声明消息队列,消息将在这个队列传递,如不存在,则创建
result = channel.queue_declare(queue = 'python-test')
for i in range(10):
message=json.dumps({'OrderId':"1000%s"%i})
# 向队列插入数值 routing_key是队列名
channel.basic_publish(exchange = '',routing_key = 'python-test',body = message)
print(message)
connection.close()
import pika
credentials = pika.PlainCredentials('user', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1',port = 5672,virtual_host = '/',credentials = credentials))
channel = connection.channel()
## 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
channel.queue_declare(queue = 'python-test', durable = False)
## 定义一个回调函数来处理消息队列中的消息,这里是打印出来
def callback(ch, method, properties, body):
ch.basic_ack(delivery_tag = method.delivery_tag)
print(body.decode())
## 告诉rabbitmq,用callback来接收消息
channel.basic_consume('python-test',callback)
## 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
channel.start_consuming()
## 用户名密码 PlainCredentials
credentials = pika.PlainCredentials('user', '123456')
## 连接 BlockingConnection
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1',port = 5672,virtual_host = '/'))
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1',port = 5672,virtual_host = '/',credentials = credentials))
channel=connection.channel()
## 申明队列
channel.queue_declare(queue='test') # 声明队列向其发送消息
## durable = True # 代表消息队列持久化存储
## durable = False # 非持久化存储(默认)
## 申明队列
channel.exchange_declare(exchange = 'test', durable = True, exchange_type='fanout')
## durable = True # 代表消息队列持久化存储
## durable = False # 非持久化存储(默认)
## exchange_type='fanout'
## exchange_type='direct'
## exchange_type='topic'
## exchange_type='headers'
## 发送信息
channel.basic_publish(exchange='', routing_key='test', body='Hello World!')
## 注意当前未定义 exchange ,routing_key 需和 queue 保持一致
connection.close() # 关闭连接
## routing_key='test'队列名
## properties=pika.BasicProperties(delivery_mode = 2) # 声明消息在队列中持久化
## properties=pika.BasicProperties(delivery_mode = 1) # 消息非持久化
## body='Hello World!' # 保存的信息字符串
## 处理信息
def callback(ch, method, properties, body):
print(body)
channel.basic_consume(callback, queue='test', no_ack=True)
channel.start_consuming() # 开始建监听 接收消息
## no_ack 确认标识
## False,在调用callback函数时,未收到确认标识,消息会重回队列。
## True,无论调用callback成功与否,消息都被消费掉
python
整章节共122节
快分享给你的小伙伴吧 ~