【RabbitMQ 服务器】

# 在 vhosttest 里面有 exchangetest 和 queuetest 通过 rkeytest 绑定Broker: 192.168.0.xxvirtual host: vhosttestExchange: exchangetest Queue: queuetest Routing key: rkeytest

【Python 环境】

OS: Windows 10Python: 3.6.3 x64pika: 0.11.2

【查看队列状态】

# 通过浏览器查看队列状态http://192.168.0.xx:15672/api/queues/vhosttest/queuetest # 通过命令行查看队列状态curl -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  |  jq# 通过命令行查看队列长度(messages = messages_ready + messages_unacknowledged)curl -s -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  | \    jq '.messages'

【send.py】

#encoding: utf-8#author: walker#date: 2018-01-31#summary: 发送方/生产者import os, sys, timeimport pikadef Main():	credentials = pika.PlainCredentials("test", "test")	parameters = pika.ConnectionParameters(host="192.168.0.xx", 											virtual_host='vhosttest', 											credentials=credentials)	connection = pika.BlockingConnection(parameters)    # 连接 RabbitMQ	channel = connection.channel()          # 创建频道		queue = channel.queue_declare(queue='queuetest')     # 声明或创建队列		while True:  # 循环向队列中发送信息		message = time.strftime('%H:%M:%S', time.localtime())		channel.basic_publish(exchange='exchangetest',  								routing_key='rkeytest',								body=message)				print('send message: %s' %  message)     		while True:   			# 检查队列,以重新得到消息计数			queue = channel.queue_declare(queue='queuetest', passive=True)  			'''			 queue.method.message_count 获取的为 ready 的消息数			 截至 2018-03-06(pika 0.11.2)			 walker 没找到利用 pika 获取 unack 或者 total 消息数的方法  			''' 			messageCount = queue.method.message_count			print('messageCount: %d' % messageCount)			if messageCount < 100:				break			connection.sleep(1)		# 关闭连接	connection.close()if __name__ == '__main__':	Main()

【recv.py - 版本1】

一个消费者

#encoding: utf-8#author: walker#date: 2018-01-31#summary: 接收方/消费者import os, sys, timeimport pika# 接收处理消息的回调函数def ConsumerCallback (channel, method, properties, body):	print("Received %s" % body)def Main():	credentials = pika.PlainCredentials("test", "test")	parameters = pika.ConnectionParameters(host="192.168.0.xx", 											virtual_host='vhosttest', 											credentials=credentials)	connection = pika.BlockingConnection(parameters)    # 连接 RabbitMQ		channel = connection.channel()          # 创建频道		queue = channel.queue_declare(queue='queuetest')     # 声明或创建队列		# no_ack=True 开启自动确认,不然消费后的消息会一直留在队列里面	# no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式	channel.basic_consume(ConsumerCallback, queue='queuetest', no_ack=True)	print('Wait Message ...')		channel.start_consuming()if __name__ == '__main__':	Main()

【recv.py - 版本2】

利用多线程实现多个消费者同时消费

#encoding: utf-8#author: walker#date: 2018-03-9#summary: 接收方/消费者import os, sys, timeimport pikaimport threadingfrom queue import QueueGlobalQueue = Queue(10)class Consumer(threading.Thread):	def run(self):		while True:			task = GlobalQueue.get()			print('thread-%d,\ttask: %s' % (threading.get_ident(), task))# 接收处理消息的回调函数def ConsumerCallback (channel, method, properties, body):	# 将消息推入队列	GlobalQueue.put(body)def Main():	credentials = pika.PlainCredentials("test", "test")	parameters = pika.ConnectionParameters(host="192.168.0.86", 											virtual_host='vhosttest', 											credentials=credentials)	connection = pika.BlockingConnection(parameters)    # 连接 RabbitMQ		channel = connection.channel()          # 创建频道	channel.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=True) # 公平消费		# no_ack=True 开启自动确认,不然消费后的消息会一直留在队列里面	# no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式	channel.basic_consume(ConsumerCallback, queue='queuetest', no_ack=True)	print('Wait Message ...')	for i in range(3):		c = Consumer()		c.start()	channel.start_consuming()   # 开始接收任务	if __name__ == '__main__':	Main()

【后记】

  • 如果希望不通过 exchange 路由转发,直接给队列发送消息,可以将 exchange 设为空字符串,routing_key 设为队列名。

channel.basic_publish(exchange='',  						routing_key='queuetest',						body=task)
  • 有时候  比  更方便。

【0.x 到 1.x 的迁移】

  • pika.ConnectionParameters

# 0.x 版本pika.ConnectionParameters(host=Host, 							virtual_host=VirtualHost, 							credentials=pika.PlainCredentials(User, Pwd),							heartbeat_interval=0)# 1.x 版本					pika.ConnectionParameters(host=MQHost, 								virtual_host=MQVirtualHost, 								credentials=credentials,								heartbeat=0)
  • channel.basic_qos

# 0.x 版本channel.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=True)# 1.x 版本channel.basic_qos(prefetch_size=0, prefetch_count=1, global_qos=True)
  • channel.basic_consume,终于换掉了坑爹的 no_ack 命名

# 0.x 版本# no_ack = no_manual_ack = auto_ack;不手动应答,开启自动应答模式channel.basic_consume(consumer_callback=ConsumerCallback, queue=MQQueueNode2Center, no_ack=False)# 1.x 版本channel.basic_consume(queue=MQQueueNode2Center, on_message_callback=ConsumerCallback, auto_ack=False)

【相关阅读】

  • pika 并非线程安全:

*** ***