springboot +docker + rocketmq安装与简单使用
2025-02-11 08:31 阅读(118)

一:安装rocketmq

docker pull apache/rocketmq:5.3.1

2. 下载控制台镜像

docker pull pangliang/rocketmq-console-ng

3. 编写docker-compose.yml

version: '3.8'  # 定义 Docker Compose 文件的版本,这里使用的是 3.8 版本

services:  # 定义服务的部分,包含所有要启动的 Docker 容器
  # RocketMQ Nameserver 服务
  namesrv:
    image: apache/rocketmq:5.3.1  # 指定使用的 Docker 镜像,这里使用 Apache RocketMQ 5.3.1 版本
    container_name: rmqnamesrv  # 设置容器的名称为 rmqnamesrv
    ports:
      - 9876:9876  # 映射容器的 9876 端口到宿主机的 9876 端口,RocketMQ Nameserver 的默认端口
    networks:
      - rocketmq  # 指定此服务运行在 rocketmq 网络中
    command: sh mqnamesrv  # 指定容器启动时执行的命令,这里是启动 Nameserver 服务

  # RocketMQ Broker 服务
  broker:
    image: apache/rocketmq:5.3.1  # 使用相同版本的 RocketMQ 镜像
    container_name: rmqbroker  # 设置容器名称为 rmqbroker
    ports:
      - 10909:10909  # 映射容器的 10909 端口到宿主机的 10909 端口,RocketMQ Broker 的默认端口
      - 10911:10911  # 映射容器的 10911 端口到宿主机的 10911 端口,RocketMQ Broker 监听的另一端口
    environment:
      - NAMESRV_ADDR=namesrv:9876  # 设置 Nameserver 的地址,用于连接 Nameserver
      - JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Duser.home=/home/rocketmq  # 设置 Java 相关的 JVM 参数,限制最小和最大堆内存为 512MB
    volumes:
      - ./broker.conf:/home/rocketmq/rocketmq-5.3.1/conf/broker.conf  # 将宿主机的 broker.conf 配置文件挂载到容器中,注意版本一定要对-5.3.1
    depends_on:
      - namesrv  # 确保 Nameserver 容器在 Broker 容器之前启动
    networks:
      - rocketmq  # 此服务也运行在 rocketmq 网络中
    command: sh mqbroker -c ../conf/broker.conf  # 启动 Broker 服务,指定配置文件路径

  # RocketMQ Console Web 管理界面服务
  console:
    image: pangliang/rocketmq-console-ng  # 使用第三方镜像实现 RocketMQ 控制台
    container_name: rmqconsole  # 设置容器名称为 rmqconsole
    ports:
      - 9080:8080  # 映射容器的 8080 端口到宿主机的 9080 端口,控制台的 Web UI 访问端口
    environment:
      - JAVA_OPTS=-Dserver.port=8080 -Drocketmq.config.namesrvAddr=namesrv:9876  # 设置 JAVA_OPTS,控制台监听 8080 端口,并连接 Nameserver 服务
    depends_on:
      - namesrv  # 确保 Nameserver 容器在 Console 容器之前启动
    networks:
      - rocketmq  # 此服务也运行在 rocketmq 网络中
    restart: always  # 容器退出后会自动重启
    deploy:
      resources:
        limits:
          cpus: '0.3'  # 限制该服务最多使用 0.3 个 CPU 核心
          memory: 256M  # 限制该服务最多使用 256MB 内存

# 定义一个名为 rocketmq 的网络
networks:
  rocketmq:
    driver: bridge  # 使用 Docker 的桥接网络模式
    ipam:
      config:
        - subnet: 172.28.0.0/16  # 设置网络的子网为 172.28.0.0/16,确保容器之间能在同一网络内通信

RocketMQ Nameserver (namesrv) 是集群中的协调者,用来管理 Broker。



RocketMQ Broker (broker) 是消息传递的核心,负责存储和处理消息。



RocketMQ Console (console) 是一个管理界面,允许你查看 RocketMQ 集群的状态和监控消息传递。



4. 配置文件broker.conf编写

# 集群配置
brokerClusterName=DefaultCluster  # 设置 Broker 所在的集群名称。集群中的所有 Broker 实例应该有相同的 `brokerClusterName`,默认为 `DefaultCluster`。
brokerName=Broker-A  # 设置 Broker 的名称。每个 Broker 在集群中的名字应该唯一。这里命名为 `Broker-A`,可以用于区分不同的 Broker 实例。
brokerId=0  # 设置 Broker 的 ID,标识一个特定的 Broker 实例。`0` 表示主 Broker(Master)。如果是从 Broker,ID 应该为负数,比如 `-1`。

