First Steps with RocketMQ 5.X & Pitfalls
Background
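The release of RocketMQ 5.X is a big step toward cloud native. The official documentation shows that the biggest change is the new Proxy module. The old and new architectures compare as follows:
RocketMQ 4.X architecture
RocketMQ 5.X architecture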
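Many people who used RocketMQ 4.X in depth have probably hit the same problem: RocketMQ has shortcomings in complex network environments. For example, people often ask online how to make RocketMQ serve the internal network and the public network at the same time. The usual answer is to set brokerIP1 and brokerIP2, with brokerIP1 pointing to the public IP and brokerIP2 to the internal IP. This does not solve the problem, because the NameServer always returns the brokerIP1 address to clients. Clients on both networks are handed the same address, so internal and public access are still not truly separated.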
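With the Proxy module, clients no longer need to know the Broker addresses. Requests are relayed through the Proxy, which removes the problem of access from complex networks entirely. The Proxy accepts client requests over the new gRPC protocol and remains compatible with the 4.X Remoting protocol, so RocketMQ can flexibly serve clients of different versions.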
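By default, the Proxy serves the Remoting protocol on port 8080 and the gRPC protocol on port 8081. The Proxy parameter remotingAccessAddr controls the access address returned to Remoting-protocol clients. You could even do the following:
- Run one set of Proxies with remotingAccessAddr set to the internal address, serving the internal network
- Run another set of Proxies with remotingAccessAddr set to the public address, serving the public network
- Or skip the Proxy altogether: clients get the Broker addresses from the NameServer, much as in the 4.X architecture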
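As a sketch of the two-Proxy setup described above, the shell function below writes one rmq-proxy.json per network; the two files differ only in remotingAccessAddr. The function name write_proxy_conf, the directory names and both IP addresses are hypothetical placeholders. The sketch also assumes the Proxy keeps its defaults for every option not listed, which is worth verifying against your version.

```shell
#!/bin/sh
# Hypothetical helper: write an rmq-proxy.json for one Proxy deployment.
# Only namesrvAddr, rocketMQClusterName and remotingAccessAddr are set;
# this assumes every other Proxy option can stay at its default.
write_proxy_conf() {
  access_addr=$1   # address returned to Remoting-protocol clients
  out_file=$2
  cat > "$out_file" <<EOF
{
  "namesrvAddr": "rocketmq-nameserver:9876",
  "rocketMQClusterName": "DefaultCluster",
  "remotingAccessAddr": "$access_addr"
}
EOF
}

# One Proxy for the internal network, one for the public network.
# 10.0.0.10 and 203.0.113.10 are placeholder addresses.
mkdir -p proxy-internal proxy-external
write_proxy_conf 10.0.0.10    proxy-internal/rmq-proxy.json
write_proxy_conf 203.0.113.10 proxy-external/rmq-proxy.json
```

Each file would then go into its own ConfigMap and Proxy Deployment, with a Service exposed on the matching network.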
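At the time of writing, RocketMQ had just released version 5.1.0, but my tests showed that the Proxy was not yet compatible with clients using the old-version protocol. Let's look forward to the next release.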
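Deployment
RocketMQ 5.X has embraced cloud native, so deploying it on Kubernetes is no longer difficult. When I wrote this article, no official 5.X container image had been released yet, so I built my own.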
yaml
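Note: this YAML deploys a single instance without persistent storage and is meant for testing only.
The YAML was generated with rocketmq-docker/rocketmq-k8s-helm at master · apache/rocketmq-docker · GitHub, with a few modifications. Set the remotingAccessAddr parameter for your own environment. The container image was built with rocketmq-docker/image-build at master · apache/rocketmq-docker · GitHub, with a few changes to its scripts.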
# Source: rocketmq/templates/broker/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: broker-conf
  labels:
    app.kubernetes.io/name: broker
data:
  rocketmq-broker-0: |
    brokerClusterName=DefaultCluster
    brokerName=rocketmq-broker-0
    enableNameServerAddressResolve=true
    # common configs
    traceOn=true
    autoCreateTopicEnable=false
    autoCreateSubscriptionGroup=true
    enableIncrementalTopicCreation=true
    generateConfigForScaleOutEnable=false
    enableNotifyAfterPopOrderLockRelease=true
    autoMessageVersionOnTopicLen=true
    # pop config
    enablePopBufferMerge=true
    enableConsumePopRetryTopic=true
    enableConsumePullRetryTopic=true
    enableSkipLongWaitAck=true
    # Store config
    flushDiskType=SYNC_FLUSH
    # Enable SQL92
    enablePropertyFilter=true
    # Transaction config
    transactionCheckMaxTimeInMs=14400000
    transactionCheckInterval=60000
    # Delay config
    timerWheelEnable=true
    timerMaxDelaySec=86400
    waitTimeMillsInSendQueue=900
    maxMessageSize=5242880
    # stream
    litePullMessageEnable=true
---
# Source: rocketmq/templates/nameserver/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: nameserver-conf
  labels:
    app.kubernetes.io/name: nameserver
data:
  namesrv.p: |
    listenPort=9876
    defaultThreadPoolNums=4
    deleteTopicWithBrokerRegistration=true
---
# Source: rocketmq/templates/proxy/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: proxy-conf
data:
  rmq-proxy.json: |
    {
      "enableFlowControl": true,
      "enableFlowLimitAction": true,
      "grpcClientProducerBackoffInitialMillis": 5,
      "grpcClientProducerBackoffMaxMillis": 1000,
      "grpcClientProducerBackoffMultiplier": 5,
      "longPollingReserveTimeInMillis": 1000,
      "maxMessageGroupSize": 64,
      "maxMessageSize": 4194304,
      "maxUserPropertySize": 16384,
      "metricCollectorMode": "proxy",
      "namesrvAddr": "rocketmq-nameserver:9876",
      "rocketMQClusterName": "DefaultCluster",
      "transactionHeartbeatBatchNum": 1,
      "userPropertyMaxNum": 128,
      "remotingAccessAddr": "replace with your own IP"
    }
---
# Source: rocketmq/templates/broker/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: rocketmq-broker
  labels:
    app.kubernetes.io/name: broker
spec:
  ports:
    - port: 10911
      # the broker container names this port "broker", so a named
      # targetPort "broker-port" would not resolve; use the number instead
      targetPort: 10911
      protocol: TCP
      name: broker-port
  selector:
    app.kubernetes.io/name: broker
  clusterIP: None
---
# Source: rocketmq/templates/nameserver/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: rocketmq-nameserver
  labels:
    app.kubernetes.io/name: nameserver
spec:
  clusterIP: None
  ports:
    - port: 9876
      targetPort: 9876
      protocol: TCP
      name: nameserver-service
  selector:
    app.kubernetes.io/name: nameserver
---
# Source: rocketmq/templates/proxy/service.yaml
apiVersion: v1
kind: Service
metadata:
  name: rocketmq-proxy
  labels:
    app.kubernetes.io/name: proxy
spec:
  clusterIP: None
  ports:
    - port: 8080
      targetPort: 8080
      protocol: TCP
      name: remoting-internet
    - port: 8081
      targetPort: 8081
      protocol: TCP
      name: grpc
  selector:
    app.kubernetes.io/name: proxy
---
# Source: rocketmq/templates/nameserver/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rocketmq-nameserver
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: nameserver
      component: nameserver
  template:
    metadata:
      labels:
        app.kubernetes.io/name: nameserver
        component: nameserver
    spec:
      containers:
        - name: nameserver
          image: apache/rocketmq:5.1.0
          imagePullPolicy: "Always"
          command: [ "/bin/sh" ]
          args: [ "-c", "./mqnamesrv -c /home/rocketmq/config/namesrv.p" ]
          ports:
            - name: nameserver
              containerPort: 9876
              protocol: TCP
          startupProbe:
            tcpSocket:
              port: nameserver
            periodSeconds: 5
            initialDelaySeconds: 20
            failureThreshold: 3
          livenessProbe:
            tcpSocket:
              port: nameserver
            initialDelaySeconds: 5
            periodSeconds: 5
            failureThreshold: 3
          readinessProbe:
            tcpSocket:
              port: nameserver
            initialDelaySeconds: 5
            periodSeconds: 5
            timeoutSeconds: 1
            failureThreshold: 3
          resources:
            limits:
              cpu: 1
              memory: 2Gi
            requests:
              cpu: 250m
              memory: 2Gi
          volumeMounts:
            - mountPath: /home/rocketmq/config
              name: nameserver-config
            - mountPath: /home/rocketmq/logs
              name: nameserver-log
      volumes:
        - name: nameserver-config
          configMap:
            name: nameserver-conf
        - name: nameserver-log
          emptyDir: { }
---
# Source: rocketmq/templates/proxy/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rocketmq-proxy
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/name: proxy
  template:
    metadata:
      labels:
        app.kubernetes.io/name: proxy
    spec:
      containers:
        - name: proxy
          image: apache/rocketmq:5.1.0
          imagePullPolicy: "Always"
          command: [ "/bin/sh" ]
          args: [ "-c", "./mqproxy -pc /home/rocketmq/rocketmq-proxy/configmap/rmq-proxy.json" ]
          env:
            - name: RMQ_PROXY_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: NAMESRV_ADDR
              value: rocketmq-nameserver:9876
            - name: RMQ_PROXY_CONFIG_PATH
              value: /home/rocketmq/rocketmq-proxy/configmap
          lifecycle:
            preStop:
              exec:
                # same working directory as ./mqproxy above, so no bin/ prefix
                command:
                  - sh
                  - ./mqshutdown
                  - proxy
          ports:
            - name: remote
              containerPort: 8080
              protocol: TCP
            - name: grpc
              containerPort: 8081
              protocol: TCP
          startupProbe:
            tcpSocket:
              port: grpc
            initialDelaySeconds: 10
            failureThreshold: 30
            periodSeconds: 10
          readinessProbe:
            tcpSocket:
              port: grpc
            periodSeconds: 5
          livenessProbe:
            tcpSocket:
              port: grpc
            periodSeconds: 10
          resources:
            limits:
              cpu: 1
              memory: 2Gi
            requests:
              cpu: 300m
              memory: 1Gi
          volumeMounts:
            - name: conf
              mountPath: /home/rocketmq/rocketmq-proxy/configmap/
      volumes:
        - name: conf
          configMap:
            name: proxy-conf
---
# Source: rocketmq/templates/broker/statefulset.yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: rocketmq-broker
spec:
  replicas: 1
  serviceName: rocketmq-broker
  selector:
    matchLabels:
      app.kubernetes.io/name: broker
  podManagementPolicy: Parallel
  updateStrategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        app.kubernetes.io/name: broker
    spec:
      containers:
        - name: broker
          image: apache/rocketmq:5.1.0
          imagePullPolicy: Always
          command: [ "/bin/sh" ]
          args: [ "-c", "./mqbroker -c /home/rocketmq/conf/$(POD_NAME)" ]
          env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: POD_IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: NAMESRV_ADDR
              value: rocketmq-nameserver:9876
          ports:
            - name: broker
              containerPort: 10911
              protocol: TCP
            - name: con-nameserver
              containerPort: 10909
              protocol: TCP
            - name: ha
              containerPort: 10912
              protocol: TCP
            - name: proxy
              containerPort: 8081
              protocol: TCP
          readinessProbe:
            failureThreshold: 3
            initialDelaySeconds: 60
            periodSeconds: 15
            successThreshold: 1
            tcpSocket:
              port: 10911
            timeoutSeconds: 1
          livenessProbe:
            failureThreshold: 3
            initialDelaySeconds: 60
            periodSeconds: 15
            successThreshold: 1
            tcpSocket:
              port: 10911
            timeoutSeconds: 1
          resources:
            limits:
              cpu: 2
              memory: 4Gi
            requests:
              cpu: 2
              memory: 4Gi
          volumeMounts:
            - mountPath: /home/rocketmq/conf
              name: broker-config
            - mountPath: /home/rocketmq/logs
              name: broker-storage
              subPath: home/rocketmq/rocketmq-broker
            - mountPath: /root/store
              name: broker-storage
              subPath: store/rocketmq-broker
      volumes:
        - name: broker-config
          configMap:
            name: broker-conf
        - name: broker-storage
          emptyDir: { }
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: rocketmq-dashboard
spec:
  selector:
    matchLabels:
      app: rocketmq-dashboard
  template:
    metadata:
      labels:
        app: rocketmq-dashboard
    spec:
      containers:
        - name: dashboard
          image: apacherocketmq/rocketmq-dashboard:1.0.0
          imagePullPolicy: "Always"
          resources:
            limits:
              cpu: 1
              memory: 2Gi
          env:
            - name: JAVA_OPTS
              value: -Drocketmq.namesrv.addr=rocketmq-nameserver:9876
---
apiVersion: v1
kind: Service
metadata:
  labels:
    app: rocketmq-dashboard
  name: rocketmq-dashboard
spec:
  ports:
    - name: http
      port: 8080
      protocol: TCP
      targetPort: 8080
  selector:
    app: rocketmq-dashboard
  type: NodePort
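The remotingAccessAddr value in the Proxy ConfigMap has to be edited by hand before applying the manifest. A small shell helper can do that edit. set_access_addr is a hypothetical name, and the sketch assumes the JSON line keeps the `"key": "value"` form with one space after the colon, as in the manifest above.

```shell
#!/bin/sh
# Hypothetical helper (not part of rocketmq-docker): set remotingAccessAddr in a
# manifest. It replaces whatever value sits between the quotes, so it works on
# the original placeholder as well as on an address set earlier.
# Assumes the address contains no '/' or '&', which sed treats specially.
set_access_addr() {
  manifest=$1
  addr=$2
  tmp="$manifest.tmp"
  # Write to a temp file and move it back instead of using `sed -i`,
  # whose option syntax differs between GNU sed and BSD sed.
  sed "s/\"remotingAccessAddr\": \"[^\"]*\"/\"remotingAccessAddr\": \"$addr\"/" \
    "$manifest" > "$tmp" && mv "$tmp" "$manifest"
}

# Usage (rocketmq.yaml and 192.168.1.20 are placeholders):
#   set_access_addr rocketmq.yaml 192.168.1.20
#   kubectl apply -f rocketmq.yaml
```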
Pitfalls
I ran into quite a few problems while deploying and using it. I share them here for reference.
Proxy fails to run on an Alpine base image
With an image built from the default Dockerfile-alpine, running the proxy fails with:
(many error lines omitted...)
find an unexpect err.java.lang.RuntimeException: grpc tls set failed: Could not find TLS ALPN provider; no working netty-tcnative, Conscrypt, or Jetty NPN/ALPN available
I modified Dockerfile-alpine to use alpine3.17.0 and install openjdk 8.345.01-r3, but it still failed:
#
# A fatal error has been detected by the Java Runtime Environment:
#
# SIGSEGV (0xb) at pc=0x0000000000003fd6, pid=195, tid=0x00007f5e6f99db38
#
# JRE version: OpenJDK Runtime Environment (8.0_275-b01) (build 1.8.0_275-b01)
# Java VM: OpenJDK 64-Bit Server VM (25.275-b01 mixed mode linux-amd64 compressed oops)
# Derivative: IcedTea 3.17.1
# Distribution: Custom build (Mon Feb 15 17:52:28 UTC 2021)
# Problematic frame:
# C 0x0000000000003fd6
#
# Core dump written. Default location: /home/rocketmq/rocketmq-5.0.0/bin/core or core.195
#
# An error report file with more information is saved as:
# /home/rocketmq/rocketmq-5.0.0/bin/hs_err_pid195.log
#
# If you would like to submit a bug report, please include
# instructions on how to reproduce the bug and visit:
# https://icedtea.classpath.org/bugzilla
#
Aborted (core dumped)
In the end, with CentOS as the base image, it ran normally:
./build-image.sh 5.1.0 centos
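The Remoting protocol is only supported from RocketMQ 5.1.0
When I started testing, the latest release was 5.0.0. That version does not support the Remoting protocol, and the official documentation does not say so. This caught me out; I only found out by reading the source code.
After 5.1.0 was released, the Remoting protocol was supported. However, 4.X clients are still not compatible, so we need to wait a little longer.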
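The default image does not support custom JVM options such as -Xmx and -Xms
The startup scripts in the official release binary package do not support customizing JVM options such as -Xmx and -Xms. Starting from rocketmq-docker/image-build at master · apache/rocketmq-docker · GitHub, I modified scripts/runbroker-customize.sh and scripts/runserver-customize.sh so that they derive -Xmx, -Xms and related options automatically from the resources.limits.memory setting, by reading the container's cgroup memory limit that Kubernetes sets from it. The modified scripts are as follows: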
runbroker-customize.sh
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#===========================================================================================
# Java Environment Setting
#===========================================================================================
error_exit ()
{
    echo "ERROR: $1 !!"
    exit 1
}

[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${BASE_DIR}/lib/*:${CLASSPATH}

limit_in_bytes=$(cat /sys/fs/cgroup/memory/memory.limit_in_bytes)

# If not default limit_in_bytes in cgroup
if [ "$limit_in_bytes" -ne "9223372036854771712" ]; then
    limit_in_megabytes=$(expr $limit_in_bytes \/ 1048576)
    heap_size=$(expr $limit_in_megabytes \* 75 \/ 100) # 75% LimitMem
    new_size=$(expr $heap_size \* 3 \/ 8)
    JAVA_OPT="${JAVA_OPT} -server -Xmx${heap_size}m -Xms${heap_size}m -Xmn${new_size}m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
else
    JAVA_OPT="${JAVA_OPT} -server -Xmx2g -Xms2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
fi

choose_gc_options()
{
    JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | head -1 | cut -d'"' -f2 | sed 's/^1\.//' | cut -d'.' -f1)
    if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "8" ] ; then
      JAVA_OPT="${JAVA_OPT} -Xmn4g -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
    else
      JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
    fi

    if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then
      JAVA_OPT="${JAVA_OPT} -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
      JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
    else
      JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
    fi
}

choose_gc_options
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

numactl --interleave=all pwd > /dev/null 2>&1
if [ $? -eq 0 ]
then
    if [ -z "$RMQ_NUMA_NODE" ] ; then
        numactl --interleave=all $JAVA ${JAVA_OPT} $@
    else
        numactl --cpunodebind=$RMQ_NUMA_NODE --membind=$RMQ_NUMA_NODE $JAVA ${JAVA_OPT} $@
    fi
else
    $JAVA ${JAVA_OPT} $@
fi
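Both customized scripts size the heap with the same integer arithmetic: heap = 75% of the limit in MiB, young generation = 3/8 of the heap. Pulled out into a standalone function, the numbers can be checked without a container. heap_opts is a hypothetical name; the inputs are the 4Gi and 2Gi limits from the manifest.

```shell
#!/bin/sh
# Hypothetical helper: the heap arithmetic from the customized scripts,
# isolated so it can be checked without a container.
# Input: memory limit in bytes. expr performs integer division throughout.
heap_opts() {
  limit_in_megabytes=$(expr "$1" / 1048576)
  heap_size=$(expr "$limit_in_megabytes" \* 75 / 100)   # 75% of the limit
  new_size=$(expr "$heap_size" \* 3 / 8)                # young gen = 3/8 of heap
  echo "-Xmx${heap_size}m -Xms${heap_size}m -Xmn${new_size}m"
}

heap_opts 4294967296   # 4Gi broker limit; prints -Xmx3072m -Xms3072m -Xmn1152m
heap_opts 2147483648   # 2Gi nameserver/proxy limit; prints -Xmx1536m -Xms1536m -Xmn576m
```

With a 4Gi limit this leaves about 1 GiB outside the heap for metaspace (capped at 320m by the scripts), thread stacks and direct buffers; whether that margin suffices under real load is worth checking in your own environment.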
runserver-customize.sh
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

#===========================================================================================
# Java Environment Setting
#===========================================================================================
error_exit ()
{
    echo "ERROR: $1 !!"
    exit 1
}

[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)!"

export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=$(dirname $0)/..
export CLASSPATH=.:${BASE_DIR}/conf:${BASE_DIR}/lib/*:${CLASSPATH}

limit_in_bytes=$(cat /sys/fs/cgroup/memory/memory.limit_in_bytes)

# If not default limit_in_bytes in cgroup
if [ "$limit_in_bytes" -ne "9223372036854771712" ]; then
    limit_in_megabytes=$(expr $limit_in_bytes \/ 1048576)
    heap_size=$(expr $limit_in_megabytes \* 75 \/ 100) # 75% LimitMem
    new_size=$(expr $heap_size \* 3 \/ 8)
    JAVA_OPT="${JAVA_OPT} -server -Xmx${heap_size}m -Xms${heap_size}m -Xmn${new_size}m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
else
    JAVA_OPT="${JAVA_OPT} -server -Xmx2g -Xms2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
fi

choose_gc_options()
{
    # Example of JAVA_MAJOR_VERSION value : '1', '9', '10', '11', ...
    # '1' means releases before Java 9
    JAVA_MAJOR_VERSION=$("$JAVA" -version 2>&1 | sed -r -n 's/.* version "([0-9]*).*$/\1/p')
    if [ -z "$JAVA_MAJOR_VERSION" ] || [ "$JAVA_MAJOR_VERSION" -lt "9" ] ; then
      JAVA_OPT="${JAVA_OPT} -XX:+UseConcMarkSweepGC -XX:+UseCMSCompactAtFullCollection -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled -XX:SoftRefLRUPolicyMSPerMB=0 -XX:+CMSClassUnloadingEnabled -XX:SurvivorRatio=8 -XX:-UseParNewGC"
      JAVA_OPT="${JAVA_OPT} -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps"
      JAVA_OPT="${JAVA_OPT} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
    else
      JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0"
    fi
}

choose_gc_options
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"
#JAVA_OPT="${JAVA_OPT} -Xdebug -Xrunjdwp:transport=dt_socket,address=9555,server=y,suspend=n"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

$JAVA ${JAVA_OPT} $@
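One caveat: both scripts read only the cgroup v1 file /sys/fs/cgroup/memory/memory.limit_in_bytes. On nodes that use cgroup v2, that file is normally absent and the limit lives in memory.max instead. `cat` then fails, the numeric comparison errors out, and the scripts end up in the `else` branch with the fixed 2g heap. The function below is a sketch that checks both locations. read_memory_limit is a hypothetical name, and trying v2 before v1 is a choice of this sketch, not something the original scripts do.

```shell
#!/bin/sh
# Sketch: read the container memory limit on cgroup v1 and cgroup v2 hosts.
# Prints the limit in bytes, or nothing when no limit is configured.
# The cgroup root is a parameter (default /sys/fs/cgroup) so it can be tested.
read_memory_limit() {
  root=${1:-/sys/fs/cgroup}
  if [ -r "$root/memory.max" ]; then
    # cgroup v2: a byte count, or the literal string "max" when unlimited
    value=$(cat "$root/memory.max")
    if [ "$value" != "max" ]; then
      echo "$value"
    fi
  elif [ -r "$root/memory/memory.limit_in_bytes" ]; then
    # cgroup v1: "unlimited" shows up as a huge number; this is the same value
    # the scripts above compare against (it assumes 4 KiB pages)
    value=$(cat "$root/memory/memory.limit_in_bytes")
    if [ "$value" -ne 9223372036854771712 ]; then
      echo "$value"
    fi
  fi
}

# Possible use in place of the `cat` line in either script:
#   limit_in_bytes=$(read_memory_limit)
#   if [ -n "$limit_in_bytes" ]; then ... fi
```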