Kafka in Multiple Languages
Author: Lif
Installation
curl -sSL https://raw.githubusercontent.com/bitnami/containers/main/bitnami/kafka/docker-compose.yml > docker-compose.yml
docker-compose up -d
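The examples below assume a broker reachable at localhost:9092. If topic auto-creation is disabled in your setup, the topics used later (test and purchases) can be created by hand; the service name (kafka) and the kafka-topics.sh path are assumptions about the bitnami image, so adjust them to match your compose file:
docker-compose exec kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
docker-compose exec kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic purchases --bootstrap-server localhost:9092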
Rust
Here we use the rdkafka crate. Both files below pull in a small example_utils logging module (the mod example_utils; line); a minimal sketch of that module is given after the consumer.
producer.rs:
use std::time::Duration;
use clap::{App, Arg};
use log::info;
use rdkafka::config::ClientConfig;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::get_rdkafka_version;
use crate::example_utils::setup_logger;
mod example_utils;
async fn produce(brokers: &str, topic_name: &str) {
let producer: &FutureProducer = &ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error");
let send_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
        .as_millis(); // current timestamp in milliseconds
// This loop is non blocking: all messages will be sent one after the other, without waiting
// for the results.
let futures = (0..5)
.map(|i| async move {
// The send operation on the topic returns a future, which will be
// completed once the result or failure from Kafka is received.
let delivery_status = producer
.send(
FutureRecord::to(topic_name)
                        .payload(&format!("{}", send_time)) // the payload is the send timestamp (ms); the consumer uses it to measure latency
.key(&format!("Key {}", i))
.headers(OwnedHeaders::new().insert(Header {
key: "header_key",
value: Some("header_value"),
})),
Duration::from_secs(0),
)
.await;
// This will be executed when the result is received.
info!("Delivery status for message {} received", i);
delivery_status
})
.collect::<Vec<_>>();
// This loop will wait until all delivery statuses have been received.
for future in futures {
info!("Future completed. Result: {:?}", future.await);
}
}
#[tokio::main]
async fn main() {
let matches = App::new("producer example")
.version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
.about("Simple command line producer")
.arg(
Arg::with_name("brokers")
.short("b")
.long("brokers")
.help("Broker list in kafka format")
.takes_value(true)
.default_value("localhost:9092"),
)
.arg(
Arg::with_name("log-conf")
.long("log-conf")
.help("Configure the logging format (example: 'rdkafka=trace')")
.takes_value(true),
)
.arg(
Arg::with_name("topic")
.short("t")
.long("topic")
.help("Destination topic")
.takes_value(true)
.required(true),
)
.get_matches();
setup_logger(true, matches.value_of("log-conf"));
let (version_n, version_s) = get_rdkafka_version();
info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);
let topic = matches.value_of("topic").unwrap();
let brokers = matches.value_of("brokers").unwrap();
produce(brokers, topic).await;
}
consumer.rs:
use clap::{App, Arg};
use log::{info, warn};
use rdkafka::client::ClientContext;
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, Rebalance};
use rdkafka::error::KafkaResult;
use rdkafka::message::{Headers, Message};
use rdkafka::topic_partition_list::TopicPartitionList;
use rdkafka::util::get_rdkafka_version;
use crate::example_utils::setup_logger;
mod example_utils;
// A context can be used to change the behavior of producers and consumers by adding callbacks
// that will be executed by librdkafka.
// This particular context sets up custom callbacks to log rebalancing events.
struct CustomContext;
impl ClientContext for CustomContext {}
impl ConsumerContext for CustomContext {
fn pre_rebalance(&self, rebalance: &Rebalance) {
info!("Pre rebalance {:?}", rebalance);
}
fn post_rebalance(&self, rebalance: &Rebalance) {
info!("Post rebalance {:?}", rebalance);
}
fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
info!("Committing offsets: {:?}", result);
}
}
// A type alias with your custom consumer can be created for convenience.
type LoggingConsumer = StreamConsumer<CustomContext>;
async fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) {
let context = CustomContext;
let consumer: LoggingConsumer = ClientConfig::new()
.set("group.id", group_id)
.set("bootstrap.servers", brokers)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "true")
//.set("statistics.interval.ms", "30000")
//.set("auto.offset.reset", "smallest")
.set_log_level(RDKafkaLogLevel::Debug)
.create_with_context(context)
.expect("Consumer creation failed");
consumer
.subscribe(&topics.to_vec())
.expect("Can't subscribe to specified topics");
loop {
let start = std::time::Instant::now();
match consumer.recv().await {
Err(e) => warn!("Kafka error: {}", e),
Ok(m) => {
info!("Start time is: {:?}", start);
let duration = start.elapsed();
info!("Time elapsed receiving the message is: {:?}", duration);
let payload = match m.payload_view::<str>() {
None => "",
Some(Ok(s)) => s,
Some(Err(e)) => {
warn!("Error while deserializing message payload: {:?}", e);
""
}
};
println!("pay load is {}", payload);
let payload_parts: Vec<&str> = payload.split(" with timestamp ").collect();
if payload_parts.len() == 2 {
let send_time: u128 = payload_parts[1].parse().unwrap_or(0);
let receive_time = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_millis(); // 获取当前时间戳(毫秒)
let elapsed_time = receive_time - send_time; // 计算耗时
info!("Received time: {} {}", receive_time, send_time);
info!("Elapsed time: {} ms", elapsed_time);
}
// let send_time: u128 = payload.parse().unwrap_or(1);
// println!("send time is {} ", send_time);
info!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp());
if let Some(headers) = m.headers() {
for header in headers.iter() {
info!(" Header {:#?}: {:?}", header.key, header.value);
}
}
consumer.commit_message(&m, CommitMode::Async).unwrap();
}
};
}
}
#[tokio::main]
async fn main() {
let matches = App::new("consumer example")
.version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
.about("Simple command line consumer")
.arg(
Arg::with_name("brokers")
.short("b")
.long("brokers")
.help("Broker list in kafka format")
.takes_value(true)
.default_value("localhost:9092"),
)
.arg(
Arg::with_name("group-id")
.short("g")
.long("group-id")
.help("Consumer group id")
.takes_value(true)
.default_value("example_consumer_group_id"),
)
.arg(
Arg::with_name("log-conf")
.long("log-conf")
.help("Configure the logging format (example: 'rdkafka=trace')")
.takes_value(true),
)
.arg(
Arg::with_name("topics")
.short("t")
.long("topics")
.help("Topic list")
.takes_value(true)
.multiple(true)
.required(true),
)
.get_matches();
setup_logger(true, matches.value_of("log-conf"));
let (version_n, version_s) = get_rdkafka_version();
    info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);
let topics = matches.values_of("topics").unwrap().collect::<Vec<&str>>();
let brokers = matches.value_of("brokers").unwrap();
let group_id = matches.value_of("group-id").unwrap();
consume_and_print(brokers, group_id, &topics).await
}
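Both files above call setup_logger from an example_utils module that is not shown here (it comes from the rdkafka examples repository). The following is a minimal sketch, assuming the env_logger crate, that is just enough to satisfy that call:
example_utils.rs (minimal sketch):
// Minimal logging helper. The real helper in the rdkafka examples also prints
// thread names; this version only honours an optional RUST_LOG-style filter.
use env_logger::Builder;
use log::LevelFilter;

pub fn setup_logger(log_thread: bool, rust_log: Option<&str>) {
    let _ = log_thread; // accepted only for signature compatibility with the examples
    let mut builder = Builder::new();
    builder.filter_level(LevelFilter::Info);
    if let Some(conf) = rust_log {
        builder.parse_filters(conf);
    }
    builder.init();
}
With this module in place, and rdkafka, tokio, clap, log and env_logger listed in Cargo.toml, the two files can be built as separate binaries and run with, for example, cargo run --bin producer -- -t test (assuming they are configured as binaries named producer and consumer).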
Golang
Here we use confluent-kafka-go.
producer.go:
package main
import (
"fmt"
"math/rand"
"os"
"strconv"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
config := &kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"client.id": "go-producer",
}
topic := "purchases"
p, err := kafka.NewProducer(config)
if err != nil {
fmt.Printf("Failed to create producer: %s", err)
os.Exit(1)
}
// Go-routine to handle message delivery reports and
// possibly other event types (errors, stats, etc)
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n",
*ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
}
}
}
}()
users := [...]string{"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"}
items := [...]string{"book", "alarm clock", "t-shirts", "gift card", "batteries"}
for n := 0; n < 50; n++ {
key := users[rand.Intn(len(users))]
data := items[rand.Intn(len(items))]
		currentTimeMillis := time.Now().UnixNano() / 1e6 // current time in milliseconds
		data = strconv.FormatInt(currentTimeMillis, 10)   // replace the value with the send timestamp so the consumer can measure latency
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Key: []byte(key),
Value: []byte(data),
}, nil)
}
// Wait for all messages to be delivered
p.Flush(15 * 1000)
p.Close()
}
consumer.go:
package main
import (
"fmt"
"os"
"os/signal"
"strconv"
"syscall"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
func main() {
// if len(os.Args) != 2 {
// fmt.Fprintf(os.Stderr, "Usage: %s <config-file-path>\n",
// os.Args[0])
// os.Exit(1)
// }
// configFile := os.Args[1]
conf := kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"client.id": "go-producer",
}
conf["group.id"] = "kafka-go-getting-started"
conf["auto.offset.reset"] = "earliest"
c, err := kafka.NewConsumer(&conf)
if err != nil {
fmt.Printf("Failed to create consumer: %s", err)
os.Exit(1)
}
topic := "purchases"
	err = c.SubscribeTopics([]string{topic}, nil)
	if err != nil {
		fmt.Printf("Failed to subscribe to topic: %s", err)
		os.Exit(1)
	}
// Set up a channel for handling Ctrl-C, etc
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)
// Process messages
run := true
for run {
select {
case sig := <-sigchan:
fmt.Printf("Caught signal %v: terminating\n", sig)
run = false
default:
ev, err := c.ReadMessage(100 * time.Millisecond)
if err != nil {
// Errors are informational and automatically handled by the consumer
continue
}
fmt.Printf("Consumed event from topic %s: key = %-10s value = %s\n",
*ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
// parts := strings.Split(string(ev.Value))
// if len(parts) < 2 {
// fmt.Println("Invalid message format. Skipping.")
// continue
// }
// data := parts[0]
timestampStr := string(ev.Value)
// Convert the string timestamp to an integer
sentTimestamp, err := strconv.ParseInt(timestampStr, 10, 64)
if err != nil {
fmt.Println("Could not parse timestamp. Skipping.")
continue
}
receivedTimestamp := time.Now().UnixNano() / 1e6 // current time in milliseconds
latency := receivedTimestamp - sentTimestamp
fmt.Printf("Consumed event from topic %s: key = %-10s latency = %d ms\n",
*ev.TopicPartition.Topic, string(ev.Key), latency)
}
}
c.Close()
}
Python
Here we use confluent_kafka.
producer.py:
import socket
from typing import Optional
from confluent_kafka import KafkaError, Message, Producer
class KafkaProducer:
def __init__(self, bootstrap_servers: str, client_id: str) -> None:
self.conf = {"bootstrap.servers": bootstrap_servers, "client.id": client_id}
self.producer = Producer(self.conf)
    def acked(self, err: Optional[KafkaError], msg: Message) -> None:
if err is not None:
print(f"Failed to deliver message: {msg}: {err}")
else:
print(f"Message produced: {msg}")
def produce(
self, topic: str, key: Optional[str] = None, value: Optional[str] = None
) -> None:
self.producer.produce(topic, key=key, value=value, callback=self.acked)
def poll(self, timeout: float) -> None:
self.producer.poll(timeout)
def flush(self) -> None:
self.producer.flush()
if __name__ == "__main__":
producer = KafkaProducer(
bootstrap_servers="kafka:9092", client_id=socket.gethostname()
)
topic = "test"
producer.produce(topic, key="key", value="value")
    producer.poll(1)
    producer.flush()  # make sure the message is actually delivered before exiting
consumer.py:
import sys
from typing import List
from confluent_kafka import Consumer, KafkaError, KafkaException
class KafkaConsumer:
def __init__(self, bootstrap_servers: str, group_id: str, auto_offset_reset: str = 'smallest') -> None:
self.conf = {
'bootstrap.servers': bootstrap_servers,
'group.id': group_id,
'auto.offset.reset': auto_offset_reset
}
self.consumer = Consumer(self.conf)
self.running = True
def basic_consume_loop(self, topics: List[str], timeout: float = 1.0) -> None:
try:
self.consumer.subscribe(topics)
while self.running:
msg = self.consumer.poll(timeout=timeout)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
# End of partition event
sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
(msg.topic(), msg.partition(), msg.offset()))
elif msg.error():
raise KafkaException(msg.error())
else:
print(msg.value().decode('utf-8'))
# do something with msg.value()
finally:
# Close down consumer to commit final offsets.
self.consumer.close()
def shutdown(self) -> None:
self.running = False
if __name__ == "__main__":
consumer = KafkaConsumer(bootstrap_servers="kafka:9092", group_id="foo", auto_offset_reset="smallest")
topics = ["test"]
consumer.basic_consume_loop(topics)
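Unlike the Rust and Go examples, the Python pair above does not measure end-to-end latency. For parity, here is a small sketch (the file and function names are mine, not part of the original) that reuses the KafkaProducer class above, sends the current timestamp in milliseconds as the value, and computes the delta on the consumer side:
latency_example.py (sketch):
import time

# Assumes KafkaProducer from producer.py above is importable in this module.

# Send the current timestamp (ms) as the message value, mirroring the Rust/Go producers.
def produce_timestamp(producer: KafkaProducer, topic: str) -> None:
    send_time_ms = int(time.time() * 1000)
    producer.produce(topic, key="latency", value=str(send_time_ms))
    producer.flush()

# Given a consumed message value, print how long the message spent in flight.
def print_latency(msg_value: bytes) -> None:
    receive_time_ms = int(time.time() * 1000)
    send_time_ms = int(msg_value.decode("utf-8"))
    print(f"latency: {receive_time_ms - send_time_ms} ms")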