canal搭建

https://xie.infoq.cn/article/0f22f69c6a25512293e4bd789

涉及组件

  1. mysql (需要新增给canal组件权限的用户)
  2. canal-admin web管理段
  3. canal
  4. rocketmq

备注说明:

  1. canal需要依赖java,所以服务器需要安装有jdk环境

账号信息

正式环境地址:
canal-admin: 18089 admin 123456
canal:
rocketmq :9876
rocketmq-admin:17890

mysql-canal账号: canal canal@Root9700

mysql

开启bing log日志

需要先开启mysql的 binlog 写入功能,配置 binlog-format 为 ROW 模式。一般都是/etc/my.cnf,添加下列配置:

log-bin=mysql-bin   # 开启 binlog
binlog-format=ROW   # 选择 ROW 模式
server_id=1        # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

重启mysql,用以下命令检查一下binlog是否正确启动:service mysqld restart

mysql> show variables like 'log_bin%';
+---------------------------------+----------------------------------+
| Variable_name                   | Value                            |
+---------------------------------+----------------------------------+
| log_bin                         | ON                               |
| log_bin_basename                | /data/mysql/data/mysql-bin       |
| log_bin_index                   | /data/mysql/data/mysql-bin.index |
| log_bin_trust_function_creators | OFF                              |
| log_bin_use_v1_row_events       | OFF                              |
+---------------------------------+----------------------------------+
5 rows in set (0.00 sec)
mysql> show variables like 'binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)

新增用户

创建一个mysql用户canal 并且赋远程链接权限权限。

canroot用户名称 canal@123Root密码

CREATE USER canalroot IDENTIFIED BY 'canal@123Root';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canalroot'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

canal-admin搭建步骤

1、下载对应的安装包

​ 目前使用的是canal.admin-1.1.5.tar.gz

安装步骤:

  1. 解压 tar -zxvf canal.admin-1.1.5.tar.gz

  2. 修改/conf/application.yml内容

    #Canal Admin Web 界面端口
    server:
      port: 8089
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
    
    #元数据库连接信息
    spring.datasource:
      address: 11.8.36.104:3306
      database: canal_manager
      username: root
      password: root
      driver-class-name: com.mysql.jdbc.Driver
      url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
      hikari:
        maximum-pool-size: 30
        minimum-idle: 1
    
    #Canal Server 加入 Canal Admin 使用的密码
    canal:
      adminUser: admin
      adminPasswd: admin
    
  3. 运行canal提供的数据库

    sql: /conf/canal_manager.sql脚本

  1. 启动canal-admin

    ./bin/startup.sh

    如果无法启动并且log下面没有日志,需要看下bin目录下 可能会内存溢出信息

  2. 访问 Canal Admin 管理界面,默认用户名 admin,密码 123456

canal 搭建

  1. 解压canal.deployer-1.1.5.tar.gz

    1. tar -xzvf canal.deployer-1.1.5.tar.gz -C canal-server
      
  2. 修改/conf/canal_local.properties

#Canal Server 地址
canal.register.ip = 11.8.36.104

#Canal Admin 连接信息
canal.admin.manager = 11.8.36.104:8089
canal.admin.port = 11110
canal.admin.user = admin
#mysql5 类型 MD5 加密结果 -- admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

#自动注册
canal.admin.register.auto = true
#集群名
canal.admin.register.cluster = 
  1. 启动 local 参数表示使用 canal_local.properties 配置启动 Canal Server。
    sh /bin/startup.sh local

  2. 在canal admin ui 中配置Instance管理
    新建 Instance
    选择Instance 管理-> 新建Instance
    填写 Instance名称:
    选择 选择所属主机集群
    选择 载入模板
    修改默认信息

#mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 192.168.1.101:5506
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal@Root9700
#改成自己的数据库信息(需要监听的数据库)
canal.instance.defaultDatabaseName = lcam_sc
canal.instance.connectionCharset = UTF-8
#table regex 需要过滤的表 这里数据库的中所有表
canal.instance.filter.regex = lcam_sc.runeventlist2\\\d{6}