# 存储配置
deleteWhen=04  # 设置消息存储日志文件的删除时间。`04` 表示每天 04:00 清理过期的消息文件。可以设置为 `04`、`08` 等等。
fileReservedTime=48  # 设置消息文件的保留时间,单位为小时。这里设置为 48 小时,表示消息存储文件会保留 48 小时,之后会被删除。
brokerRole=ASYNC_MASTER  # 设置 Broker 的角色。`ASYNC_MASTER` 表示该 Broker 为异步主节点,所有写操作都会异步地提交到磁盘。
# 如果是 `SYNC_MASTER`,则表示同步主节点,会等待所有数据写入磁盘后才返回成功;如果是 `SLAVE`,表示从节点。
flushDiskType=ASYNC_FLUSH  # 设置刷盘的方式。`ASYNC_FLUSH` 表示异步刷盘,写数据时不等待磁盘刷写完成后再返回,性能更好,但可能存在数据丢失风险。
# 另外一个选项是 `SYNC_FLUSH`,表示同步刷盘,数据写入时会等待磁盘刷写完成,保证数据可靠性,但性能较差。

# 网络配置
listenPort=10911  # 设置 Broker 监听的端口号,客户端和其他 Broker 会通过此端口与该 Broker 进行通信。
brokerIP1=100.72.47.219  # 设置 Broker 对外暴露的 IP 地址。客户端和其他 Broker 会通过这个 IP 地址来访问此 Broker。
# 这个地址一般设置为宿主机的 IP 地址或者外部可访问的地址。确保在多个 Broker 之间互相访问时能够正常连接。
namesrvAddr=100.72.47.219:9876  # 设置 NameServer 的地址。NameServer 是 RocketMQ 的注册中心,Broker 会注册到 NameServer 上,客户端通过它来找到 Broker。
# `100.72.47.219:9876` 表示 NameServer 的 IP 地址和端口号。这里的 IP 地址应为外部可访问的地址,确保其他 Broker 和客户端能够访问到 NameServer。

5.启动

 docker-compose up -d 

6.进入控制台

http://100.72.47.219:9080/#/ 这里我已经部署使用过了所以里面有数据,第一次是空的

二:spring boot集成

1. 引入依赖

<!-- 根据需要选择最新版本,从中央仓库可以查看https://central.sonatype.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.3.0</version>  
</dependency>

2.配置application.yml参数

# 注意:rocketmq必须在根节点也就是rocketmq.name-server=*****
rocketmq:
  name-server: 100.72.47.219:9876  # RocketMQ NameServer 地址
  producer:
    group: other-producer-group  # 生产者组
    send-message-timeout: 10000
  consumer:
    group: other-consumer-group  # 消费者组
    send-message-timeout: 10000

3. 消费者service类

import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
 * messageModel=MessageModel.CLUSTERING
 * 监听模式,有消息就会消费
 */
@Service
@RocketMQMessageListener(topic = "test2-topic", consumerGroup = "other-consumer-group", messageModel = MessageModel.CLUSTERING)
public class RocketMQConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String s) {
        System.out.printf("收到消息: %s\n", s);
    }
}

4. 生产者service类

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class RocketMQProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    private final String topic = "test2-topic";
    // 1.同步发送消息
    // 同步发送是指发送方发送一条消息后,会等待服务器返回确认信息后再进行后续操作。这种方式适用于需要可靠性保证的场景。
    public void createAndSend(String message){
        rocketMQTemplate.convertAndSend(topic, message);
        System.out.printf("同步发送结果: %s\n", message);
    }
    // 1.同步发送消息
    // 同步发送是指发送方发送一条消息后,会等待服务器返回确认信息后再进行后续操作。这种方式适用于需要可靠性保证的场景。
    public void sendSyncMessage(String message){
        SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build());
        System.out.println(sendResult.getMsgId());
        System.out.printf("同步发送结果: %s\n", message);
    }

    // 2.异步发送消息
    // 异步发送是指发送方发送消息后,不等待服务器返回确认信息,而是通过回调接口处理返回结果。这种方式适用于对响应时间要求较高的场景。
    public void sendAsyncMessage(String message){
        rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(message).build(), new SendCallback() {

            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("异步发送成功: %s\n", sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.printf("异步发送失败: %s\n", throwable.getMessage());
            }
        });
    }

    // 3.单向发送消息
    // 单向发送是指发送方只负责发送消息,不关心服务器的响应。该方式适用于对可靠性要求不高的场景,如日志收集。
    public void sendOneWayMessage(String message){
        rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(message).build());
        System.out.println("单向消息发送成功");
    }
}

5.测试controller类

@RequestMapping("api")
@RestController
public class RocketController {
    @Autowired
    private RocketMQProducer rocketMQProducer;

    @GetMapping("/createAndSend")
    public String createAndSend(@RequestParam String message) {
        rocketMQProducer.createAndSend(message);
        return "同步消息发送成功";
    }

    @GetMapping("/sendSync")
    public String sendSync(@RequestParam String message) {
        rocketMQProducer.sendSyncMessage(message);
        return "同步消息发送成功";
    }

    @GetMapping("/sendAsync")
    public String sendAsync(@RequestParam String message) {
        rocketMQProducer.sendAsyncMessage(message);
        return "异步消息发送中";
    }

    @GetMapping("/sendOneWay")
    public String sendOneWay(@RequestParam String message) {
        rocketMQProducer.sendOneWayMessage(message);
        return "单向消息发送成功";
    }
}

6.postman测试

https://www.zuocode.com