RabbitMQ 即一个消息队列,主要是用来实现应用程序的异步和解耦,同时也能起到消息缓冲,消息分发的作用。
RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,AMQP,即Advanced Message Queuing Protocol,
高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,
提高了系统的吞吐量。
本篇将详细介绍RabbitMQ以及如何在SpringBoot中使用。
简单概念
关于消息中间件RabbitMQ的详细介绍可以参考我的博客系列教程:RabbitMQ简易教程
这里我只对一些最核心的概念拿出来讲一下,不理解这些就根本无法理解下面的代码。
几个名词术语:
- Broker - 简单来说就是消息队列服务器的实体。
- Exchange - 消息路由器,转发消息到绑定的队列上,指定消息按什么规则,路由到哪个队列。
- Queue - 消息队列,用来存储消息,每个消息都会被投入到一个或多个队列。
- Binding - 绑定,它的作用就是把 Exchange 和 Queue 按照路由规则绑定起来。
- RoutingKey - 路由关键字,Exchange 根据这个关键字进行消息投递。
- Producter - 消息生产者,产生消息的程序。
- Consumer - 消息消费者,接收消息的程序。
- Channel - 消息通道,在客户端的每个连接里可建立多个Channel,每个channel代表一个会话。
一般消息队列都是生产者将消息发送到队列,消费者监听队列进行消费。rabbitmq中一个虚拟主机(默认 /)持有一个或者多个交换机(Exchange)。
用户只能在虚拟主机的粒度进行权限控制,交换机根据一定的策略(RoutingKey)绑定(Binding)到队列(Queue)上,
这样生产者和队列就没有直接联系,而是将消息发送的交换机,交换机再把消息转发到对应绑定的队列上。
上面说了交换机(Exchange)作为rabbitmq的一个独特的重要的概念,单独拿出来讲一下。我们用到的最常见的是4中类型:
- Direct: 先匹配, 再投送。即在绑定时设定一个routing_key, 消息的routing_key匹配时, 才会被交换器投送到绑定的队列中去.
交换机跟队列必须是精确的对应关系,这种最为简单。
- Topic: 转发消息主要是根据通配符。在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息
这种可以认为是Direct 的灵活版
- Headers: 也是根据规则匹配, 相较于 direct 和 topic 固定地使用 routingkey , headers则是一个自定义匹配规则的类型,
在队列与交换器绑定时会设定一组键值对规则,消息中也包括一组键值对( headers属性),当这些键值对有一对或全部匹配时,消息被投送到对应队列
- Fanout : 消息广播模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routingkey会被忽略
理解交换机如何投送消息的原理后,后面的就比较简单了。
Ack机制
在RabbitMQ的消息队列编程中,有个非常重要的概率叫消息确认机制,也就是Ack机制,这里我需要单独讲一下。
每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了或者异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了。
因为我们采用no-ack的方式进行确认,也就是说,每次Consumer接到数据后,而不管是否处理完成,RabbitMQ
Server会立即把这个Message标记为完成,然后从queue中删除了。
如果一个Consumer异常退出了,它处理的数据能够被另外的Consumer处理,这样数据在这种情况下就不会丢失了(注意是这种情况下)。
为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack。而应该是在处理完数据后发送ack。
在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了。
如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer。这样就保证了在Consumer异常退出的情况下数据也不会丢失。
这里并没有用到超时机制。RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。
这样即使你通过Ctr-C中断了Recieve.cs,那么Message也不会丢失了,它会被分发到下一个Consumer。
如果忘记了ack,那么后果很严重。当Consumer退出时,Message会重新分发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,因此这个”
内存泄漏”是致命的。
下面的例子中我会使用手动Ack模式。
环境准备
RabbitMQ需要安装erlang和RabbitMQ Server,
安装教程请参考:RabbitMQ简易教程 - 安装
请先按照教程安装好RabbitMQ后,添加一个用户spring/123456,后面配置要用到。
SpringBoot中集成
接下来正式开始讲解如何在SpringBoot中集成了。
maven依赖
第一版当然是先添加maven依赖了,有官方现成的starter依赖,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.6</version> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <exclusions> <exclusion> <groupId>com.vaadin.external.google</groupId> <artifactId>android-json</artifactId> </exclusion> </exclusions> </dependency> </dependencies>
|
配置文件
在application.yml中添加rabbitmq相关配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| spring: rabbitmq: host: 127.0.0.1 port: 5672 username: spring password: 123456 publisher-confirms: true publisher-returns: true listener: simple: acknowledge-mode: manual concurrency: 1 max-concurrency: 1 retry: enabled: true
|
配置类
最核心的类就是RabbitMQ的配置类,在里面定制模版类、声明交换机、队列、绑定交换机到队列。
这里我声明了一个Direct类型的交换机,并通过路由键绑定到一个队列中来测试Direct模式,
另外还声明了Fanout类型的交换机,并绑定到2个队列来测试广播模式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
| @Configuration public class RabbitConfig { @Resource private RabbitTemplate rabbitTemplate;
@Bean public AmqpTemplate amqpTemplate() { Logger log = LoggerFactory.getLogger(RabbitTemplate.class); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setEncoding("UTF-8"); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationIdString(); log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey); }); rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.debug("消息发送到exchange成功,id: {}", correlationData.getId()); } else { log.debug("消息发送到exchange失败,原因: {}", cause); } }); return rabbitTemplate; }
@Bean("directExchange") public Exchange directExchange() { return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build(); }
@Bean("directQueue") public Queue directQueue() { return QueueBuilder.durable("DIRECT_QUEUE").build(); }
@Bean public Binding directBinding(@Qualifier("directQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs(); }
@Bean("fanoutExchange") public FanoutExchange fanoutExchange() { return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build(); }
@Bean("fanoutQueueA") public Queue fanoutQueueA() { return QueueBuilder.durable("FANOUT_QUEUE_A").build(); }
@Bean("fanoutQueueB") public Queue fanoutQueueB() { return QueueBuilder.durable("FANOUT_QUEUE_B").build(); }
@Bean public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue).to(fanoutExchange); }
@Bean public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue).to(fanoutExchange); } }
|
监听器
接下来编写消息队列的监听器类,监听队列消息并做相应的处理,并通过Ack机制确认处理完成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
|
@Component public class Receiver { private static final Logger log = LoggerFactory.getLogger(Receiver.class);
@RabbitListener(queues = {"FANOUT_QUEUE_A"}) public void on(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.debug("FANOUT_QUEUE_A " + new String(message.getBody())); }
@RabbitListener(queues = {"FANOUT_QUEUE_B"}) public void t(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.debug("FANOUT_QUEUE_B " + new String(message.getBody())); }
@RabbitListener(queues = {"DIRECT_QUEUE"}) public void message(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.debug("DIRECT " + new String(message.getBody())); } }
|
消息发送者
再写一个消息发送服务,用来向交换机发送消息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
@Service public class SenderService { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Resource private RabbitTemplate rabbitTemplate;
public void broadcast(String p) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData); }
public void direct(String p) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY", p, correlationData); }
}
|
运行测试
代码写完后当然要写个测试用例测一下嘛。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
|
@RunWith(SpringRunner.class) @SpringBootTest(classes = Application.class) public class SenderServiceTest { @Autowired private SenderService senderService;
@Test public void testCache() { senderService.broadcast("同学们集合啦!"); senderService.direct("定点消息"); } }
|
运行结果:
1 2 3
| [cTaskExecutor-1] com.xncoding.pos.mq.Receiver : FANOUT_QUEUE_A "同学们集合啦!" [cTaskExecutor-1] com.xncoding.pos.mq.Receiver : DIRECT "定点消息" [cTaskExecutor-1] com.xncoding.pos.mq.Receiver : FANOUT_QUEUE_B "同学们集合啦!"
|
可以看到,广播消息被两个队列收到了,Direct消息只有一个收到。完美!
GitHub源码
springboot-rabbitmq