# MQ 配置 日志数据会发送到cms_article这个topic上
canal.mq.topic=cxhlz-canal
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#单分区处理消息
canal.mq.partition=0
  1. 修改server配置 点击操作-》配置

    1. 单机版就不用配置zookeeper
    2. 配置推送mq类型为roketmq
    #################################################
    #########         common argument        #############
    #################################################
    # tcp bind ip
    canal.ip =
    # register ip to zookeeper
    canal.register.ip = 192.168.1.101
    canal.port = 11111
    canal.metrics.pull.port = 11112
    # canal instance user/passwd
    # canal.user = canal
    # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
    
    # canal admin config 修改
    canal.admin.manager = 192.168.1.101:18089
    canal.admin.port = 11110
    canal.admin.user = admin
    canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
    
    # admin auto register
    #canal.admin.register.auto = true
    #canal.admin.register.cluster =
    #canal.admin.register.name =
    
    canal.zkServers =
    # flush data to zk
    canal.zookeeper.flush.period = 1000
    canal.withoutNetty = false
    # tcp, kafka, rocketMQ, rabbitMQ 修改
    canal.serverMode = rocketMQ
    # flush meta cursor/parse position to file
    canal.file.data.dir = ${canal.conf.dir}
    canal.file.flush.period = 1000
    ## memory store RingBuffer size, should be Math.pow(2,n)
    canal.instance.memory.buffer.size = 16384
    ## memory store RingBuffer used memory unit size , default 1kb
    canal.instance.memory.buffer.memunit = 1024 
    ## meory store gets mode used MEMSIZE or ITEMSIZE
    canal.instance.memory.batch.mode = MEMSIZE
    canal.instance.memory.rawEntry = true
    
    ## detecing config
    canal.instance.detecting.enable = false
    #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
    canal.instance.detecting.sql = select 1
    canal.instance.detecting.interval.time = 3
    canal.instance.detecting.retry.threshold = 3
    canal.instance.detecting.heartbeatHaEnable = false
    
    # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
    canal.instance.transaction.size =  1024
    # mysql fallback connected to new master should fallback times
    canal.instance.fallbackIntervalInSeconds = 60
    
    # network config
    canal.instance.network.receiveBufferSize = 16384
    canal.instance.network.sendBufferSize = 16384
    canal.instance.network.soTimeout = 30
    
    # binlog filter config
    canal.instance.filter.druid.ddl = true
    canal.instance.filter.query.dcl = false
    canal.instance.filter.query.dml = false
    canal.instance.filter.query.ddl = false
    canal.instance.filter.table.error = false
    canal.instance.filter.rows = false
    canal.instance.filter.transaction.entry = false
    canal.instance.filter.dml.insert = false
    canal.instance.filter.dml.update = false
    canal.instance.filter.dml.delete = false
    
    # binlog format/image check
    canal.instance.binlog.format = ROW,STATEMENT,MIXED 
    canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
    
    # binlog ddl isolation
    canal.instance.get.ddl.isolation = false
    
    # parallel parser config
    canal.instance.parser.parallel = true
    ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
    #canal.instance.parser.parallelThreadSize = 16
    ## disruptor ringbuffer size, must be power of 2
    canal.instance.parser.parallelBufferSize = 256
    
    # table meta tsdb info
    canal.instance.tsdb.enable = true
    canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
    canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
    canal.instance.tsdb.dbUsername = canal
    canal.instance.tsdb.dbPassword = canal
    # dump snapshot interval, default 24 hour
    canal.instance.tsdb.snapshot.interval = 24
    # purge snapshot expire , default 360 hour(15 days)
    canal.instance.tsdb.snapshot.expire = 360
    
    #################################################
    #########         destinations        #############
    #################################################
    canal.destinations = 
    # conf root dir
    canal.conf.dir = ../conf
    # auto scan instance dir add/remove and start/stop instance
    canal.auto.scan = true
    canal.auto.scan.interval = 5
    # set this value to 'true' means that when binlog pos not found, skip to latest.
    # WARN: pls keep 'false' in production env, or if you know what you want.
    canal.auto.reset.latest.pos.mode = false
    
    canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
    #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
    
    canal.instance.global.mode = manager
    canal.instance.global.lazy = false
    canal.instance.global.manager.address = ${canal.admin.manager}
    #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
    canal.instance.global.spring.xml = classpath:spring/file-instance.xml
    #canal.instance.global.spring.xml = classpath:spring/default-instance.xml
    
    ##################################################
    #########           MQ Properties      #############
    ##################################################
    # aliyun ak/sk , support rds/mq
    canal.aliyun.accessKey =
    canal.aliyun.secretKey =
    canal.aliyun.uid=
    
    canal.mq.flatMessage = true
    canal.mq.canalBatchSize = 50
    canal.mq.canalGetTimeout = 100
    # Set this value to "cloud", if you want open message trace feature in aliyun.
    canal.mq.accessChannel = local
    
    canal.mq.database.hash = true
    canal.mq.send.thread.size = 30
    canal.mq.build.thread.size = 8
    
    ##################################################
    #########              Kafka              #############
    ##################################################
    kafka.bootstrap.servers = 127.0.0.1:6667
    kafka.acks = all
    kafka.compression.type = none
    kafka.batch.size = 16384
    kafka.linger.ms = 1
    kafka.max.request.size = 1048576
    kafka.buffer.memory = 33554432
    kafka.max.in.flight.requests.per.connection = 1
    kafka.retries = 0
    
    kafka.kerberos.enable = false
    kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
    kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
    
    ##################################################
    #########             RocketMQ         #############
    ##################################################修改
    rocketmq.producer.group = test
    rocketmq.enable.message.trace = false
    rocketmq.customized.trace.topic =
    rocketmq.namespace =
    rocketmq.namesrv.addr = 192.168.1.101:9876
    rocketmq.retry.times.when.send.failed = 0
    rocketmq.vip.channel.enabled = false
    rocketmq.tag = 
    
    ##################################################
    #########             RabbitMQ         #############
    ##################################################
    rabbitmq.host =
    rabbitmq.virtual.host =
    rabbitmq.exchange =
    rabbitmq.username =
    rabbitmq.password =
    rabbitmq.deliveryMode =

