博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SpringBoot应用之消息队列
阅读量:5942 次
发布时间:2019-06-19

本文共 8285 字,大约阅读时间需要 27 分钟。

SpringBoot应用系列文章

本文主要讲如何在Spring Boot里头使用rabbitmq进行消息收发。

准备rabbitmq集群

具体查看这篇文章。

新建项目

图片描述

配置项

#http://segmentfault.com/a/1190000004309900spring.rabbitmq.host=192.168.99.100spring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/logging.level.org.springframework.amqp=ERRORlogging.level.com.demo=INFO#spring.rabbitmq.dynamic 是否创建AmqpAdmin bean. 默认为: true#spring.rabbitmq.listener.acknowledge-mode 指定Acknowledge的模式.#spring.rabbitmq.listener.auto-startup 是否在启动时就启动mq,默认: true#spring.rabbitmq.listener.concurrency 指定最小的消费者数量.#spring.rabbitmq.listener.max-concurrency 指定最大的消费者数量.#spring.rabbitmq.listener.prefetch 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.#spring.rabbitmq.listener.transaction-size 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.#spring.rabbitmq.requested-heartbeat 指定心跳超时,0为不指定.#spring.rabbitmq.ssl.enabled 是否开始SSL,默认: false#spring.rabbitmq.ssl.key-store 指定持有SSL certificate的key store的路径#spring.rabbitmq.ssl.key-store-password 指定访问key store的密码.#spring.rabbitmq.ssl.trust-store 指定持有SSL certificates的Trust store.#spring.rabbitmq.ssl.trust-store-password 指定访问trust store的密码.

生产者配置

@Configurationpublic class ProducerConfig {    @Bean    RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {        return new RabbitAdmin(connectionFactory);    }    @Bean    Queue queueFoo(RabbitAdmin rabbitAdmin) {        Queue queue = new Queue("queue.foo", true);        rabbitAdmin.declareQueue(queue);        return queue;    }    @Bean    Queue queueBar(RabbitAdmin rabbitAdmin) {        Queue queue = new Queue("queue.bar", true);        rabbitAdmin.declareQueue(queue);        return queue;    }    @Bean    TopicExchange exchange(RabbitAdmin rabbitAdmin) {        TopicExchange topicExchange = new TopicExchange("exchange");        rabbitAdmin.declareExchange(topicExchange);        return topicExchange;    }    @Bean    Binding bindingExchangeFoo(Queue queueFoo, TopicExchange exchange,RabbitAdmin rabbitAdmin) {        Binding binding = BindingBuilder.bind(queueFoo).to(exchange).with("queue.foo");        rabbitAdmin.declareBinding(binding);        return binding;    }    @Bean    Binding bindingExchangeBar(Queue queueBar, TopicExchange exchange,RabbitAdmin rabbitAdmin) {        Binding binding = BindingBuilder.bind(queueBar).to(exchange).with("queue.bar");        rabbitAdmin.declareBinding(binding);        return binding;    }    /**     * 生产者用     * @return     */    @Bean    public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) {        RabbitMessagingTemplate rabbitMessagingTemplate = new RabbitMessagingTemplate();        rabbitMessagingTemplate.setMessageConverter(jackson2Converter());        rabbitMessagingTemplate.setRabbitTemplate(rabbitTemplate);        return rabbitMessagingTemplate;    }    @Bean    public MappingJackson2MessageConverter jackson2Converter() {        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();        return converter;    }}

消费者配置

@Configuration@EnableRabbitpublic class ConsumerConfig implements RabbitListenerConfigurer {    @Autowired    ReceiverService receiverService;    @Bean    public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();        factory.setMessageConverter(new MappingJackson2MessageConverter());        return factory;    }    @Bean    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();        factory.setConnectionFactory(connectionFactory);        // factory.setPrefetchCount(5);         factory.setAcknowledgeMode(AcknowledgeMode.AUTO);        return factory;    }    @Override    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {        registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());    }}

消息收发

发送服务

@Componentpublic class SenderService {    @Autowired    private RabbitMessagingTemplate rabbitMessagingTemplate;    public void sendFoo2Rabbitmq(final Foo foo) {        this.rabbitMessagingTemplate.convertAndSend("exchange", "queue.foo", foo);    }    public void sendBar2Rabbitmq(final Bar bar){        this.rabbitMessagingTemplate.convertAndSend("exchange", "queue.bar", bar);    }}

调用

@SpringBootApplication@ComponentScan(basePackages = "com.demo")public class RabbitmqdemoApplication implements CommandLineRunner {    public static void main(String[] args) {        SpringApplication.run(RabbitmqdemoApplication.class, args);    }    @Autowired    SenderService senderService;    @Override    public void run(String... strings) throws Exception {        Random random = new Random();        while (true){            senderService.sendBar2Rabbitmq(new Bar(random.nextInt()));            senderService.sendFoo2Rabbitmq(new Foo(UUID.randomUUID().toString()));        }    }}

