文章摘要
GPT 4
此内容根据文章生成,仅用于文章内容的解释与总结
投诉

1. 消息可靠性

1.1 生产者确认机制

1.2 消息持久化(默认就是持久化,了解代码)

1.3 消费者消息确认

1.3.1 none模式

1.3.2 auto模式

1.4 消费者重试机制

1.4.1 本地重试(默认的RejectAndDontRequeueRecoverer策略)

基于1.3.2的auto模式,发送失败的消息会重新放入队列queue中,导致mq反复处理失败的消息,带来过多的压力。因此通过本地重试的方式,将失败的队列进行retry处理(spring),而不是反复放入队列。

在消费者的application.yml文件中配置启动失败重试开关、以及重复的次数:

1.4.2 三个失败策略(RepublishMessageRecoverer实例)

在springAMQP中的失败策略有三个,1.4.1中是默认的处理方式。对于废弃的消息,如果不希望消息的丢失,可以采用RepublishMessageRecoverer策略。流程图:

消费者中绑定error交换机后(重写失败策略实现),由交换机根据rountingkey(例子中key=error)转发到error.queue队列。

对于MessageRecoverer接口,在1.4.1中的重试机制是默认的,因此接口对应1.4.1也是默认的(不需要声明)。但是使用RepublishMessage -Recoverer方式实现重试机制需要覆盖MessageRecoverer接口。

声明在消费者模块中的config类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Configuration
public class ErrorMessageConfig {

//声明交换机:当重试机制触发上限时,消费者自动递交失败消息的交换机errorExchange
@Bean
public DirectExchange errorMessageExchange(){
return new DirectExchange("error.direct");
}

//声明队列:绑定errorExchange的队列errorQueue
@Bean
public Queue errorQueue(){
return new Queue("error.queue");
}

//绑定errorExchange交换机和errorQueue队列
@Bean
public Binding errorMessageBinding(){
return BindingBuilder.bind(errorQueue()).to(errorMessageExchange()).with("error");
}

//声明MessageRecoverer接口,覆盖默认RejectAndDontRequeueRecoverer方式
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}
}

2. 死信交换机

2.1 成为死信的条件

成为死信的前提消费者没有重试机制,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 声明普通的 simple.queue队列,并且为其指定死信交换机:dl.direct
@Bean
public Queue simpleQueue2(){
return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
.deadLetterExchange("dl.direct") // 指定死信交换机
.build();
}
// 声明死信交换机 dl.direct
@Bean
public DirectExchange dlExchange(){
return new DirectExchange("dl.direct", true, false);
}
// 声明存储死信的队列 dl.queue
@Bean
public Queue dlQueue(){
return new Queue("dl.queue", true);
}
// 将死信队列 与 死信交换机绑定
@Bean
public Binding dlBinding(){
return BindingBuilder.bind(dlQueue()).to(dlExchange()).with("simple");
}

2.2 结合TTL和死信交换机实现延时发送消息

2.2.1 & 2.2.2 声明死信交换机和指定TTL的队列

2.2.3 消息指定TTL

2.3 DelayExchange(mq插件)

插件用来改进2.2方式,但是我觉得特别麻烦我选择用2.2。另外官方推荐版本3.9.0适配当前docker中装的mq。另外,在docker创建mq的容器时需要指定数据卷,不然把容器删了重新创建,此时注意修改账号密码,不然登不进mq客户端(最后给创建容器命令)。需要将插件拖进去(使用finalshell)

1
2
3
4
5
6
7
8
9
10
docker run \
-e RABBITMQ_DEFAULT_USER=root\
-e RABBITMQ_DEFAULT_PASS=root \
-v mq-plugins:/plugins \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management

3. 惰性队列

3.1 消息堆积问题

3.2 惰性队列