server 和Instance的区别

server代表一个canal运行实例,对应于一个jvm
instance对应于一个数据队列

canal 推送数据格式

data:最新的数据,为JSON数组,如果是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据,如果是删除,则表示被删除的数据
database:数据库名称
es:事件时间,13位的时间戳
id:事件操作的序列号,1,2,3…
isDdl:是否是DDL操作
mysqlType:字段类型
old:旧数据
pkNames:主键名称
sql:SQL语句
sqlType:是经过canal转换处理的,比如unsigned int会被转化为Long,unsigned long会被转换为BigDecimal
table:表名
ts:日志时间
type:操作类型,比如DELETE,UPDATE,INSERT

rocket mq

rocket mq安装完成后可以通过rocker提供的admin管理web界面查看消息修改mysql数据是否会推送过来

rocketmq-console-ng-1.0.1.jar

rocket mq控制台安装

nohup java -jar rocketmq-console-ng-1.0.1.jar --server.port=17890 --rocketmq.config.namesrvAddr=192.168.1.101:9876 > runtime.log 2>&1 &
自启动脚本
添加shell脚本

vim rocket-admin.sh

#!/bin/bash
# chkconfig: 2345 85 15
# description:auto_run

# 工作目录修改成自己的地址
# 注意!!! 并且该目录中只能存在一个jar包
APP_HOME=/users/ems/websophic/rocket-admin/
JAR_HOME_TOW="`cd ${APP_HOME} && find -name '*.jar' `"
APP_NAME=${JAR_HOME_TOW:2}
namesrvAddr=192.168.1.101:9876
port=17890
cd $APP_HOME

