复习消息队列之RabbitMQ

子望 / 2023-08-09 / 原文

概念:

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
对比:

RabbitMQ对比Kafka  mq来说更安全可靠 但大数据处理来说kafka更适合IO高吞吐的处理

安装部署(基于linux):

https://blog.csdn.net/kuaixiao0217/article/details/128593503

 

基于springboot实现MQ

引入依赖pom.xml 配置文件application.yaml

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
  application:
    name: springboot_rabbitmq
  rabbitmq:
    host: 10.15.0.9
    port: 5672
    username: ems
    password: 123
    virtual-host: /ems

第一种模式 直连

  生产者

package com.example;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class DemoApplicationTests {


    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend("hello","hello world"); 
// 生产端没有指定交换机只有routingKey和Object。
//消费方产生hello队列,放在默认的交换机(AMQP default)上。
//而默认的交换机有一个特点,只要你的routerKey的名字与这个
//交换机的队列有相同的名字,他就会自动路由上。 
//生产端routingKey 叫hello ,消费端生产hello队列。
//他们就路由上了
    }

}

  消费者

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
// 生产端没有指定交换机只有routingKey和Object。
//消费方产生hello队列,放在默认的交换机(AMQP default)上。
//而默认的交换机有一个特点,只要你的routerKey的名字与这个
//交换机的队列有相同的名字,他就会自动路由上。 
//生产端routingKey 叫hello ,消费端生产hello队列。
//他们就路由上了
@RabbitListener(queuesToDeclare = @Queue(value = "hello"))
public class HelloCustomer {

    @RabbitHandler
    public void receive1(String message){
        System.out.println("message = " + message);
    }
}

第二种模式 work模型

  生产者

package com.example;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class DemoApplicationTests {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work","hello work!"); 
            // 生产端没有指定交换机只有routingKey和Object。
            //消费方产生work队列,放在默认的交换机(AMQP default)上。
            //而默认的交换机有一个特点,只要你的routerKey的名字与这个
            //交换机的队列有相同的名字,他就会自动路由上。 
            //生产端routingKey 叫work ,消费端生产work队列。
            //他们就路由上了                 
        }
    }
}

  消费者

package com.example;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkCustomer {
    // 生产端没有指定交换机只有routingKey和Object。
    //消费方产生work队列,放在默认的交换机(AMQP default)上。
    //而默认的交换机有一个特点,只要你的routerKey的名字与这个
    //交换机的队列有相同的名字,他就会自动路由上。 
    //生产端routingKey 叫work ,消费端生产work队列。
    //他们就路由上了                 
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message){
        System.out.println("work message1 = " + message);
    }

    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message){
        System.out.println("work message2 = " + message);
    }
}

 Fanout 广播模型

生产者

package com.example;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class DemoApplicationTests {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend("logs","","这是日志广播"); // 参数1为交换机,参数2为路由key,“”表示为任意路由,参数3为消息内容
    }
}

消费者

package com.example;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkCustomer {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, // 创建临时队列
            exchange = @Exchange(name = "logs", type = "fanout")
    ))
    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue, //创建临时队列
            exchange = @Exchange(name = "logs", type = "fanout")  //绑定交换机类型
    ))
    public void receive2(String message) {
        System.out.println("message2 = " + message);
    }
}

Route 路由模型

生产者

package com.example;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class DemoApplicationTests {


    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend("directs","error","error 的日志信息");
    }

}

消费者

package com.example;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkCustomer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue, // 创建临时队列
                    key = {"info", "error"}, // 路由key
                    exchange = @Exchange(type = "direct", name = "directs")
            )})
    public void receive1(String message) {
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    key = {"error"},
                    exchange = @Exchange(type = "direct", name = "directs")
            )})
    public void receive2(String message) {
        System.out.println("message2 = " + message);
    }
}

Topic 订阅模型(动态路由模型)

生产者

package com.example;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class DemoApplicationTests {


    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() {
        rabbitTemplate.convertAndSend("topics","user.save.findAll","user.save.findAll 的消息");
    }
}

消费者

package com.example;

import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class WorkCustomer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    key = {"user.*"},
                    exchange = @Exchange(type = "topic",name = "topics")
            )
    })
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }

    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    key = {"user.#"},
                    exchange = @Exchange(type = "topic",name = "topics")
            )
    })
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}