AMQP和RabbitMQ

介绍

四个概念

  • 虚拟主机(virtual host)
  • 交换机(exchange)
  • 队列(queue)
  • 绑定(binding)

一个虚拟主机持有一组交换机、队列和绑定。
队列(Queues)是你的消息(messages)的终点,可以理解成装消息的容器。消息就一直在里面,直到有客户端(也就是消费者,Consumer)连接到这个队列并且将其取走为止。
需要记住的是,队列是由消费者(Consumer)通过程序建立的,不是通过配置文件或者命令行工具。
交换机可以理解成具有路由表的路由程序,仅此而已。
每个消息都有一个称为路由键(routing key)的属性,就是一个简单的字符串。交换机当中有一系列的绑定(binding),即路由规则(routes)。
OK,你已经创建了一个交换机。但是他并不知道要把消息送到哪个队列。你需要路由规则,即绑定(binding)。换句话说,一个绑定就是一个基于路由键将交换机和队列连接起来的路由规则。

交换机类型

  • Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。Fanout交换机转发消息是最快的。
  • Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
  • Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。

持久化

  • 将交换机设成 durable。
  • 将队列设成 durable。
  • 将消息的 Delivery Mode 设置成2 。

Example

amqp_consumer.py

# -*- coding: utf8 -*-

from amqplib import client_0_8 as amqp


conn = amqp.Connection(host='localhost:5672', 
                       userid='guest',
                       password='guest',
                       virtual_host='/',
                       insist=False)
chan = conn.channel()
chan.queue_declare(queue="po_box",
                   durable=True,
                   exclusive=False,
                   auto_delete=False)
chan.exchange_declare(exchange="sorting_room",
                      type="direct",
                      durable=True,
                      auto_delete=False)
chan.queue_bind(queue="po_box", exchange="sorting_room", routing_key="json")

def recv_callback(msg):
    print 'Received: ' + msg.body + ' from channel #' + str(msg.channel.channel_id)


chan.basic_consume(queue="po_box", 
                   no_ack=True, 
                   callback=recv_callback,
                   consumer_tag="testtag")
while True:
    chan.wait()
chan.basic_cancel("testtag")


chan.close()
conn.close()

amqp_publisher.py

# -*- coding: utf-8 -*-
import sys
from amqplib import client_0_8 as amqp

conn = amqp.Connection(host='localhost:5672',
                       userid='guest',
                       password='guest',
                       virtual_host='/',
                       insist=False)
chan = conn.channel()

msg = amqp.Message(sys.argv[1])
msg.properties["delivery_mode"] = 2
chan.basic_publish(msg, exchange="sorting_room", routing_key="json")

chan.close()
conn.close()