• 主页

  • 投资

  • IT

    🔥
  • 设计

  • 销售

关闭

返回栏目

关闭

返回python栏目

113 - 第三方库 - pika - RabbitMQ

作者:

贺及楼

成为作者

更新日期:2024-10-28 10:57:27

pika

pika库的简介

Python 的 pika 库是一个纯 Python 实现的 RabbitMQ(AMQP 0-9-1)客户端库,用于开发与 RabbitMQ 消息队列服务进行交互的应用程序。通过 pika,开发者可以在 Python 程序中轻松地连接到 RabbitMQ 服务器,发送和接收消息,实现复杂的异步处理流程。

pika 支持同步和异步两种操作方式,提供了高级API来处理消息确认、拒绝和批量操作。

它广泛应用于 Web 开发、任务队列、事件驱动架构和微服务中,帮助构建可扩展和高可用性的分布式系统。使用 pika,开发者可以高效地实现消息的发布/订阅模式和请求/应答模式,是 RabbitMQ 与 Python 应用程序之间的桥梁。

生产者

  1. import pika
  2. import json
  3. credentials = pika.PlainCredentials('user', '123456') # mq用户名和密码
  4. ## 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
  5. connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1',port = 5672,virtual_host = '/',credentials = credentials))
  6. channel=connection.channel()
  7. ## 声明消息队列,消息将在这个队列传递,如不存在,则创建
  8. result = channel.queue_declare(queue = 'python-test')
  9. for i in range(10):
  10. message=json.dumps({'OrderId':"1000%s"%i})
  11. # 向队列插入数值 routing_key是队列名
  12. channel.basic_publish(exchange = '',routing_key = 'python-test',body = message)
  13. print(message)
  14. connection.close()

消费者

  1. import pika
  2. credentials = pika.PlainCredentials('user', '123456')
  3. connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1',port = 5672,virtual_host = '/',credentials = credentials))
  4. channel = connection.channel()
  5. ## 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
  6. channel.queue_declare(queue = 'python-test', durable = False)
  7. ## 定义一个回调函数来处理消息队列中的消息,这里是打印出来
  8. def callback(ch, method, properties, body):
  9. ch.basic_ack(delivery_tag = method.delivery_tag)
  10. print(body.decode())
  11. ## 告诉rabbitmq,用callback来接收消息
  12. channel.basic_consume('python-test',callback)
  13. ## 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
  14. channel.start_consuming()
  1. ## 用户名密码 PlainCredentials
  2. credentials = pika.PlainCredentials('user', '123456')
  1. ## 连接 BlockingConnection
  2. connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1',port = 5672,virtual_host = '/'))
  3. connection = pika.BlockingConnection(pika.ConnectionParameters(host = '127.0.0.1',port = 5672,virtual_host = '/',credentials = credentials))
  1. channel=connection.channel()
  1. ## 申明队列
  2. channel.queue_declare(queue='test') # 声明队列向其发送消息
  3. ## durable = True # 代表消息队列持久化存储
  4. ## durable = False # 非持久化存储(默认)
  1. ## 申明队列
  2. channel.exchange_declare(exchange = 'test', durable = True, exchange_type='fanout')
  3. ## durable = True # 代表消息队列持久化存储
  4. ## durable = False # 非持久化存储(默认)
  5. ## exchange_type='fanout'
  6. ## exchange_type='direct'
  7. ## exchange_type='topic'
  8. ## exchange_type='headers'
  1. ## 发送信息
  2. channel.basic_publish(exchange='', routing_key='test', body='Hello World!')
  3. ## 注意当前未定义 exchange ,routing_key 需和 queue 保持一致
  4. connection.close() # 关闭连接
  5. ## routing_key='test'队列名
  6. ## properties=pika.BasicProperties(delivery_mode = 2) # 声明消息在队列中持久化
  7. ## properties=pika.BasicProperties(delivery_mode = 1) # 消息非持久化
  8. ## body='Hello World!' # 保存的信息字符串
  1. ## 处理信息
  2. def callback(ch, method, properties, body):
  3. print(body)
  4. channel.basic_consume(callback, queue='test', no_ack=True)
  5. channel.start_consuming() # 开始建监听 接收消息
  6. ## no_ack 确认标识
  7. ## False,在调用callback函数时,未收到确认标识,消息会重回队列。
  8. ## True,无论调用callback成功与否,消息都被消费掉