#检查程序是否在运行
is_exist(){
  pid=`ps -ef|grep $APP_NAME|grep -v grep|awk '{print $2}' `
  #如果不存在返回1,存在返回0     
  if [ -z "${pid}" ]; then
   return 1
  else
    return 0
  fi
}

#启动方法
start(){
  is_exist
  if [ $? -eq "0" ]; then
    echo "================================================="
    echo "warn: $APP_NAME is already running. (pid=$pid)"
    echo "================================================="
  else
    nohup java -jar $APP_NAME --server.port=${port} --rocketmq.config.namesrvAddr=${namesrvAddr} > runtime.log 2>&1 &
    echo "${APP_NAME}: ${port} start success"
  fi
}

#停止方法
stop(){
  is_exist
  if [ $? -eq "0" ]; then
    kill -9 $pid
    echo "${APP_NAME} :${port}stop success"
  else
    echo "================================================="
    echo "warn: $APP_NAME:${port} is not running"
    echo "================================================="
  fi  
}

#输出运行状态
status(){
  is_exist
  if [ $? -eq "0" ]; then
    echo "================================================="
    echo "warn: $APP_NAME:${port} is already running. (pid=$pid)"
    echo "================================================="
  else
    echo "================================================="
    echo "warn: $APP_NAME:${port} is not running"
    echo "================================================="
  fi
}

#根据输入参数,选择执行对应方法,不输入则执行使用说明
case "$1" in
  "start")
    start
    ;;
  "stop")
    stop
    ;;
  "status")
    status
    ;;
  "restart")
    stop
    echo "restart the application ..."
    start
    ;;
  *)
    echo "================================================="
    echo "Tips: start|stop|restart|status"
    echo "================================================="
    ;;
esac
添加执行权限
chmod +x  rocket-admin.sh
移动到/etc/init.d目录下
mv rocket-admin.sh /etc/init.d/
cd /etc/init.d/
【chkconfig 开机自启 方式一】
chkconfig可以更新(启动或停止)和查询系统服务(service)运行级信息。
更简单一点,chkconfig是一个用于维护/etc/rc[0-6].d目录的命令行工具。
添加系统服务
# 注册系统服务
chkconfig --add rocket-admin.sh
# 删除系统服务
chkconfig --del rocket-admin.sh
# 设置开机启动
chkconfig rocket-admin.sh on
# 查看系统服务列表
chkconfig --list
# 查看2-5都是开,出现则代表加入成功了
常用命令
#启动服务
service rocket-admin.sh start
#停止服务
service srocket-admin.sh stop
#重启服务
service rocket-admin.sh restart
#服务状态
service rocket-admin.sh status

rocketmq 安装(单机)

查看集群信息

./bin/mqadmin clusterList -n 192.168.1.101:9876

https://blog.csdn.net/wu4566285/article/details/119297536

//解压
unzip rocketmq-all-4.4.0-bin-release.zip


# 编辑runbroker.sh和runserver.sh修改默认JVM大小
vim runbroker.sh
vim runserver.sh

参考设置:改为1g
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

启动nameserver
nohup sh ./bin/mqnamesrv >>namesrv.log 2>&1 &

启动broker

//-c conf/broker.conf autoCreateTopicEnable=true 参数需要带上,不然topic需要手动创建
nohup sh     ./bin/mqbroker -n 192.168.1.101:9876 >>broker.log 2>&1 & 

主从集群安装

https://www.cnblogs.com/wtb-26/p/14151036.html

rocketmq默认给出了三种建议配置模式

  1. 2m-2s-async(主从异步)
  2. 2m-2s-sync(主从同步)
  3. 2m-noslave(仅master)

