RabbitMQ简易教程 - 主题

前面一篇通过使用direct类型的交换机代替fanout广播类型交换机, 实现了一个基于日志级别路由对应的消息的功能。

但是还是有它的局限性——它并不能根据多个条件来实现路由,只能通过完全匹配routing key, 灵活性不够。

比如我想实现仅仅对于那些error级别日志并且由kern生成的日志才记录到文件中。

主题交换机

主题交换机的binding key和发送到该交换机的消息所带的routing key并不是一个简单的单词, 而是以点.隔开的单词序列。比如stock.usd.nysenyse.vmw等。

另外,有两个特殊的情况:

  • *(星号)可代表任一个单词
  • #(井字符)可代表0个或多个单词

比如有如下的主题交换机:

  • quick.orange.rabbit会同时发送给2个队列
  • lazy.orange.elephant会同时发送给2个队列
  • quick.orange.fox仅仅发给第1个队列
  • lazy.brown.fox仅仅发送给第2个队列
  • quick.brown.fox被丢弃
  • quick.orange.male.rabbit被丢弃
  • lazy.orange.male.rabbit仅仅发送给第2个队列

最终代码

我们假设所有的消息的routing key形式为"<facility>.<severity>

日志生产者:

emit_log_topic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.217.161', port=5673))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
type='topic')

routing_key = 'disk.error'
message = '[disk.info] Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))

routing_key = 'disk.warning'
message = '[disk.warning] Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))


routing_key = 'test.error'
message = '[test.error] Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))


connection.close()

日志消费者:

receive_logs_topic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.217.161', port=5673))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = ['disk.error', 'disk.warning']
if not binding_keys:
sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
sys.exit(1)

for binding_key in binding_keys:
channel.queue_bind(exchange='direct_logs',
queue=queue_name,
routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')


def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))


channel.basic_consume(callback,
queue=queue_name,
no_ack=True)

channel.start_consuming()

运行效果

emit_log_topic.py

1
2
3
[x] Sent 'disk.error':'[disk.info] Hello World!'
[x] Sent 'disk.warning':'[disk.warning] Hello World!'
[x] Sent 'test.error':'[test.error] Hello World!'

receive_logs_topic.py

1
2
3
[*] Waiting for logs. To exit press CTRL+C
[x] 'disk.error':b'[disk.info] Hello World!'
[x] 'disk.warning':b'[disk.warning] Hello World!'

可以看到,test.error的日志并没有被消费者拿到。