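Using RabbitMQ in Spring Boot

This article shows how to pass objects, serialized as JSON, through RabbitMQ in a Spring Boot application.

Add the dependency

Add the following dependency to the project: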

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Add a configuration class

Create a config package in the project and add a configuration class named RabbitConfig to it, with the following code:

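import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    // Connection to a local broker using the default guest account.
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost", 5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // Without these two settings the confirm and return callbacks in Sender never fire.
        // setPublisherConfirmType assumes Spring AMQP 2.2 or later.
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setPublisherReturns(true);
        return connectionFactory;
    }

    // Serializes message payloads to JSON and back.
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    // Container factory used by @RabbitListener methods: 3 to 10 concurrent consumers per listener.
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setMessageConverter(jsonMessageConverter());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        return factory;
    }

    // Declare the queues (durable).
    @Bean
    public Queue queue1() {
        return new Queue("hello.queue1", true);
    }

    @Bean
    public Queue queue2() {
        return new Queue("hello.queue2", true);
    }

    // Declare the exchange.
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange("topicExchange");
    }

    // Bind the queues to the exchange.
    @Bean
    public Binding binding1() {
        return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");
    }

    @Bean
    public Binding binding2() {
        return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#");
    }
}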
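
The two bindings use the patterns key.1 and key.#. On a topic exchange, a routing key is split into dot-separated words; * stands for exactly one word and # for zero or more words. The sketch below is a plain-Java illustration of that matching rule, not the broker's implementation; the class name TopicPatternSketch and its methods are made up for this example. It suggests why a message published with key.1 reaches both hello.queue1 and hello.queue2.

```java
public class TopicPatternSketch {

    // Returns true if routingKey matches the binding pattern under topic-exchange rules:
    // '*' stands for exactly one word, '#' for zero or more words.
    public static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' either absorbs nothing, or absorbs one word and stays in play.
            return matches(p, i + 1, k, j) || (j < k.length && matches(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false;
        }
        return (p[i].equals("*") || p[i].equals(k[j])) && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        String[] bindings = {"key.1", "key.#"};
        String[] keys = {"key.1", "key.2", "key.2.extra", "other"};
        for (String key : keys) {
            for (String binding : bindings) {
                System.out.println(key + " vs " + binding + " -> " + matches(binding, key));
            }
        }
    }
}
```

Under these rules key.2 would match only the key.# binding, so it would be delivered to hello.queue2 alone.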

Define the entity class to transfer

Add an entity class named Todo:

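import lombok.Data;

// Lombok's @Data generates the getters, setters, equals, hashCode and toString.
@Data
public class Todo {

    private Integer id;

    private String title;

    private String desc;

}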
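
Todo depends on Lombok for its accessors, which the JSON converter needs in order to read and write the fields. For a project without Lombok, a hand-written class along the following lines should behave roughly the same. This is a sketch, not what Lombok literally generates, and the name TodoPlain is made up for the example.

```java
import java.util.Objects;

// Hypothetical hand-written counterpart of the Lombok-annotated Todo class.
public class TodoPlain {

    private Integer id;
    private String title;
    private String desc;

    // Jackson uses a no-argument constructor when deserializing the JSON payload.
    public TodoPlain() {
    }

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getTitle() { return title; }
    public void setTitle(String title) { this.title = title; }

    public String getDesc() { return desc; }
    public void setDesc(String desc) { this.desc = desc; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TodoPlain)) return false;
        TodoPlain other = (TodoPlain) o;
        return Objects.equals(id, other.id)
                && Objects.equals(title, other.title)
                && Objects.equals(desc, other.desc);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, title, desc);
    }

    @Override
    public String toString() {
        return "TodoPlain(id=" + id + ", title=" + title + ", desc=" + desc + ")";
    }

    public static void main(String[] args) {
        TodoPlain todo = new TodoPlain();
        todo.setId(0);
        todo.setTitle("Todo: 0");
        todo.setDesc("Desc for 0");
        System.out.println(todo);
    }
}
```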

Add the message producer

Add a class named Sender:

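import java.util.UUID;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Sender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MessageConverter messageConverter;

    // Register this bean as the confirm and return callback, and use the JSON converter.
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
        // Unroutable messages are returned to the publisher only when mandatory is set.
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setMessageConverter(messageConverter);
    }

    // Called when the broker acknowledges (or rejects) a published message.
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            LOG.debug("Message sent successfully: {}", correlationData);
        } else {
            LOG.debug("Message sending failed: {}", cause);
        }
    }

    // Called when a message could not be routed to any queue.
    // The correlation id may be null, so log the message itself instead of dereferencing it.
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        LOG.debug("Message returned: {} (replyCode={}, replyText={}, exchange={}, routingKey={})",
                message, replyCode, replyText, exchange, routingKey);
    }

    // Sends a message. Not part of either callback interface; intended to be called by other classes.
    public void send(Todo todo) {

        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        LOG.debug("Start sending message: {}", todo);

        // Blocks until a listener replies or the reply timeout elapses (null on timeout).
        Object response = rabbitTemplate.convertSendAndReceive("topicExchange", "key.1", todo, correlationData);

        LOG.debug("Finished sending message: {}", todo);
        LOG.debug("Consumer response: {}", response);
    }

}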
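
The confirm callback receives only the correlation data and the ack flag, not the message that was sent. If the sender needs to know which payload a negative confirm refers to (to retry or log it, for instance), one option is to remember payloads by correlation id before publishing. The class below is a minimal plain-Java sketch of that bookkeeping; PendingConfirmsSketch and its methods are hypothetical names, and wiring it into Sender (registering in send, resolving in confirm with correlationData.getId()) is left as an assumption.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: tracks published payloads until the broker confirms them.
public class PendingConfirmsSketch {

    // Correlation id -> payload awaiting confirmation. Thread-safe because
    // confirms arrive on a different thread than the one that publishes.
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    // Call just before publishing.
    public void register(String correlationId, String payload) {
        pending.put(correlationId, payload);
    }

    // Call from the confirm callback. The entry is always removed;
    // on a negative confirm the payload is handed back so the caller can retry or log it.
    public Optional<String> onConfirm(String correlationId, boolean ack) {
        String payload = pending.remove(correlationId);
        return ack ? Optional.empty() : Optional.ofNullable(payload);
    }

    public int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingConfirmsSketch tracker = new PendingConfirmsSketch();
        tracker.register("id-1", "{\"id\":0}");
        tracker.register("id-2", "{\"id\":1}");
        System.out.println("after ack:  " + tracker.onConfirm("id-1", true));
        System.out.println("after nack: " + tracker.onConfirm("id-2", false));
        System.out.println("still pending: " + tracker.size());
    }
}
```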

Add the message listener

Add a class named Receiver:

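import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

// JSON conversion is configured on the listener container factory in RabbitConfig,
// so no converter needs to be injected here.
@Component
public class Receiver {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    // Consumes hello.queue1; the return value is sent back as the reply to convertSendAndReceive.
    @RabbitListener(queues = "hello.queue1")
    public Todo processMessage1(Todo msg) {
        LOG.debug("{} received a message from queue hello.queue1: {}", Thread.currentThread().getName(), msg);
        return msg;
    }

    // Consumes hello.queue2; a void method sends no reply.
    @RabbitListener(queues = "hello.queue2")
    public void processMessage2(Todo msg) {
        LOG.debug("{} received a message from queue hello.queue2: {}", Thread.currentThread().getName(), msg);
    }

}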

Build a test class

Create a test class that sends a message through Sender:

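import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class SpringbootRabbitmqBasicApplicationTests {

    @Autowired
    private Sender sender;

    @Test
    public void sendTest() throws Exception {

        // Sends a single Todo; raise the loop bound to send more.
        for (int i = 0; i < 1; i++) {
            Todo todo = new Todo();
            todo.setId(i);
            todo.setTitle("Todo: " + i);
            todo.setDesc("Desc for " + i);
            sender.send(todo);
            // sleep is a static method; pause so the listeners can log before the context shuts down.
            Thread.sleep(1000);
        }
    }

}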
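
The test pauses for a fixed second so that the asynchronous listeners have time to run. A fixed sleep is either too long or, on a slow machine, too short; a common alternative is to wait on a CountDownLatch that the listener counts down. The sketch below shows only the waiting pattern in plain Java: a background thread stands in for the listener, and LatchWaitSketch and runAndAwait are hypothetical names. Connecting the latch to the real Receiver is not shown.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchWaitSketch {

    // Starts a stand-in "listener" thread that handles `expected` messages, then waits
    // until all of them are handled or the timeout elapses. Returns true if all arrived in time.
    public static boolean runAndAwait(int expected, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(expected);
        Thread listener = new Thread(() -> {
            for (int i = 0; i < expected; i++) {
                // In a real test, the listener method would call countDown() per message.
                latch.countDown();
            }
        });
        listener.start();
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that the wait did not complete.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("all messages handled in time: " + runAndAwait(3, 2000));
    }
}
```

The wait returns as soon as the last message is handled, so the timeout only bounds the failure case.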

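Article title: Using RabbitMQ in Spring Boot

Author: Morning Star

Published: December 27, 2019 - 20:12

Last updated: April 16, 2021 - 15:04

Original link: https://www.mls-tech.info/java/springboot-use-rabbitmq/

License: Attribution-NonCommercial-NoDerivatives 4.0 International. Please retain the original link and author when reposting.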