服务器ABC的broker配置注意

1.注意每个主broker和它的从broker的brokerName需要一致。

2.注意主broker的brokerId=0,从broker的brokerId=1,0表示Master,大于0表示 Slave,不然broker启动会因冲突而失败

3.无论主从,每个broker,都应该注册到所有的nameServer中,如下

namesrvAddr=10.28.82.134:9876;10.28.82.135:9876;10.28.82.137:9876

4.注意主broker的brokerRole=ASYNC_MASTER,从broker的brokerRole=SLAVE

5.brokerIP1按照所在服务器ip来配置

6.同一个服务器的两个broker监听端口不能一样,但是不同服务器的监听端口可以一样

1、修改配置

文件存储配置

mkdir store

broker配置

vim conf/2m-2s-async/broker-a.properties

# 集群名,不同broker节点集群名是一样的
brokerClusterName=cxhlz-cluster
# broker名字,不同broker节点brokerName是唯一的,他的从节点也要和这个名字一样
brokerName=broker-a
# 0表示Master,大于0表示 Slave
brokerId=0
#nameServer地址,多个用英文分号分割
namesrvAddr=198.120.100.100:9876;198.120.100.101:9876;198.120.100.104:9876
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=48
# 当前节点角色,ASYNC_MASTER=异步复制Master模式,SYNC_MASTER=同步双写Master模式 **注意 主broker的brokerRole=ASYNC_MASTER,从broker的brokerRole=SLAVE**
brokerRole=ASYNC_MASTER
# 刷盘模式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘
flushDiskType=ASYNC_FLUSH
# 数据存储路径
storePathRootDir=/users/ems/websophic/rocketmq-cluster/store/3astore
# commitLog 存储路径
storePathCommitLog=/users/ems/websophic/rocketmq-cluster/store/3astore/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/users/ems/websophic/rocketmq-cluster/store/3astore/consumequeue
# 消息索引存储路径
storePathIndex=/users/ems/websophic/rocketmq-cluster/store/3astore/index
# checkpoint 文件存储路径
storeCheckpoint=/users/ems/websophic/rocketmq-cluster/store/3astore/checkpoint
# abort 文件存储路径
abortFile=/users/ems/websophic/rocketmq-cluster/store/3astore/abort
# 是否允许 Broker 自动创建Topic,建议关闭
autoCreateTopicEnable=false
# 是否允许 Broker 自动创建订阅组,建议关闭
autoCreateSubscriptionGroup=false
# 设置brokerIP,如果不设置当服务器有很多网卡,默认会读取第一个网卡的IP地址(如安装docker后),会导致客户端无法连接
brokerIP1=198.120.100.100
# broker对外服务的监听端口,默认10911
listenPort=10911
2、配置slave broker-b-s(即把服务器A作为服务器B的从机)

vim broker-b-s.properties

# 集群名,不同broker节点集群名是一样的
brokerClusterName=cxhlz-cluster
# broker名字,不同broker节点brokerName是唯一的
brokerName=broker-b
# 0表示Master,大于0表示 Slave
brokerId=1
#nameServer地址,多个用英文分号分割
namesrvAddr=198.120.100.100:9876;198.120.100.101:9876;198.120.100.104:9876
# 删除文件时间点,默认凌晨 4点
deleteWhen=04
# 文件保留时间,默认 48 小时
fileReservedTime=48
# 当前节点角色,ASYNC_MASTER=异步复制Master模式,SYNC_MASTER=同步双写Master模式
brokerRole=SLAVE
# 刷盘模式,ASYNC_FLUSH=异步刷盘,SYNC_FLUSH=同步刷盘
flushDiskType=ASYNC_FLUSH
# 数据存储路径
storePathRootDir=/users/ems/websophic/rocketmq-cluster/store/3bstore
# commitLog 存储路径
storePathCommitLog=/users/ems/websophic/rocketmq-cluster/store/3bstore/commitlog
# 消费队列存储路径存储路径
storePathConsumeQueue=/users/ems/websophic/rocketmq-cluster/store/3bstore/consumequeue
# 消息索引存储路径
storePathIndex=/users/ems/websophic/rocketmq-cluster/store/3bstore/index
# checkpoint 文件存储路径
storeCheckpoint=/users/ems/websophic/rocketmq-cluster/store/3bstore/checkpoint
# abort 文件存储路径
abortFile=/users/ems/websophic/rocketmq-cluster/store/3bstore/abort
# 是否允许 Broker 自动创建Topic,建议关闭
autoCreateTopicEnable=false
# 是否允许 Broker 自动创建订阅组,建议关闭
autoCreateSubscriptionGroup=false
# 设置brokerIP,如果不设置当服务器有很多网卡,默认会读取第一个网卡的IP地址(如安装docker后),会导致客户端无法连接
brokerIP1=198.120.100.100
# broker对外服务的监听端口,默认10911
listenPort=10811
内存配置

