Published on

Use Rocketmq with Python In Docker

Authors
  • avatar
    Name
    Lif
    Twitter

Setup

Setup rocketmq by docker:

Create docker network

docker network create rocketmq

Start namesrv

docker run -d --name rocketmq-namesrv --network rocketmq -p 9876:9876 -v /var/local/docker/mount/rocketmq/namesrv/logs:/root/logs -v /var/local/docker/mount/rocketmq/namesrv/store:/root/store apache/rocketmq sh mqnamesrv

# /var/local/docker/mount/rocketmq/namesrv/logs is just a path to mount for saving log\conf\data locally
# use -v for mouting

Start broker

# First, we should copy a broker.conf from docker
docker ps -a
# get your container id
docker cp <YourContainerId>:/home/rocketmq/rocketmq-5.1.1/conf /var/local/docker/mount/rocketmq/broker/broker-a/

Edit your conf.

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUS

# Listen IP
brokerIP1 = localhost
brokerIP2 = 127.0.0.1

# Listen Port
listenPort=10911

# NameServer Address
namesrvAddr=172.18.0.2:9876

# Config if you dont want to create topic manualy
autoCreateTopicEnable = True

Note: brokerIP and namesrvAddr may be different! In this article, namesrvAddr is the docker network address.

# You can get docker network address by 
docker network inspect
docker run -d --name rocketmq-broker-a --network rocketmq -p 10909:10909 -p 10911:10911 -v /var/local/docker/mount/rocketmq/broker/broker-a/logs:/root/logs -v /var/local/docker/mount/rocketmq/broker/broker-a/store:/root/store -v /var/local/docker/mount/rocketmq/broker/broker-a/conf:/home/rocketmq/rocketmq-5.1.1/conf apache/rocketmq sh mqbroker -c /home/rocketmq/rocketmq-5.1.1/conf/broker.conf

# start broker

Create Topic

# You can change localhost:9876 to your custom host:port
# If broker.conf sets autoCreateTopicEnable = True
# we can skip this step
docker exec -it <BrokerContainerId> bash

# ./mqadmin updateTopic -n 172.18.0.2:9876  -b localhost:10911 -t TestTopic
./mqadmin updateTopic -n localhost:9876  -b localhost:10911  -t <YourTopicName>

List Topic

./mqadmin topicList -n localhost:9876

Python Usage

Install rocketmq-client-app

wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.1.0/rocketmq-client-cpp-2.1.0.amd64.deb
--2023-06-16 10:03:49--  https://github.com/apache/rocketmq-client-cpp/releases/download/2.1.0/rocketmq-client-cpp-2.1.0.amd64.deb
dpkg -i rocketmq-client-cpp-2.1.0.amd64.deb
# ubuntu use dkpg to manage package

Install python lib

pip install rocketmq-client-python

# There is another python lib called rocketmq which is already deprecated

Producer

from rocketmq.client import Producer, Message

producer = Producer('PID-XXX')
producer.set_name_server_address('localhost:9876')
# producer.set_name_server_address('localhost:9876') # seems useful on mac
producer.start()

msg = Message('YOUR-TOPIC')
msg.set_keys('XXX')
msg.set_tags('XXX')
msg.set_body('XXXX')
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()

PushConsumer

import time

from rocketmq.client import PushConsumer, ConsumeStatus


def callback(msg):
    print(msg.id, msg.body)
    return ConsumeStatus.CONSUME_SUCCESS


consumer = PushConsumer('CID_XXX')
consumer.set_name_server_address('localhost:9876')
consumer.subscribe('TestTopic', callback)
consumer.start()

while True:
    time.sleep(3600)

consumer.shutdown()