接收

@Componentpublic class ReceiverService {    @RabbitListener(queues = "queue.foo")    public void receiveFooQueue(Foo foo) {        System.out.println("Received Foo<" + foo.getName() + ">");    }    @RabbitListener(queues = "queue.bar")    public void receiveBarQueue(Bar bar) {        System.out.println("Received Bar<" + bar.getAge() + ">");    }}

查看输出

.   ____          _            __ _ _ /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/  ___)| |_)| | | | | || (_| |  ) ) ) )  '  |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot ::        (v1.3.2.RELEASE)2016-02-02 02:26:07.099  INFO 2185 --- [           main] com.demo.RabbitmqdemoApplication   : Starting RabbitmqdemoApplication on Jupiter.local with PID 2185 (/Users/codecraft/workspace/rabbitmqdemo/target/classes started by codecraft in /Users/codecraft/workspace/rabbitmqdemo)2016-02-02 02:26:07.101  INFO 2185 --- [           main] com.demo.RabbitmqdemoApplication   : No active profile set, falling back to default profiles: default2016-02-02 02:26:07.166  INFO 2185 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@15b3e5b: startup date [Tue Feb 02 02:26:07 CST 2016]; root of context hierarchy2016-02-02 02:26:08.004  INFO 2185 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration' of type [class org.springframework.amqp.rabbit.annotation.RabbitBootstrapConfiguration$$EnhancerBySpringCGLIB$$d94c0656] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)2016-02-02 02:26:08.976  INFO 2185 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup2016-02-02 02:26:08.981  INFO 2185 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase -21474826482016-02-02 02:26:08.981  INFO 2185 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 02016-02-02 02:26:09.029  INFO 2185 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647Received Foo<062226b0-cc27-433c-8ba9-201379f2d7c4>Received Bar<-1717186171>Received Foo
Received Bar<1094787449>Received Foo
Received Bar<-39441298>Received Foo<235369f1-0b59-4c4f-9a51-2277f3179798>Received Bar<-596340646>Received Foo
Received Bar<-915839285>Received Foo<17fb113a-8845-473a-9f46-f850526f6f4d>Received Bar<-75651721>Received Foo
Received Bar<-1210351662>Received Foo<23700d4a-26f9-4280-836d-9f6d63c0a3a0>Received Bar<-2096776841>Received Foo
Received Bar<-1986644405>

查看rabbitmq界面

图片描述

查看队列

图片描述

这里是启动了两个实例,两个接收者,加上prefecth设置为5,所以待ack的保持在2*2*5=20
图片描述

查看channel

图片描述

查看连接

图片描述

MimeType的warning

2016-02-02 02:19:04.444  WARN 2168 --- [           main] o.s.amqp.support.SimpleAmqpHeaderMapper  : skipping header 'contentType' since it is not of expected type [org.springframework.util.MimeType]

找到的出处

没有解决,目前是把amqp的log调整到error.

declare事项

为了方便自动化处理,可以在代码里头去declare queue、topic、exchange,最后注意declare binding,否则就接受不到消息。

参考

转载地址:http://psqtx.baihongyu.com/

你可能感兴趣的文章
关于缓存命中率的几个关键问题!
查看>>
崛起的中国服务器市场迎来旺盛的SPEC测试需求
查看>>
视频转成flv格式
查看>>
客服运营三部曲
查看>>
《Python硬件编程实战》——2.8 在Mac中安装Python
查看>>
《iOS应用开发指南——使用HTML5、CSS3和JavaScript》——导读
查看>>
小测一下fastjson的速度(纯娱乐)
查看>>
如何做618数据复盘?你需要掌握这8大思路
查看>>
《ANSYS FLUENT 16.0超级学习手册》——2.5 FLUENT 16.0的基本操作
查看>>
深入理解Spark:核心思想与源码分析. 3.9 启动测量系统MetricsSystem
查看>>
讲给普通人听的分布式数据存储
查看>>
《C++面向对象高效编程(第2版)》——3.13 采用语义
查看>>
《 短文本数据理解》——2.5小结
查看>>
如何编写一个全新的 Git 协议
查看>>
马云携阿里17位创始人及合伙人捐赠浙大一院5.6亿,杭州渐成中国硅谷
查看>>
《libGDX移动游戏开发从入门到精通》一第2章 libGDX的架构分析
查看>>
《配置管理最佳实践》——2.10 建立构建过程
查看>>
《C++入门经典(第5版•修订版)》——2.6 问与答
查看>>
PLM调研第二天
查看>>
《精通Linux设备驱动程序开发》——1.5 Linux发行版
查看>>