修改默认broker和nameserver的默认内存配置(此处仅供参考,按实际情况自行修改)

# 修改broker内存配置,将默认4G最低内存改为1g
vim /bin/runbroker.sh

#JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"

# 修改runserver.sh,将默认2G最低内存改为512
vim /wls/rocketmq/rocketmq-4.7.1/bin/runserver.sh

#JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

启动集群

启动namesrv(每台都要启动)


# 启动NameServer,NameServer需要先于Broker启动,且如果在生产环境使用,为了保证高可用,建议一般规模的集群启动3个NameServer
nohup sh bin/mqnamesrv 2>&1 &
# 验证Name Server 是否启动成功
tail -100f ~/logs/rocketmqlogs/namesrv.log
看到The Name Server boot success...即启动成功

启动broker(每台都要启动,此处以服务器A为例)


# 启动Broker集群
# 在机器A,启动Master broker-a,-c 参数指定broker配置文件地址
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-a.properties 2>&1 &
# 在机器A,启动slave broker-b-s,-c 参数指定broker配置文件地址
nohup sh bin/mqbroker -c conf/2m-2s-async/broker-b-s.properties 2>&1 &
# 验证Name Server 是否启动成功
tail -100f ~/logs/rocketmqlogs/broker.log

注意:需要手动创建主题和消费者组

查看集群

# 查看集群状态
sh mqadmin clusterlist -n 10.28.82.137:9876
或sh mqadmin clusterlist -n 10.28.82.135:9876
或sh mqadmin clusterlist -n 10.28.82.134:9876

# 关闭服务的命令
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

查看rocketmq进程:

ps -ef |grep rocketmq

自启动脚本

https://blog.csdn.net/qq_41312197/article/details/109217792

vi /etc/init.d/rocketmq

#!/bin/sh
#
# rocketmq - this script starts and stops the rocketmq daemon
#
# chkconfig: - 85 15

#JAVA_HOME和ROCKETMQ_HOME 要根据实际地址进行替换
#export JAVA_HOME=/opt/edas/jdk/java
#export PATH=$JAVA_HOME/bin:$PATH

ROCKETMQ_HOME=/users/ems/websophic/rocketmq-4.9.2
ROCKETMQ_BIN=${ROCKETMQ_HOME}/bin
ADDR=192.168.1.101:9876
LOG_DIR=${ROCKETMQ_HOME}/logs
NAMESERVER_LOG=${LOG_DIR}/namesrv.log
BROKER_LOG=${LOG_DIR}/broker.log

start() {
if [ ! -d ${LOG_DIR} ];then
mkdir ${LOG_DIR}
fi
cd ${ROCKETMQ_HOME}
nohup sh bin/mqnamesrv >>namesrv.log 2>&1 &
echo -n "The Name Server boot success..."
nohup sh bin/mqbroker -n ${ADDR} >>broker.log 2>&1 & 
echo -n "The broker[%s, ${ADDR}] boot success..."
}
stop() {
cd ${ROCKETMQ_HOME}
sh bin/mqshutdown broker
sleep 1
sh bin/mqshutdown namesrv
}
restart() {
stop
sleep 5
start
}


