一:安装rocketmq
docker pull apache/rocketmq:5.3.1
2. 下载控制台镜像
docker pull pangliang/rocketmq-console-ng
3. 编写docker-compose.yml
version: '3.8' # 定义 Docker Compose 文件的版本,这里使用的是 3.8 版本
services: # 定义服务的部分,包含所有要启动的 Docker 容器
# RocketMQ Nameserver 服务
namesrv:
image: apache/rocketmq:5.3.1 # 指定使用的 Docker 镜像,这里使用 Apache RocketMQ 5.3.1 版本
container_name: rmqnamesrv # 设置容器的名称为 rmqnamesrv
ports:
- 9876:9876 # 映射容器的 9876 端口到宿主机的 9876 端口,RocketMQ Nameserver 的默认端口
networks:
- rocketmq # 指定此服务运行在 rocketmq 网络中
command: sh mqnamesrv # 指定容器启动时执行的命令,这里是启动 Nameserver 服务
# RocketMQ Broker 服务
broker:
image: apache/rocketmq:5.3.1 # 使用相同版本的 RocketMQ 镜像
container_name: rmqbroker # 设置容器名称为 rmqbroker
ports:
- 10909:10909 # 映射容器的 10909 端口到宿主机的 10909 端口,RocketMQ Broker 的默认端口
- 10911:10911 # 映射容器的 10911 端口到宿主机的 10911 端口,RocketMQ Broker 监听的另一端口
environment:
- NAMESRV_ADDR=namesrv:9876 # 设置 Nameserver 的地址,用于连接 Nameserver
- JAVA_OPT_EXT=-server -Xms512m -Xmx512m -Duser.home=/home/rocketmq # 设置 Java 相关的 JVM 参数,限制最小和最大堆内存为 512MB
volumes:
- ./broker.conf:/home/rocketmq/rocketmq-5.3.1/conf/broker.conf # 将宿主机的 broker.conf 配置文件挂载到容器中,注意版本一定要对-5.3.1
depends_on:
- namesrv # 确保 Nameserver 容器在 Broker 容器之前启动
networks:
- rocketmq # 此服务也运行在 rocketmq 网络中
command: sh mqbroker -c ../conf/broker.conf # 启动 Broker 服务,指定配置文件路径
# RocketMQ Console Web 管理界面服务
console:
image: pangliang/rocketmq-console-ng # 使用第三方镜像实现 RocketMQ 控制台
container_name: rmqconsole # 设置容器名称为 rmqconsole
ports:
- 9080:8080 # 映射容器的 8080 端口到宿主机的 9080 端口,控制台的 Web UI 访问端口
environment:
- JAVA_OPTS=-Dserver.port=8080 -Drocketmq.config.namesrvAddr=namesrv:9876 # 设置 JAVA_OPTS,控制台监听 8080 端口,并连接 Nameserver 服务
depends_on:
- namesrv # 确保 Nameserver 容器在 Console 容器之前启动
networks:
- rocketmq # 此服务也运行在 rocketmq 网络中
restart: always # 容器退出后会自动重启
deploy:
resources:
limits:
cpus: '0.3' # 限制该服务最多使用 0.3 个 CPU 核心
memory: 256M # 限制该服务最多使用 256MB 内存
# 定义一个名为 rocketmq 的网络
networks:
rocketmq:
driver: bridge # 使用 Docker 的桥接网络模式
ipam:
config:
- subnet: 172.28.0.0/16 # 设置网络的子网为 172.28.0.0/16,确保容器之间能在同一网络内通信
RocketMQ Nameserver (namesrv) 是集群中的协调者,用来管理 Broker。
RocketMQ Broker (broker) 是消息传递的核心,负责存储和处理消息。
RocketMQ Console (console) 是一个管理界面,允许你查看 RocketMQ 集群的状态和监控消息传递。
4. 配置文件broker.conf编写
# 集群配置
brokerClusterName=DefaultCluster # 设置 Broker 所在的集群名称。集群中的所有 Broker 实例应该有相同的 `brokerClusterName`,默认为 `DefaultCluster`。
brokerName=Broker-A # 设置 Broker 的名称。每个 Broker 在集群中的名字应该唯一。这里命名为 `Broker-A`,可以用于区分不同的 Broker 实例。
brokerId=0 # 设置 Broker 的 ID,标识一个特定的 Broker 实例。`0` 表示主 Broker(Master)。如果是从 Broker,ID 应该为负数,比如 `-1`。
# 存储配置
deleteWhen=04 # 设置消息存储日志文件的删除时间。`04` 表示每天 04:00 清理过期的消息文件。可以设置为 `04`、`08` 等等。
fileReservedTime=48 # 设置消息文件的保留时间,单位为小时。这里设置为 48 小时,表示消息存储文件会保留 48 小时,之后会被删除。
brokerRole=ASYNC_MASTER # 设置 Broker 的角色。`ASYNC_MASTER` 表示该 Broker 为异步主节点,所有写操作都会异步地提交到磁盘。
# 如果是 `SYNC_MASTER`,则表示同步主节点,会等待所有数据写入磁盘后才返回成功;如果是 `SLAVE`,表示从节点。
flushDiskType=ASYNC_FLUSH # 设置刷盘的方式。`ASYNC_FLUSH` 表示异步刷盘,写数据时不等待磁盘刷写完成后再返回,性能更好,但可能存在数据丢失风险。
# 另外一个选项是 `SYNC_FLUSH`,表示同步刷盘,数据写入时会等待磁盘刷写完成,保证数据可靠性,但性能较差。
# 网络配置
listenPort=10911 # 设置 Broker 监听的端口号,客户端和其他 Broker 会通过此端口与该 Broker 进行通信。
brokerIP1=100.72.47.219 # 设置 Broker 对外暴露的 IP 地址。客户端和其他 Broker 会通过这个 IP 地址来访问此 Broker。
# 这个地址一般设置为宿主机的 IP 地址或者外部可访问的地址。确保在多个 Broker 之间互相访问时能够正常连接。
namesrvAddr=100.72.47.219:9876 # 设置 NameServer 的地址。NameServer 是 RocketMQ 的注册中心,Broker 会注册到 NameServer 上,客户端通过它来找到 Broker。
# `100.72.47.219:9876` 表示 NameServer 的 IP 地址和端口号。这里的 IP 地址应为外部可访问的地址,确保其他 Broker 和客户端能够访问到 NameServer。
5.启动
docker-compose up -d
6.进入控制台
http://100.72.47.219:9080/#/ 这里我已经部署使用过了所以里面有数据,第一次是空的
二:spring boot集成
1. 引入依赖
<!-- 根据需要选择最新版本,从中央仓库可以查看https://central.sonatype.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>
2.配置application.yml参数
# 注意:rocketmq必须在根节点也就是rocketmq.name-server=*****
rocketmq:
name-server: 100.72.47.219:9876 # RocketMQ NameServer 地址
producer:
group: other-producer-group # 生产者组
send-message-timeout: 10000
consumer:
group: other-consumer-group # 消费者组
send-message-timeout: 10000
3. 消费者service类
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
* messageModel=MessageModel.CLUSTERING
* 监听模式,有消息就会消费
*/
@Service
@RocketMQMessageListener(topic = "test2-topic", consumerGroup = "other-consumer-group", messageModel = MessageModel.CLUSTERING)
public class RocketMQConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.printf("收到消息: %s\n", s);
}
}
4. 生产者service类
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
@Service
public class RocketMQProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
private final String topic = "test2-topic";
// 1.同步发送消息
// 同步发送是指发送方发送一条消息后,会等待服务器返回确认信息后再进行后续操作。这种方式适用于需要可靠性保证的场景。
public void createAndSend(String message){
rocketMQTemplate.convertAndSend(topic, message);
System.out.printf("同步发送结果: %s\n", message);
}
// 1.同步发送消息
// 同步发送是指发送方发送一条消息后,会等待服务器返回确认信息后再进行后续操作。这种方式适用于需要可靠性保证的场景。
public void sendSyncMessage(String message){
SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(message).build());
System.out.println(sendResult.getMsgId());
System.out.printf("同步发送结果: %s\n", message);
}
// 2.异步发送消息
// 异步发送是指发送方发送消息后,不等待服务器返回确认信息,而是通过回调接口处理返回结果。这种方式适用于对响应时间要求较高的场景。
public void sendAsyncMessage(String message){
rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(message).build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("异步发送成功: %s\n", sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.printf("异步发送失败: %s\n", throwable.getMessage());
}
});
}
// 3.单向发送消息
// 单向发送是指发送方只负责发送消息,不关心服务器的响应。该方式适用于对可靠性要求不高的场景,如日志收集。
public void sendOneWayMessage(String message){
rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(message).build());
System.out.println("单向消息发送成功");
}
}
5.测试controller类
@RequestMapping("api")
@RestController
public class RocketController {
@Autowired
private RocketMQProducer rocketMQProducer;
@GetMapping("/createAndSend")
public String createAndSend(@RequestParam String message) {
rocketMQProducer.createAndSend(message);
return "同步消息发送成功";
}
@GetMapping("/sendSync")
public String sendSync(@RequestParam String message) {
rocketMQProducer.sendSyncMessage(message);
return "同步消息发送成功";
}
@GetMapping("/sendAsync")
public String sendAsync(@RequestParam String message) {
rocketMQProducer.sendAsyncMessage(message);
return "异步消息发送中";
}
@GetMapping("/sendOneWay")
public String sendOneWay(@RequestParam String message) {
rocketMQProducer.sendOneWayMessage(message);
return "单向消息发送成功";
}
}
6.postman测试