RabbitMQ的使用

RabbitMQ

同步调用方式和异步调用方式

同步调用

微服务间基于 Feign 的调用就属于同步调用方式

缺点:

  • 耦合度高,每次加入新的需求都要修改原来的代码
  • 性能下降,调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和
  • 资源浪费,调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源
  • 级联失败,如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致整个微服务集群故障

异步调用

异步调用常见实现就是事件驱动模式

优点:

  • 服务解耦,异步方式下,服务只发布事件,而不调用其它服务。如果有新的业务需求出现,只需要去订阅该事件主题。
  • 性能提高,吞吐量提升
  • 服务没有强依赖关系
  • 流量消峰

缺点:

  • 依赖于 Broker(消息队列) 的可靠性、安全性、吞吐能力
  • 架构复杂,业务没有明显的流程线,不方便问题排查和追踪管理

RabbitMQ 的安装部署

  1. 启动docker容器 systemctl start docker

  2. 下载rabbitmq镜像 docker pull rabbitmq:3-management

  3. 安装mq并配置用户信息

     docker run \
         -e RABBITMQ_DEFAULT_USER=root \
         -e RABBITMQ_DEFAULT_PASS=root \
         --name mq \
         --hostname hadoop001 \
         -p 15672:15672 \
         -p 5672:5672 \
         -d \
         rabbitmq:3-management

SpringAMQP

  1. 引入依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  2. 配置连接信息

    spring.rabbitmq.host=hadoop001
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=root
    spring.rabbitmq.password=root

HelloWorld 案例

简单队列的使用

消息发送者

@SpringBootTest
class SpringAmqpApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void publisherTest() {
        String queueName = "mq";
        String message = "Hello, Spring AMQP!";
        rabbitTemplate.convertAndSend(queueName, message);
    }

}

消息消费者

@Component
public class Consumer {

    /**
     * @param message 发送的消息类型是字符串, 则接收的消息类型也是字符串
     */
    @RabbitListener(queues = "mq")
    public void listen(String message) {
        System.out.println("consumer receive message = " + message);
    }
}

工作队列 Work Queues

工作队列的使用

进行案例演示测试时,先启动消费者(即SpringApplication),再启动消息发送者(SpringBootTest)。因为 SpringBootTest 框架也会扫描 @Component,所在在调用测试方法发送消息时,就已经有一部分消息被消费了。但是由于 @Test 方法的生命周期结束得更早,因此往往只会输出几条消息。产生如下图所示的场景:

SpringBoot Test 框架中向 RbbitMQ 发送消息

消息发送者


@SpringBootTest
class SpringAmqpApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    @SneakyThrows
    public void workQueuePublisherTest() {
        String queueName = "mq";
        String message = LocalTime.now() + " : Hello, Spring AMQP ";
        int counts = 50;
        for (int i = 0; i < counts; i++) {
            rabbitTemplate.convertAndSend(queueName, (message + i));
        }
    }
}

消息消费者

@Component
public class WorkQueueConsumer {

    @SneakyThrows
    @RabbitListener(queues = "mq")
    public void listenByA(String message) {
        System.out.println("consumerA : " + message);
        Thread.sleep(20);
    }


    @SneakyThrows
    @RabbitListener(queues = "mq")
    public void listenByB(String message) {
        System.err.println("consumerB : " + message);
        Thread.sleep(200);
    }
}

发布-订阅模式 Publish-Subscribe

发布-订阅模式

发布-订阅模式通过交换机来保存数据信息,发送到所有绑定的队列中,从而避免消息只能被一个消费者消费。

Fanout Exchange 交换机

发送到与 Fanout Exchange 交换机绑定的所有队列


   转载规则


《RabbitMQ的使用》 熊水斌 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录