RabbitMQ前面已经介绍过原理以及工作机制,现在我们利用SpringBoot进行消息队列的操作。
创建RabbitMQ环境
同样的,为了测试方便,我们在这里直接使用docker部署RabbitMQ,使用下面的命令在docker hub中查找RabbitMQ的版本。
为了便于直观看到RabbitMQ中的情况,我们使用带”-management”后缀的版本,这种版本自带web管理界面,能更好的帮助我们理解RabbitMQ的执行流程。
1
| docker pull rabbitmq:3-management
|
下载完成后查看镜像的下载情况,并进行部署启动。
1 2
| docker images docker run -d -p 5672:5672 -p 15672:15672 --name RabbitMQ rabbitmq:3-management
|
这边指定的两个端口,第一个端口是RabbitMQ自身的启动端口,第二个端口号是web管理后台的端口号,在启动的时候指定,我们就能直接利用这个端口号访问后台页面,如下图所示。
点击Exchanges查看所有的交换器情况。
点击Queues查看所有的队列的情况。
使用SpringBoot操作RabbitMQ
创建测试工程,引入相关依赖
在SpringBoot2.0版本中,我们直接引入amqp的依赖就可以进行与RabbitMQ的整合,因为RabbitMQ正是属于AMQP类型的消息队列。
1 2 3 4 5 6 7 8 9 10 11 12
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.47</version> </dependency>
|
然后在application.properties配置文件中配置RabbitMQ的相关信息。
1 2 3 4 5
| # RabbitMQ配置信息 spring.rabbitmq.host=服务器地址 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.port=5672
|
测试direct点对点模式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
@Test void contextLoads() {
Map<String, Object> map = new HashMap<>(); map.put("msg","这是第一个单播的消息"); map.put("data", Arrays.asList("test",123,true)); rabbitTemplate.convertAndSend("exchange.direct","atguigu.news", JSON.toJSONString(new Book("三国演义","罗贯中"))); }
|
在这里我们使用fastjson将具体的POJO对象转换成json字符串发送到消息队列中进行保存,而在接收的时候,则需要使用fastjson将字符串反序列化为原POJO对象。我们编写一个service进行消息队列监听的操作,使用@RabbitListener注解指定监听哪一个队列接收到消息,一但被我们写的service接收,该条消息即从消息队列中删除。需要提醒的是,如果要开启监听功能,需要在启动类上加上@EnableRabbit注解。
1 2 3 4 5 6 7 8 9
| @EnableRabbit @SpringBootApplication public class SpringbootAmqpApplication {
public static void main(String[] args) { SpringApplication.run(SpringbootAmqpApplication.class, args); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Service("bookService") public class BookServiceImpl implements BookService {
private static final Logger logger = LoggerFactory.getLogger(BookServiceImpl.class);
@Override @RabbitListener(queues = "atguigu.news") public void receive(String book) { logger.info("-------------------接收消息-------------------"); Book book1 = JSONObject.parseObject(book, Book.class); System.out.println(book1); }
@RabbitListener(queues = "atguigu") public void receive(Message message){ System.out.println(message.getBody()); System.out.println(message.getMessageProperties()); } }
|
广播消息
通过指定交换器为fanout类型的,进行发送广播消息,即所有的队列均可以接收到发送的消息。
1 2 3 4 5 6 7 8 9
|
@Test void broadcast(){ Map map = new HashMap(); map.put("msg","这是广播的消息"); rabbitTemplate.convertAndSend("exchange.fanout","",map); }
|
操作创建队列、交换器、绑定规则
创建队列、交换器、绑定规则的前提需要引入AmqpAdmin,RabbitMQ的系统功能管理组件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @Autowired private AmqpAdmin amqpAdmin;
@Test void createExchange(){ amqpAdmin.declareExchange(new DirectExchange("amqpAdmin.exchange"));
amqpAdmin.declareQueue(new Queue("amqpAdmin.queue")); logger.warn("---------------------创建完成--------------------");
amqpAdmin.declareBinding(new Binding("amqpAdmin.queue",Binding.DestinationType.QUEUE,"amqpAdmin.exchange","amqp.binding",null)); }
|
总结
以上就是SpringBoot简单整合RabbitMQ的流程,在这边总结的目的也就是为了提高开发的效率,不需要再一步一步去找文档怎样进行配置,而着重关注业务逻辑。