case "$1" in
start)
start
;;
stop)
stop
;;
restart)
restart
;;
*)
echo $"Usage: $0 {start|stop|restart}"
exit 2
esac
  1. 将rocketmq服务添加为开机启动服务

    chmod a+x /etc/init.d/rocketmq
    chkconfig --add rocketmq
    chkconfig --list
    # 查看2-5都是开,出现则代表加入成功了
  2. 启动:service rocketmq start
    关闭:service rocketmq stop
    重启:service rocketmq restart

xxl-job

自启动脚本

添加shell脚本

vim xxl-job-admin.sh

#!/bin/bash
# chkconfig: 2345 85 15
# description:auto_run

# 工作目录修改成自己的地址
# 注意!!! 并且该目录中只能存在一个jar包
APP_HOME=/users/ems/websophic/xxl-job
JAR_HOME_TOW="`cd ${APP_HOME} && find -name '*.jar' `"
APP_NAME=${JAR_HOME_TOW:2}
port=8099
cd $APP_HOME

#检查程序是否在运行
is_exist(){
  pid=`ps -ef|grep $APP_NAME|grep -v grep|awk '{print $2}' `
  #如果不存在返回1,存在返回0     
  if [ -z "${pid}" ]; then
   return 1
  else
    return 0
  fi
}

#启动方法
start(){
  is_exist
  if [ $? -eq "0" ]; then
    echo "================================================="
    echo "warn: $APP_NAME is already running. (pid=$pid)"
    echo "================================================="
  else
    nohup java -jar $APP_NAME --server.port=${port} 2>&1 &
    echo "${APP_NAME}: ${port} start success"
  fi
}

#停止方法
stop(){
  is_exist
  if [ $? -eq "0" ]; then
    kill -9 $pid
    echo "${APP_NAME} :${port}stop success"
  else
    echo "================================================="
    echo "warn: $APP_NAME:${port} is not running"
    echo "================================================="
  fi  
}

#输出运行状态
status(){
  is_exist
  if [ $? -eq "0" ]; then
    echo "================================================="
    echo "warn: $APP_NAME:${port} is already running. (pid=$pid)"
    echo "================================================="
  else
    echo "================================================="
    echo "warn: $APP_NAME:${port} is not running"
    echo "================================================="
  fi
}

#根据输入参数,选择执行对应方法,不输入则执行使用说明
case "$1" in
  "start")
    start
    ;;
  "stop")
    stop
    ;;
  "status")
    status
    ;;
  "restart")
    stop
    echo "restart the application ..."
    start
    ;;
  *)
    echo "================================================="
    echo "Tips: start|stop|restart|status"
    echo "================================================="
    ;;
esac

添加执行权限

chmod +x  xxl-job-admin.sh

移动到/etc/init.d目录下

mv xxl-job-admin.sh /etc/init.d/
cd /etc/init.d/

【chkconfig 开机自启 方式一】

chkconfig可以更新(启动或停止)和查询系统服务(service)运行级信息。
更简单一点,chkconfig是一个用于维护/etc/rc[0-6].d目录的命令行工具。

添加系统服务

# 注册系统服务
chkconfig --add xxl-job-admin.sh
# 删除系统服务
chkconfig --del xxl-job-admin.sh
# 设置开机启动
chkconfig xxl-job-admin.sh on
# 查看系统服务列表
chkconfig --list
# 查看2-5都是开,出现则代表加入成功了

常用命令

#启动服务
service xxl-job-admin.sh start
#停止服务
service sxxl-job-admin.sh stop
#重启服务
service xxl-job-admin.sh restart
#服务状态
service xxl-job-admin.sh status