前面一篇实现了一个非常基础的日志系统,交换机将所有接收到的消息广播到它所知道的多个接受者。
这一节我们更进一步,实现订阅部分消息的功能。比如我只讲那些ERROR级别的消息写入日志文件,
同时将所有日志打印到控制台上面。
绑定
在前面的例子我们已经创建了一个绑定:
1 2
| channel.queue_bind(exchange=exchange_name, queue=queue_name)
|
简单讲就是这个队列对这个交换机上面的消息很感兴趣。
绑定需要指定一个关键字参数叫routing_key
,为了不跟basic_publish
搞混,
我现在将其称为binding key
,下面我们创建一个带key的绑定:
1 2 3
| channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key='black')
|
绑定key的含义根据不同类型的交换机而不同,对于我前面使用的fanout类型的交换机,会忽略这个参数。
Direct exchange
我现在需要根据日志级别来决定是否将消息写入文件,我只讲那些严重错误的日志写入文件,以节省磁盘空间。
那么我就不能再使用前面的fanout类型的交换机了,因为它只管广播消息到所有队列,其他不管。
我们先使用direct
交换机,它的规则很简单,就是binding key
完全一致就发送。
比如有上图这样的绑定关系,x交换机跟两个队列绑定,第一个队列使用orange
的绑定关键字,
第二个队列使用了2个绑定关键字black
和green
。
这种情况下,如果某个消息发布到该交换机时候带的routing key
值为orange
的时候,这个消息会被路由到第一个队列,
如果某个消息发布到该交换机时候带的routing key
值为black
或green
的时候,这个消息会被路由到第二个队列。
其他情况的消息都会被丢弃掉。
同时还能支持交换机使用同一个binding key
来和多个队列绑定,
这时候如果某个消息的routing key
一致,那么这个消息会同时发送给这几个队列。
最终代码
交换机和队列绑定图:
日志消息生产者代码:
emit_log_direct.py1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import pika import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.217.161', port=5673)) channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', type='direct')
severity = 'info' message = '[INFO] Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message))
severity = 'error' message = '[ERROR] Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message))
severity = 'warning' message = '[WARNING] Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message))
connection.close()
|
我发送了三条消息,级别分别为info
, error
和 warning
。
日志消费者:
receive_logs_direct.py1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import pika import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.217.161', port=5673)) channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', type='direct')
result = channel.queue_declare(exclusive=True) queue_name = result.method.queue
severities = ['warning', 'error']
for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
|
我创建了一个临时队列,并将其和一个交换机绑定,并且指定binding key
为error
和warning
。
运行效果:
日志生产者:
1 2 3
| [x] Sent 'info':'[INFO] Hello World!' [x] Sent 'error':'[ERROR] Hello World!' [x] Sent 'warning':'[WARNING] Hello World!'
|
日志消费者:
1 2 3
| [*] Waiting for logs. To exit press CTRL+C [x] 'error':b'[ERROR] Hello World!' [x] 'warning':b'[WARNING] Hello World!'
|
很明显,只有error
和warning
级别的日志被消费了。