版本信息:
JDK:8
SpringBoot 2.1.3.RELEASE
RabbitMQ消费端配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
listener:
simple:
# acknowledge-mode: manual # 手动确定(默认自动确认)
concurrency: 1 # 消费端的监听个数(即@RabbitListener开启几个线程去处理数据。)
max-concurrency: 10 # 消费端的监听最大个数
prefetch: 10
connection-timeout: 15000 # 超时时间
在消费端,配置prefetch
和concurrency
参数便可以实现消费端 MQ 并发处理消息,那么这两个参数到底有什么含义?
1、prefetch
每个 customer 会在 MQ 预取一些消息放入内存的 LinkedBlockingQueue 中进行消费,这个值越高,消息传递的越快,但非顺序处理消息的风险更高。如果 ack 模式为 none,则忽略。如有必要,将增加此值以匹配 txSize 或 messagePerAck。从2.0开始默认为250;设置为1将还原为以前的行为。
prefetch
默认值以前是1,这可能会导致高效使用者的利用率不足。从spring-amqp 2.0版开始,默认的prefetch
值是250,这将使消费者在大多数常见场景中保持忙碌,从而提高吞吐量。
不过在有些情况下,尤其是处理速度比较慢的大消息,消息可能在内存中大量堆积,消耗大量内存;以及对于一些严格要求顺序的消息,prefetch
的值应当设置为1。
对于低容量消息和多个消费者的情况(也包括单 listener 容器的 concurrency 配置)希望在多个使用者之间实现更均匀的消息分布,建议在手动ack下并设置prefetch=1
。
模拟:
生产者每次生产10条消息:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirms: true
publisher-returns: true
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
//直接向队列中发送数据
@GetMapping("send")
public String send() {
for (int i = 0; i < 10; i++) {
String content = "Date:" + System.currentTimeMillis();
content = content + ":::" + i;
rabbitTemplate.convertAndSend("kinson", content);
}
return "success";
}
}
2、concurrency
上面配置中,concurrency =1
,即每个Listener
容器将开启一个线程去处理消息。
在2.0版本后,可以在注解中配置该参数:
@Component
@Slf4j
public class CustomerRev {
//会覆盖配置文件中的参数。
@RabbitListener(queues = {"kinson"},concurrency = "2")
public void receiver(Message msg, Channel channel) throws InterruptedException {
// Thread.sleep(10000);
byte[] messageBytes = msg.getBody();
if (messageBytes != null && messageBytes.length > 0) {
//打印数据
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("【消3】:{}", message);
}
}
}
启动服务:
即该 Listener 容器产生了两个线程去消费 queue。如果在 Listener 配置了exclusive
参数,即确定此容器中的单个 Customer 是否具有对队列的独占访问权限。如果为 true,则容器的并发性必须为1。
3、 prefetch和concurrency
若一个消费者配置 prefetch=10,concurrency=2,即会开启2个线程去消费消息,每个线程都会抓取10个消息到内存中(注意不是两个线程去共享内存中抓取的消息)。
下图所示,当10个消息进入 MQ 后,两个线程都抓取了5个线程放入了自己的 LinkedBlockingQueue 进行消费。
4、总结
前面说过,设置并发的时候,要考虑具体的业务场景,对那种对消息的顺序有苛刻要求的场景不适合并发消费,而对于其他场景,比如用户注册后给用户发个提示短信,是不太在意哪个消息先被消费,哪个消息后被消费,因为每个消息是相对独立的,后注册的用户先收到短信也并没有太大影响。
设置并发消费除了能提高消费的速度,还有另外一个好处:当某个消费者长期阻塞,此时在当前消费者内部的BlockingQueue的消息也会被一直阻塞,但是新来的消息仍然可以投递给其他消费者消费,这种情况顶多会导致prefetch个数目的消息消费有问题,而不至于单消费者情况下整个 RabbitMQ 的队列会因为一个消息有问题而全部堵死。所有在合适的业务场景下,需要合理设置 concurrency 和 prefetch 值。
原文地址:https://www.jianshu.com/p/5f97c9ca3cee
如果觉得还有帮助的话,你的关注和转发是对我最大的支持,O(∩_∩)O