本文演示如何在 Spring Boot 应用中通过 RabbitMQ 传递对象(JSON格式)。
添加依赖库 在项目中添加依赖项:
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency >
添加配置类 在项目中新建 config 包(package), 在包中新增名为: RabbitConfig 的配置类,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 @Configuration public class RabbitConfig { @Bean public ConnectionFactory connectionFactory () { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost" , 5672 ); connectionFactory.setUsername("guest" ); connectionFactory.setPassword("guest" ); return connectionFactory; } @Bean public MessageConverter jsonMessageConverter () { return new Jackson2JsonMessageConverter(); } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory () { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(3 ); factory.setMaxConcurrentConsumers(10 ); return factory; } @Bean public Queue queue1 () { return new Queue("hello.queue1" , true ); } @Bean public Queue queue2 () { return new Queue("hello.queue2" , true ); } @Bean TopicExchange topicExchange () { return new TopicExchange("topicExchange" ); } @Bean public Binding binding1 () { return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1" ); } @Bean public Binding binding2 () { return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#" ); } }
定义要传输的实体类 新增名为 Todo 的实体类:
1 2 3 4 5 6 7 8 9 10 @Data public class Todo { private Integer id; private String title; private String desc; }
添加消息生产者 新增名为: Sender 的类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @Component public class Sender implements RabbitTemplate .ConfirmCallback , RabbitTemplate .ReturnCallback { private static final Logger LOG = LoggerFactory.getLogger(Sender.class ) ; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private MessageConverter messageConverter; @PostConstruct public void init () { rabbitTemplate.setConfirmCallback(this ); rabbitTemplate.setReturnCallback(this ); rabbitTemplate.setMessageConverter(messageConverter); } @Override public void confirm (CorrelationData correlationData, boolean ack, String cause) { if (ack) { LOG.debug("消息发送成功: {}" , correlationData); } else { LOG.debug("消息发送失败: {}" , cause); } } @Override public void returnedMessage (Message message, int replyCode, String replyText, String exchange, String routingKey) { LOG.debug("{} 发送失败" , message.getMessageProperties().getCorrelationId().toString()); } public void send (Todo todo) { CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString()); LOG.debug("开始发送消息 : {}" , todo); rabbitTemplate.convertSendAndReceive("topicExchange" , "key.1" , todo, correlationId); LOG.debug("结束发送消息 : {}" , todo); LOG.debug("消费者响应 : {} 消息处理完成" ); } }
添加消息监听者 新增名为: Receiver 的类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 @Component public class Receiver { private static final Logger LOG = LoggerFactory.getLogger(Receiver.class ) ; @Autowired private MessageConverter messageConverter; @PostConstruct public void init () { } @RabbitListener (queues = "hello.queue1" ) public Todo processMessage1 (Todo msg) { LOG.debug("{} 接收到来自hello.queue1队列的消息:{}" , Thread.currentThread().getName(), msg); return msg; } @RabbitListener (queues = "hello.queue2" ) public void processMessage2 (Todo msg) { LOG.debug("{} 接收到来自hello.queue2队列的消息:{} " , Thread.currentThread().getName(), msg); } }
构建测试类 构建测试类进行测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @SpringBootTest public class SpringbootRabbitmqBasicApplicationTests { @Autowired private Sender sender; @Test public void sendTest () throws Exception { for (int i = 0 ; i < 1 ; i++) { Todo todo = new Todo(); todo.setId(i); todo.setTitle("Todo: " + i); todo.setDesc("Desc for " + i); sender.send(todo); Thread.currentThread().sleep(1000 ); } } }