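Spring Boot application series
Preface
This article shows how to send and receive messages with RabbitMQ in Spring Boot.
Prepare a RabbitMQ cluster
See this article for details.
Create a new project
Configuration properties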
#http://segmentfault.com/a/1190000004309900
spring.rabbitmq.host=192.168.99.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
logging.level.org.springframework.amqp=ERROR
logging.level.com.demo=INFO
#spring.rabbitmq.dynamic  Whether to create an AmqpAdmin bean. Default: true
#spring.rabbitmq.listener.acknowledge-mode  Acknowledge mode.
#spring.rabbitmq.listener.auto-startup  Whether to start the listener on startup. Default: true
#spring.rabbitmq.listener.concurrency  Minimum number of consumers.
#spring.rabbitmq.listener.max-concurrency  Maximum number of consumers.
#spring.rabbitmq.listener.prefetch  Number of messages that can be handled in a single request; when transactions are used, it must be greater than or equal to the transaction size.
#spring.rabbitmq.listener.transaction-size  Number of messages processed in one transaction; preferably less than or equal to the prefetch value.
#spring.rabbitmq.requested-heartbeat  Requested heartbeat timeout; 0 means none.
#spring.rabbitmq.ssl.enabled  Whether to enable SSL. Default: false
#spring.rabbitmq.ssl.key-store  Path to the key store that holds the SSL certificate.
#spring.rabbitmq.ssl.key-store-password  Password used to access the key store.
#spring.rabbitmq.ssl.trust-store  Trust store that holds the SSL certificates.
#spring.rabbitmq.ssl.trust-store-password  Password used to access the trust store.
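The comments on prefetch and transaction-size describe a relationship between the two values that is easy to get wrong. As a rough illustration only, the following sketch loads the two keys with java.util.Properties and checks that prefetch is not smaller than transaction-size. The class ListenerSettingsCheck and its methods are hypothetical helpers written for this example; they are not part of Spring Boot, which does not necessarily perform this check itself.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper (not a Spring Boot class): checks the rule of thumb
// from the property comments, i.e. prefetch >= transaction-size.
final class ListenerSettingsCheck {

    static final String PREFETCH = "spring.rabbitmq.listener.prefetch";
    static final String TX_SIZE = "spring.rabbitmq.listener.transaction-size";

    // Parses properties-file text into a Properties object.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns true when either value is absent or prefetch >= transaction-size.
    static boolean isConsistent(Properties props) {
        String prefetch = props.getProperty(PREFETCH);
        String txSize = props.getProperty(TX_SIZE);
        if (prefetch == null || txSize == null) {
            return true; // nothing to compare
        }
        return Integer.parseInt(prefetch.trim()) >= Integer.parseInt(txSize.trim());
    }

    public static void main(String[] args) {
        Properties ok = parse(PREFETCH + "=5\n" + TX_SIZE + "=5\n");
        Properties bad = parse(PREFETCH + "=1\n" + TX_SIZE + "=5\n");
        System.out.println(isConsistent(ok));  // true: 5 >= 5
        System.out.println(isConsistent(bad)); // false: 1 < 5
    }
}
```

A check like this could run once at startup so that a mismatched pair of values is reported early instead of showing up as unexpected consumer behavior.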
Producer configuration
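import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;

@Configuration
public class ProducerConfig {

    @Bean
    RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }

    // Durable queue for Foo messages, declared on the broker explicitly.
    @Bean
    Queue queueFoo(RabbitAdmin rabbitAdmin) {
        Queue queue = new Queue("queue.foo", true);
        rabbitAdmin.declareQueue(queue);
        return queue;
    }

    // Durable queue for Bar messages.
    @Bean
    Queue queueBar(RabbitAdmin rabbitAdmin) {
        Queue queue = new Queue("queue.bar", true);
        rabbitAdmin.declareQueue(queue);
        return queue;
    }

    @Bean
    TopicExchange exchange(RabbitAdmin rabbitAdmin) {
        TopicExchange topicExchange = new TopicExchange("exchange");
        rabbitAdmin.declareExchange(topicExchange);
        return topicExchange;
    }

    // Bind each queue to the exchange, using the queue name as the routing key.
    @Bean
    Binding bindingExchangeFoo(Queue queueFoo, TopicExchange exchange, RabbitAdmin rabbitAdmin) {
        Binding binding = BindingBuilder.bind(queueFoo).to(exchange).with("queue.foo");
        rabbitAdmin.declareBinding(binding);
        return binding;
    }

    @Bean
    Binding bindingExchangeBar(Queue queueBar, TopicExchange exchange, RabbitAdmin rabbitAdmin) {
        Binding binding = BindingBuilder.bind(queueBar).to(exchange).with("queue.bar");
        rabbitAdmin.declareBinding(binding);
        return binding;
    }

    /**
     * Used by the producer.
     */
    @Bean
    public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {
        RabbitMessagingTemplate rabbitMessagingTemplate = new RabbitMessagingTemplate();
        rabbitMessagingTemplate.setMessageConverter(jackson2Converter());
        rabbitMessagingTemplate.setRabbitTemplate(rabbitTemplate);
        return rabbitMessagingTemplate;
    }

    // Converts payloads to and from JSON.
    @Bean
    public MappingJackson2MessageConverter jackson2Converter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        return converter;
    }
}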
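The producer configuration binds both queues to a topic exchange using the literal keys "queue.foo" and "queue.bar". To illustrate how a topic exchange compares a binding pattern with a routing key, here is a minimal stand-alone sketch of the matching rule ('*' stands for exactly one word, '#' for zero or more words, words separated by dots). TopicMatcher is a hypothetical class written for this example; it is a simplified model and not RabbitMQ's actual implementation.

```java
// Hypothetical, simplified model of AMQP topic matching (not RabbitMQ code).
final class TopicMatcher {

    // Returns true if routingKey is matched by bindingPattern.
    static boolean matches(String bindingPattern, String routingKey) {
        String[] pattern = bindingPattern.isEmpty() ? new String[0] : bindingPattern.split("\\.", -1);
        String[] key = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(pattern, 0, key, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' absorbs zero or more words: try every possible continuation.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("queue.foo", "queue.foo")); // true
        System.out.println(matches("queue.foo", "queue.bar")); // false
        System.out.println(matches("queue.*", "queue.bar"));   // true
        System.out.println(matches("queue.#", "queue"));       // true
    }
}
```

Because the bindings in this article contain no wildcards, the topic exchange behaves like a direct exchange here: a message reaches a queue only when its routing key equals the binding key exactly.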
Consumer configuration
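import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

@Configuration
@EnableRabbit
public class ConsumerConfig implements RabbitListenerConfigurer {

    @Autowired
    ReceiverService receiverService;

    // Lets @RabbitListener methods receive JSON payloads as typed objects.
    @Bean
    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(new MappingJackson2MessageConverter());
        return factory;
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // factory.setPrefetchCount(5);
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
    }
}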
Sending and receiving messages
Sender service
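import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SenderService {

    @Autowired
    private RabbitMessagingTemplate rabbitMessagingTemplate;

    // Publishes to the exchange named "exchange" with routing key "queue.foo".
    public void sendFoo2Rabbitmq(final Foo foo) {
        this.rabbitMessagingTemplate.convertAndSend("exchange", "queue.foo", foo);
    }

    // Publishes to the exchange named "exchange" with routing key "queue.bar".
    public void sendBar2Rabbitmq(final Bar bar) {
        this.rabbitMessagingTemplate.convertAndSend("exchange", "queue.bar", bar);
    }
}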
Invocation
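import java.util.Random;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan(basePackages = "com.demo")
public class RabbitmqdemoApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqdemoApplication.class, args);
    }

    @Autowired
    SenderService senderService;

    @Override
    public void run(String... strings) throws Exception {
        Random random = new Random();
        // Sends messages in an endless loop; stop the application to end the demo.
        while (true) {
            senderService.sendBar2Rabbitmq(new Bar(random.nextInt()));
            senderService.sendFoo2Rabbitmq(new Foo(UUID.randomUUID().toString()));
        }
    }
}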
Receiving
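import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class ReceiverService {

    // Consumes Foo messages from queue.foo.
    @RabbitListener(queues = "queue.foo")
    public void receiveFooQueue(Foo foo) {
        System.out.println("Received Foo<" + foo.getName() + ">");
    }

    // Consumes Bar messages from queue.bar.
    @RabbitListener(queues = "queue.bar")
    public void receiveBarQueue(Bar bar) {
        System.out.println("Received Bar<" + bar.getAge() + ">");
    }
}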
Check the output
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.3.2.RELEASE)
2016-02-02 02:26:07.099 INFO 2185 --- [ main] com.demo.RabbitmqdemoApplication : Starting RabbitmqdemoApplication on Jupiter.local with PID 2185 (/Users/codecraft/workspace/rabbitmqdemo/target/classes started by codecraft in /Users/codecraft/workspace/rabbitmqdemo)
2016-02-02 02:26:07.101 INFO 2185 --- [ main] com.demo.RabbitmqdemoApplication : No active profile set, falling back to default profiles: default
2016-02-02 02:26:07.166 INFO 2185 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@15b3e5b: startup date [Tue Feb 02 02:26:07 CST 2016]; root of context hierarchy
2016-02-02 02:26:08.004 INFO 2185 --- [ main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type [class org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration$$EnhancerBySpringCGLIB$$d94c0656] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2016-02-02 02:26:08.976 INFO 2185 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2016-02-02 02:26:08.981 INFO 2185 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase -2147482648
2016-02-02 02:26:08.981 INFO 2185 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2016-02-02 02:26:09.029 INFO 2185 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 2147483647
Received Foo<062226b0-cc27-433c-8ba9-201379f2d7c4>
Received Bar<-1717186171>
Received Foo
Received Bar<1094787449>
Received Foo
Received Bar<-39441298>
Received Foo<235369f1-0b59-4c4f-9a51-2277f3179798>
Received Bar<-596340646>
Received Foo
Received Bar<-915839285>
Received Foo<17fb113a-8845-473a-9f46-f850526f6f4d>
Received Bar<-75651721>
Received Foo
Received Bar<-1210351662>
Received Foo<23700d4a-26f9-4280-836d-9f6d63c0a3a0>
Received Bar<-2096776841>
Received Foo
Received Bar<-1986644405>
Check the RabbitMQ management UI
Queues
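Two application instances are running here, each with two receivers. With prefetch set to 5 (the factory.setPrefetchCount(5) line, shown commented out in the consumer configuration), the number of messages awaiting ack stays at 2*2*5=20.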
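The arithmetic behind that figure can be written down as a tiny helper. This is only a sketch: UnackedEstimate is a hypothetical class, and it assumes every listener runs exactly one consumer (concurrency of 1), so that each listener holds at most prefetch unacknowledged messages.

```java
// Hypothetical helper: upper bound on unacknowledged messages,
// assuming one consumer per listener.
final class UnackedEstimate {

    static int maxUnacked(int instances, int listenersPerInstance, int prefetch) {
        return instances * listenersPerInstance * prefetch;
    }

    public static void main(String[] args) {
        // 2 instances, 2 listeners each (queue.foo and queue.bar), prefetch 5
        System.out.println(maxUnacked(2, 2, 5)); // prints 20
    }
}
```

If listener concurrency were raised above 1, the consumer count per listener would have to be multiplied in as well.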
Channels
Connections
Pitfalls
MimeType warning
2016-02-02 02:19:04.444 WARN 2168 --- [ main] o.s.amqp.support.SimpleAmqpHeaderMapper : skipping header 'contentType' since it is not of expected type [org.springframework.util.MimeType]
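The source that was found
This has not been resolved; for now, the amqp log level is set to ERROR (logging.level.org.springframework.amqp=ERROR).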
Notes on declarations
To make automation easier, queues, topics, and exchanges can be declared in code. Remember to declare the bindings as well; otherwise no messages will be received.
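Why a missing binding means no messages arrive can be shown with a toy in-memory model. The sketch below is not RabbitMQ or Spring AMQP code: MiniBroker and its methods are hypothetical, and routing is simplified to exact key matching (which is how the literal keys in this article behave). It shows that a message published to an exchange with no matching binding is simply dropped.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical toy broker: exchanges route to queues only through bindings.
final class MiniBroker {

    private final Map<String, Queue<String>> queues = new HashMap<>();
    // exchange name -> (routing key -> names of bound queues)
    private final Map<String, Map<String, List<String>>> bindings = new HashMap<>();

    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    void declareExchange(String name) {
        bindings.putIfAbsent(name, new HashMap<>());
    }

    void declareBinding(String queue, String exchange, String routingKey) {
        if (!queues.containsKey(queue) || !bindings.containsKey(exchange)) {
            throw new IllegalStateException("declare the queue and the exchange first");
        }
        bindings.get(exchange).computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    // Returns how many queues received the message; 0 means it was dropped.
    int publish(String exchange, String routingKey, String body) {
        Map<String, List<String>> byKey = bindings.get(exchange);
        if (byKey == null) {
            throw new IllegalStateException("unknown exchange: " + exchange);
        }
        List<String> targets = byKey.getOrDefault(routingKey, Collections.emptyList());
        for (String queueName : targets) {
            queues.get(queueName).add(body);
        }
        return targets.size();
    }

    int depth(String queue) {
        return queues.get(queue).size();
    }

    public static void main(String[] args) {
        MiniBroker broker = new MiniBroker();
        broker.declareQueue("queue.foo");
        broker.declareExchange("exchange");

        // No binding yet: the message has nowhere to go.
        System.out.println(broker.publish("exchange", "queue.foo", "first"));  // 0
        broker.declareBinding("queue.foo", "exchange", "queue.foo");
        System.out.println(broker.publish("exchange", "queue.foo", "second")); // 1
        System.out.println(broker.depth("queue.foo"));                         // 1
    }
}
```

In the model, as with the real broker, declaring the queue and the exchange alone is not enough; the binding is what connects the two.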