RabbitMQ
同步调用方式和异步调用方式
同步调用
微服务间基于 Feign 的调用就属于同步调用方式
缺点:
- 耦合度高,每次加入新的需求都要修改原来的代码
- 性能下降,调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和
- 资源浪费,调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源
- 级联失败,如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致整个微服务集群故障
异步调用
异步调用常见实现就是事件驱动模式
优点:
- 服务解耦,异步方式下,服务只发布事件,而不调用其它服务。如果有新的业务需求出现,只需要去订阅该事件主题。
- 性能提高,吞吐量提升
- 服务没有强依赖关系
- 流量消峰
缺点:
- 依赖于 Broker(消息队列) 的可靠性、安全性、吞吐能力
- 架构复杂,业务没有明显的流程线,不方便问题排查和追踪管理
RabbitMQ 的安装部署
启动docker容器
systemctl start docker
下载rabbitmq镜像
docker pull rabbitmq:3-management
安装mq并配置用户信息
docker run \ -e RABBITMQ_DEFAULT_USER=root \ -e RABBITMQ_DEFAULT_PASS=root \ --name mq \ --hostname hadoop001 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management
SpringAMQP
引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置连接信息
spring.rabbitmq.host=hadoop001 spring.rabbitmq.port=5672 spring.rabbitmq.username=root spring.rabbitmq.password=root
HelloWorld 案例
消息发送者
@SpringBootTest
class SpringAmqpApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void publisherTest() {
String queueName = "mq";
String message = "Hello, Spring AMQP!";
rabbitTemplate.convertAndSend(queueName, message);
}
}
消息消费者
@Component
public class Consumer {
/**
* @param message 发送的消息类型是字符串, 则接收的消息类型也是字符串
*/
@RabbitListener(queues = "mq")
public void listen(String message) {
System.out.println("consumer receive message = " + message);
}
}
工作队列 Work Queues
进行案例演示测试时,先启动消费者(即SpringApplication),再启动消息发送者(SpringBootTest)。因为 SpringBootTest 框架也会扫描 @Component,所在在调用测试方法发送消息时,就已经有一部分消息被消费了。但是由于 @Test 方法的生命周期结束得更早,因此往往只会输出几条消息。产生如下图所示的场景:
消息发送者
@SpringBootTest
class SpringAmqpApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
@SneakyThrows
public void workQueuePublisherTest() {
String queueName = "mq";
String message = LocalTime.now() + " : Hello, Spring AMQP ";
int counts = 50;
for (int i = 0; i < counts; i++) {
rabbitTemplate.convertAndSend(queueName, (message + i));
}
}
}
消息消费者
@Component
public class WorkQueueConsumer {
@SneakyThrows
@RabbitListener(queues = "mq")
public void listenByA(String message) {
System.out.println("consumerA : " + message);
Thread.sleep(20);
}
@SneakyThrows
@RabbitListener(queues = "mq")
public void listenByB(String message) {
System.err.println("consumerB : " + message);
Thread.sleep(200);
}
}
发布-订阅模式 Publish-Subscribe
发布-订阅模式通过交换机来保存数据信息,发送到所有绑定的队列中,从而避免消息只能被一个消费者消费。
Fanout Exchange 交换机
发送到与 Fanout Exchange 交换机绑定的所有队列