0%

SpringBoot-整合RabbitMQ

前言

  • 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为:当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候。消息队列主要解决了应用耦合、异步处理、流量削峰等问题。
  • RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。AMQP:Advanced Message Queuing Protocol,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ安装

由于RabbitMQ是基于erlang的,所以,在正式安装RabbitMQ之前,需要先安装一下erlang。具体安装过程可参考windows10环境下的RabbitMQ安装步骤(图文)

pom.xml文件

这里是通过idea直接勾选得到的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
<!-- Build properties: compile for Java 8. -->
<properties>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <!-- Web starter: provides the REST controllers used to trigger the senders. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- AMQP starter: Spring AMQP plus the RabbitMQ client. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    <!-- Developer tooling; runtime-only and optional. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>

    <!-- Test starter, with the JUnit vintage engine excluded. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- Test support for Spring AMQP / RabbitMQ. -->
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

配置文件

配置rabbitMQ的地址。

1
2
3
4
5
6
7
8
# HTTP port of the embedded web server.
server:
  port: 8080

# Connection settings for the RabbitMQ broker.
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

使用RabbitMQ

  • 简单使用

  1. 队列配置
    1
    2
    3
    4
    5
    6
    7
    8
    /**
     * Queue declaration for the basic send/receive example.
     */
    @Configuration
    public class RabbitConfig {

        /**
         * Exposes a queue named {@code hello} as a bean.
         *
         * @return the {@code hello} queue definition
         */
        @Bean
        public Queue helloQueue() {
            final Queue hello = new Queue("hello");
            return hello;
        }
    }
  2. 发送者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**
     * Publishes a timestamped greeting using the routing key {@code hello}.
     */
    @Component
    public class HelloSender {

        @Autowired
        private AmqpTemplate rabbitTemplate;

        /**
         * Builds {@code "hello " + <current date>}, echoes it to stdout and
         * passes it to the template with {@code "hello"} as the routing key.
         */
        public void send() {
            final String payload = "hello " + new Date();
            System.out.println("Sender : " + payload);
            rabbitTemplate.convertAndSend("hello", payload);
        }
    }
  3. 接收者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
     * Listener bound to the {@code hello} queue.
     */
    @Component
    @RabbitListener(queues = "hello")
    public class HelloReceiver {

        /**
         * Prints a received text message to stdout.
         *
         * @param hello the message body as a string
         */
        @RabbitHandler
        public void process(String hello) {
            final String line = "Receiver : " + hello;
            System.out.println(line);
        }
    }
  4. controller类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**
     * HTTP entry point that triggers the basic example.
     */
    @RestController
    public class TestController {

        @Autowired
        private HelloSender helloSender;

        /**
         * Handles {@code GET hello}: asks the sender to publish one message.
         *
         * @return the literal {@code "success"} once the send call has returned
         */
        @GetMapping("hello")
        public String helloTest() {
            this.helloSender.send();
            return "success";
        }
    }
  • 一对多

  1. 队列配置
    1
    2
    3
    4
    5
    6
    7
    8
    /**
     * Queue declaration for the one-sender / many-receivers example.
     */
    @Configuration
    public class RabbitConfig {

        /**
         * Exposes a queue named {@code oneQueue} as a bean.
         *
         * @return the {@code oneQueue} queue definition
         */
        @Bean
        public Queue oneQueue() {
            final Queue shared = new Queue("oneQueue");
            return shared;
        }
    }
  2. 发送者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**
     * Sender for the one-to-many example; publishes with routing key {@code oneQueue}.
     */
    @Component
    public class OneSender {

        @Autowired
        private AmqpTemplate amqpTemplate;

        /**
         * Builds a message tagged with {@code i}, echoes it to stdout and sends it.
         *
         * @param i sequence number appended to the message text
         */
        public void send(int i) {
            // NOTE(review): "spirng" looks like a typo for "spring"; it is kept
            // unchanged because it is part of the emitted message text.
            final String prefix = "spirng boot oneQueue queue" + " ****** ";
            final String context = prefix + i;
            System.out.println("Sender1" + context);
            amqpTemplate.convertAndSend("oneQueue", context);
        }
    }
  3. 接收者
    两个接收者。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
     * First of two listeners attached to the {@code oneQueue} queue.
     */
    @Component
    @RabbitListener(queues = "oneQueue")
    public class OneReceiver1 {

        /**
         * Prints a received text message, prefixed with {@code Receiver1:}.
         *
         * @param Str the message body as a string
         */
        @RabbitHandler
        public void process(String Str) {
            final String line = "Receiver1:" + Str;
            System.out.println(line);
        }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
     * Second of two listeners attached to the {@code oneQueue} queue.
     */
    @Component
    @RabbitListener(queues = "oneQueue")
    public class OneReceiver2 {

        /**
         * Prints a received text message, prefixed with {@code Receiver2:}.
         *
         * @param Str the message body as a string
         */
        @RabbitHandler
        public void process(String Str) {
            final String line = "Receiver2:" + Str;
            System.out.println(line);
        }
    }
  4. controller类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
     * HTTP entry point that triggers the one-to-many example.
     */
    @RestController
    public class TestController {

        @Autowired
        private OneSender oneSender;

        /**
         * Handles {@code GET oneToMany}: sends ten messages numbered 0 through 9.
         *
         * @return the literal {@code "success"} after all sends have been issued
         */
        @GetMapping("oneToMany")
        public String oneToManyTest() {
            int seq = 0;
            while (seq < 10) {
                oneSender.send(seq);
                seq++;
            }
            return "success";
        }
    }
  5. 总结
    一个发送者,N个接收者,经过测试会均匀地将消息发送到N个接收者中。
  • 多对多

  1. 队列配置
    1
    2
    3
    4
    5
    6
    7
    /**
     * Queue declaration for the many-senders / many-receivers example.
     */
    @Configuration
    public class RabbitConfig {

        /**
         * Exposes a queue named {@code manyQueue} as a bean.
         *
         * @return the {@code manyQueue} queue definition
         */
        @Bean
        public Queue manyQueue() {
            final Queue shared = new Queue("manyQueue");
            return shared;
        }
    }
  2. 发送者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**
     * First sender of the many-to-many example; publishes with routing key
     * {@code manyQueue}.
     */
    @Component
    public class ManySender1 {

        @Autowired
        private AmqpTemplate amqpTemplate;

        /**
         * Sends the decimal text of {@code i} and echoes it to stdout.
         *
         * @param i number to send as the message body
         */
        public void send(int i) {
            final String context = String.valueOf(i);
            final String logLine = "Sender1: " + context + "--send:";
            System.out.println(logLine);
            this.amqpTemplate.convertAndSend("manyQueue", context);
        }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**
     * Second sender of the many-to-many example; publishes with routing key
     * {@code manyQueue}.
     */
    @Component
    public class ManySender2 {

        @Autowired
        private AmqpTemplate amqpTemplate;

        /**
         * Sends the decimal text of {@code i} and echoes it to stdout.
         *
         * @param i number to send as the message body
         */
        public void send(int i) {
            String context = i + "";
            // Label identifies this sender; it previously printed "Sender1: ",
            // which made the two senders indistinguishable in the output.
            System.out.println("Sender2: " + context + "--send:");
            amqpTemplate.convertAndSend("manyQueue", context);
        }
    }
  3. 接收者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
     * First of two listeners attached to the {@code manyQueue} queue.
     */
    @Component
    @RabbitListener(queues = "manyQueue")
    public class ManyReceiver1 {

        /**
         * Prints a received text message, prefixed with {@code Receiver1:}.
         *
         * @param Str the message body as a string
         */
        @RabbitHandler
        public void process(String Str) {
            final String line = "Receiver1:" + Str;
            System.out.println(line);
        }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
     * Second of two listeners attached to the {@code manyQueue} queue.
     */
    @Component
    @RabbitListener(queues = "manyQueue")
    public class ManyReceiver2 {

        /**
         * Prints a received text message, prefixed with {@code Receiver2:}.
         *
         * @param Str the message body as a string
         */
        @RabbitHandler
        public void process(String Str) {
            final String line = "Receiver2:" + Str;
            System.out.println(line);
        }
    }
  4. controller类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    /**
     * HTTP entry point that triggers the many-to-many example.
     */
    @RestController
    public class TestController {

        @Autowired
        private ManySender1 manySender1;

        @Autowired
        private ManySender2 manySender2;

        /**
         * Handles {@code GET manyToMany}: for each number 0 through 9, sends it
         * through the first sender and then through the second.
         *
         * @return the literal {@code "success"} after all sends have been issued
         */
        @GetMapping("manyToMany")
        public String manyToManyTest() {
            int seq = 0;
            while (seq < 10) {
                manySender1.send(seq);
                manySender2.send(seq);
                seq++;
            }
            return "success";
        }
    }
  5. 总结
    和一对多一样,接收端仍然会均匀接收到消息。
  • 对象的支持

  1. 队列配置
    1
    2
    3
    4
    5
    6
    7
    /**
     * Queue declaration for the object-payload example.
     */
    @Configuration
    public class RabbitConfig {

        /**
         * Exposes a queue named {@code object_queue} as a bean.
         *
         * @return the {@code object_queue} queue definition
         */
        @Bean
        public Queue queue3() {
            final Queue objects = new Queue("object_queue");
            return objects;
        }
    }
  2. 实体类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    /**
     * Serializable user payload holding a username and a password.
     */
    public class User implements Serializable {

        /**
         * Explicit stream version. Without it the JVM derives one from the class
         * shape, so two builds of this class can reject each other's serialized
         * instances with {@code InvalidClassException}.
         */
        private static final long serialVersionUID = 1L;

        private String username;
        private String password;

        /**
         * Creates a user.
         *
         * @param username the user name
         * @param password the password, stored as given
         */
        public User(String username, String password) {
            this.username = username;
            this.password = password;
        }

        /** @return the user name */
        public String getUsername() {
            return username;
        }

        /** @param username the new user name */
        public void setUsername(String username) {
            this.username = username;
        }

        /** @return the password */
        public String getPassword() {
            return password;
        }

        /** @param password the new password */
        public void setPassword(String password) {
            this.password = password;
        }

        /**
         * Renders both fields as {@code User{username='...', password='...'}}.
         *
         * <p>NOTE(review): the password is included in plain text; avoid logging
         * this representation outside of demos.
         */
        @Override
        public String toString() {
            return "User{" +
                    "username='" + username + '\'' +
                    ", password='" + password + '\'' +
                    '}';
        }
    }
  3. 发送者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /**
     * Sender for the object-payload example; publishes with routing key
     * {@code object_queue}.
     */
    @Component
    public class ObjectSender {

        @Autowired
        AmqpTemplate amqpTemplate;

        /**
         * Echoes the user's string form to stdout and sends the object itself.
         *
         * @param user the payload to send; its {@code toString()} is invoked
         *             before sending
         */
        public void sendUser(User user) {
            final String description = user.toString();
            System.out.println("Send object:" + description);
            amqpTemplate.convertAndSend("object_queue", user);
        }
    }
  4. 接收者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    /**
     * Listener bound to the {@code object_queue} queue.
     */
    @Component
    @RabbitListener(queues = "object_queue")
    public class ObjectReceiver {

        /**
         * Prints the string form of a received {@code User} to stdout.
         *
         * @param user the received payload
         */
        @RabbitHandler
        public void objectReceiver(User user) {
            final String description = user.toString();
            System.out.println("Receiver object:" + description);
        }
    }
  5. controller类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**
     * HTTP entry point that triggers the object-payload example.
     */
    @RestController
    public class TestController {

        @Autowired
        private ObjectSender objectSender;

        /**
         * Handles {@code GET objectSender}: sends a fixed demo user
         * ({@code admin} / {@code 123456}).
         *
         * @return the literal {@code "success"} once the send call has returned
         */
        @GetMapping("objectSender")
        public String objectSenderTest() {
            objectSender.sendUser(new User("admin", "123456"));
            return "success";
        }
    }
  • Topic Exchange

  1. 队列配置
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    /**
     * Topic-exchange example: two queues, one topic exchange and one binding
     * per queue.
     */
    @Configuration
    public class TopicRabbitConfig {

        /** Name of the first queue. */
        static final String message = "topic.message";
        /** Name of the second queue. */
        static final String messages = "topic.messages";

        // Create the two queues.

        /** @return the {@code topic.message} queue definition */
        @Bean
        public Queue queueMessage() {
            return new Queue(message);
        }

        /** @return the {@code topic.messages} queue definition */
        @Bean
        public Queue queueMessages() {
            return new Queue(messages);
        }

        // Declare the topic exchange, named "topicExchange".

        /** @return the {@code topicExchange} topic exchange definition */
        @Bean
        TopicExchange exchange() {
            return new TopicExchange("topicExchange");
        }

        // Bind each queue to the exchange with its routing-key pattern.

        /**
         * Binds the first queue with the exact routing key {@code topic.message}.
         *
         * @param queueMessage the {@code queueMessage} bean
         * @param exchange     the topic exchange bean
         * @return the binding definition
         */
        @Bean
        Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
            final Binding exact = BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
            return exact;
        }

        /**
         * Binds the second queue with the wildcard pattern {@code topic.#}.
         *
         * @param queueMessages the {@code queueMessages} bean
         * @param exchange      the topic exchange bean
         * @return the binding definition
         */
        @Bean
        Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
            final Binding wildcard = BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
            return wildcard;
        }
    }
  2. 发送者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    /**
     * Sender for the topic-exchange example; publishes to {@code topicExchange}.
     */
    @Component
    public class TopicSender {

        @Autowired
        AmqpTemplate amqpTemplate;

        /**
         * Echoes and sends a fixed text with routing key {@code topic.message}.
         */
        public void send1() {
            final String text = "hi, i am message 1";
            System.out.println("Sender : " + text);
            this.amqpTemplate.convertAndSend("topicExchange", "topic.message", text);
        }

        /**
         * Echoes and sends a fixed text with routing key {@code topic.messages}.
         */
        public void send2() {
            final String text = "hi, i am messages 2";
            System.out.println("Sender : " + text);
            this.amqpTemplate.convertAndSend("topicExchange", "topic.messages", text);
        }
    }
  3. 接收者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
     * Listener bound to the {@code topic.message} queue.
     */
    @Component
    @RabbitListener(queues = "topic.message")
    public class TopicReceiver1 {

        /**
         * Prints a received text message to stdout.
         *
         * @param message the message body as a string
         */
        @RabbitHandler
        public void process(String message) {
            final String line = "Receiver topic.message :" + message;
            System.out.println(line);
        }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    /**
     * Listener bound to the {@code topic.messages} queue.
     */
    @Component
    @RabbitListener(queues = "topic.messages")
    public class TopicReceiver2 {

        /**
         * Prints a received text message to stdout.
         *
         * @param message the message body as a string
         */
        @RabbitHandler
        public void process(String message) {
            final String line = "Receiver topic.messages: " + message;
            System.out.println(line);
        }
    }
  4. controller类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**
     * HTTP entry point that triggers the topic-exchange example.
     */
    @RestController
    public class TestController {

        @Autowired
        private TopicSender topicSender;

        /**
         * Handles {@code GET topicSender}: invokes {@code send1()} on the sender.
         *
         * @return the literal {@code "success"} once the send call has returned
         */
        @GetMapping("topicSender")
        public String topicSenderTest() {
            this.topicSender.send1();
            // Disabled in this example; uncomment to also exercise send2().
            // topicSender.send2();
            return "success";
        }
    }
  5. 总结
    路由键 “topic.message” 同时匹配两个绑定(“topic.message” 和 “topic.#”),因此 send1 发送的消息两个队列都会收到;路由键 “topic.messages” 只匹配 “topic.#”,因此 send2 发送的消息只有 “topic.messages” 队列会收到。
  • Fanout Exchange

  1. 队列配置
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    /**
     * Fanout-exchange example: three queues bound to one fanout exchange.
     */
    @Configuration
    public class FanoutRabbitConfig {

        // Create the three queues.

        /** @return the {@code fanout.A} queue definition */
        @Bean
        public Queue AMessage() {
            final Queue a = new Queue("fanout.A");
            return a;
        }

        /** @return the {@code fanout.B} queue definition */
        @Bean
        public Queue BMessage() {
            final Queue b = new Queue("fanout.B");
            return b;
        }

        /** @return the {@code fanout.C} queue definition */
        @Bean
        public Queue CMessage() {
            final Queue c = new Queue("fanout.C");
            return c;
        }

        // Create the exchange; its type determines the routing strategy.

        /** @return the {@code fanoutExchange} fanout exchange definition */
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }

        // Bind queues A, B and C to the fanout exchange. Whatever routing key
        // the sender supplies is ignored.

        /**
         * @param AMessage       the {@code AMessage} queue bean
         * @param fanoutExchange the fanout exchange bean
         * @return the binding of queue A to the exchange
         */
        @Bean
        public Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
            final Binding bindingA = BindingBuilder.bind(AMessage).to(fanoutExchange);
            return bindingA;
        }

        /**
         * @param BMessage       the {@code BMessage} queue bean
         * @param fanoutExchange the fanout exchange bean
         * @return the binding of queue B to the exchange
         */
        @Bean
        public Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
            final Binding bindingB = BindingBuilder.bind(BMessage).to(fanoutExchange);
            return bindingB;
        }

        /**
         * @param CMessage       the {@code CMessage} queue bean
         * @param fanoutExchange the fanout exchange bean
         * @return the binding of queue C to the exchange
         */
        @Bean
        Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
            final Binding bindingC = BindingBuilder.bind(CMessage).to(fanoutExchange);
            return bindingC;
        }
    }
  2. 发送者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
     * Sender for the fanout example; publishes to {@code fanoutExchange}.
     */
    @Component
    public class FanoutSender {

        @Autowired
        AmqpTemplate amqpTemplate;

        /**
         * Echoes a fixed text to stdout and sends it with an empty routing key.
         */
        public void send() {
            final String text = "hi, fanout msg ";
            System.out.println("Sender : " + text);
            // Queues A, B and C are bound to the fanout exchange, so any
            // routing key given here is ignored.
            this.amqpTemplate.convertAndSend("fanoutExchange", "", text);
        }
    }
  3. 接收者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
     * Listener bound to the {@code fanout.A} queue.
     */
    @Component
    @RabbitListener(queues = "fanout.A")
    public class FanoutReceiverA {

        /**
         * Prints a received text message to stdout.
         *
         * @param message the message body as a string
         */
        @RabbitHandler
        public void process(String message) {
            final String line = "Receiver form fanout.A: " + message;
            System.out.println(line);
        }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
     * Listener bound to the {@code fanout.B} queue.
     */
    @Component
    @RabbitListener(queues = "fanout.B")
    public class FanoutReceiverB {

        /**
         * Prints a received text message to stdout.
         *
         * @param message the message body as a string
         */
        @RabbitHandler
        public void process(String message) {
            final String line = "Receiver form fanout.B: " + message;
            System.out.println(line);
        }
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
     * Listener bound to the {@code fanout.C} queue.
     */
    @Component
    @RabbitListener(queues = "fanout.C")
    public class FanoutReceiverC {

        /**
         * Prints a received text message to stdout.
         *
         * @param message the message body as a string
         */
        @RabbitHandler
        public void process(String message) {
            final String line = "Receiver form fanout.C: " + message;
            System.out.println(line);
        }
    }
  4. controller类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /**
     * HTTP entry point that triggers the fanout example.
     */
    @RestController
    public class TestController {

        @Autowired
        private FanoutSender fanoutSender;

        /**
         * Handles {@code GET fanoutSender}: asks the sender to publish one message.
         *
         * @return the literal {@code "success"} once the send call has returned
         */
        @GetMapping("fanoutSender")
        public String fanoutSenderTest() {
            this.fanoutSender.send();
            return "success";
        }
    }
  5. 总结
    绑定到 fanout 交换机上面的队列都收到了消息。

参考内容

什么是消息队列?

消息队列之 RabbitMQ

消息队列及常见消息队列介绍

欢迎关注我的其它发布渠道