Kafka in Multiple Languages

Author: Lif

Installation

Spin up a single-node Kafka broker with Bitnami's docker-compose file:

curl -sSL https://raw.githubusercontent.com/bitnami/containers/main/bitnami/kafka/docker-compose.yml > docker-compose.yml
docker-compose up -d
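
The examples below produce to the topics test and purchases. If the broker is not configured to auto-create topics, create them inside the container first (this assumes the Bitnami compose file names the service kafka and puts the Kafka CLI scripts on the PATH):

docker-compose exec kafka kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
docker-compose exec kafka kafka-topics.sh --create --topic purchases --bootstrap-server localhost:9092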

Rust

This section uses the rdkafka crate.
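
Both programs below are async, so they need the tokio runtime, plus clap 2.x for argument parsing. A minimal Cargo.toml sketch (the version numbers are assumptions; pin whatever is current for you):

[dependencies]
rdkafka = { version = "0.36", features = ["tokio"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
clap = "2"
log = "0.4"
env_logger = "0.10"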

producer.rs:

use std::time::Duration;

use clap::{App, Arg};
use log::info;

use rdkafka::config::ClientConfig;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::get_rdkafka_version;

use crate::example_utils::setup_logger;

mod example_utils;

async fn produce(brokers: &str, topic_name: &str) {
    let producer: &FutureProducer = &ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .create()
        .expect("Producer creation error");

    let send_time = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("Time went backwards")
        .as_millis(); // current timestamp in milliseconds

    // This loop is non-blocking: all messages are sent one after the other, without
    // waiting for the results.
    let futures = (0..5)
        .map(|i| async move {
            // The send operation on the topic returns a future, which will be
            // completed once the result or failure from Kafka is received.
            let delivery_status = producer
                .send(
                    FutureRecord::to(topic_name)
                        .payload(&format!("{}", send_time)) // payload is the send time in ms
                        .key(&format!("Key {}", i))
                        .headers(OwnedHeaders::new().insert(Header {
                            key: "header_key",
                            value: Some("header_value"),
                        })),
                    Duration::from_secs(0),
                )
                .await;

            // This will be executed when the result is received.
            info!("Delivery status for message {} received", i);
            delivery_status
        })
        .collect::<Vec<_>>();

    // This loop will wait until all delivery statuses have been received.
    for future in futures {
        info!("Future completed. Result: {:?}", future.await);
    }
}

#[tokio::main]
async fn main() {
    let matches = App::new("producer example")
        .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
        .about("Simple command line producer")
        .arg(
            Arg::with_name("brokers")
                .short("b")
                .long("brokers")
                .help("Broker list in kafka format")
                .takes_value(true)
                .default_value("localhost:9092"),
        )
        .arg(
            Arg::with_name("log-conf")
                .long("log-conf")
                .help("Configure the logging format (example: 'rdkafka=trace')")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("topic")
                .short("t")
                .long("topic")
                .help("Destination topic")
                .takes_value(true)
                .required(true),
        )
        .get_matches();

    setup_logger(true, matches.value_of("log-conf"));

    let (version_n, version_s) = get_rdkafka_version();
    info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);

    let topic = matches.value_of("topic").unwrap();
    let brokers = matches.value_of("brokers").unwrap();

    produce(brokers, topic).await;
}
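
Assuming producer.rs and consumer.rs are wired up as separate binaries in Cargo.toml (a project-layout assumption), the producer runs against the local broker with:

cargo run --bin producer -- --topic test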

consumer.rs:

use clap::{App, Arg};
use log::{info, warn};

use rdkafka::client::ClientContext;
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, Rebalance};
use rdkafka::error::KafkaResult;
use rdkafka::message::{Headers, Message};
use rdkafka::topic_partition_list::TopicPartitionList;
use rdkafka::util::get_rdkafka_version;

use crate::example_utils::setup_logger;

mod example_utils;

// A context can be used to change the behavior of producers and consumers by adding callbacks
// that will be executed by librdkafka.
// This particular context sets up custom callbacks to log rebalancing events.
struct CustomContext;

impl ClientContext for CustomContext {}

impl ConsumerContext for CustomContext {
    fn pre_rebalance(&self, rebalance: &Rebalance) {
        info!("Pre rebalance {:?}", rebalance);
    }

    fn post_rebalance(&self, rebalance: &Rebalance) {
        info!("Post rebalance {:?}", rebalance);
    }

    fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
        info!("Committing offsets: {:?}", result);
    }
}

// A type alias with your custom consumer can be created for convenience.
type LoggingConsumer = StreamConsumer<CustomContext>;

async fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) {
    let context = CustomContext;

    let consumer: LoggingConsumer = ClientConfig::new()
        .set("group.id", group_id)
        .set("bootstrap.servers", brokers)
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "true")
        //.set("statistics.interval.ms", "30000")
        //.set("auto.offset.reset", "smallest")
        .set_log_level(RDKafkaLogLevel::Debug)
        .create_with_context(context)
        .expect("Consumer creation failed");

    consumer
        .subscribe(topics)
        .expect("Can't subscribe to specified topics");

    loop {
        let start = std::time::Instant::now();
        match consumer.recv().await {
            Err(e) => warn!("Kafka error: {}", e),
            Ok(m) => {
                info!("Start time is: {:?}", start);
                let duration = start.elapsed();
                info!("Time elapsed receiving the message is: {:?}", duration);
                let payload = match m.payload_view::<str>() {
                    None => "",
                    Some(Ok(s)) => s,
                    Some(Err(e)) => {
                        warn!("Error while deserializing message payload: {:?}", e);
                        ""
                    }
                };
                println!("pay load is {}", payload);
                let payload_parts: Vec<&str> = payload.split(" with timestamp ").collect();
                if payload_parts.len() == 2 {
                    let send_time: u128 = payload_parts[1].parse().unwrap_or(0);
                    let receive_time = std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .expect("Time went backwards")
                        .as_millis(); // 获取当前时间戳(毫秒)
                    let elapsed_time = receive_time - send_time; // 计算耗时
                    info!("Received time: {}  {}", receive_time, send_time);
                    info!("Elapsed time: {} ms", elapsed_time);
                }

                // let send_time: u128 = payload.parse().unwrap_or(1);
                // println!("send time is {} ", send_time);
                info!("key: '{:?}', payload: '{}', topic: {}, partition: {}, offset: {}, timestamp: {:?}",
                      m.key(), payload, m.topic(), m.partition(), m.offset(), m.timestamp());
                if let Some(headers) = m.headers() {
                    for header in headers.iter() {
                        info!("  Header {:#?}: {:?}", header.key, header.value);
                    }
                }
                consumer.commit_message(&m, CommitMode::Async).unwrap();
            }
        };
    }
}

#[tokio::main]
async fn main() {
    let matches = App::new("consumer example")
        .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
        .about("Simple command line consumer")
        .arg(
            Arg::with_name("brokers")
                .short("b")
                .long("brokers")
                .help("Broker list in kafka format")
                .takes_value(true)
                .default_value("localhost:9092"),
        )
        .arg(
            Arg::with_name("group-id")
                .short("g")
                .long("group-id")
                .help("Consumer group id")
                .takes_value(true)
                .default_value("example_consumer_group_id"),
        )
        .arg(
            Arg::with_name("log-conf")
                .long("log-conf")
                .help("Configure the logging format (example: 'rdkafka=trace')")
                .takes_value(true),
        )
        .arg(
            Arg::with_name("topics")
                .short("t")
                .long("topics")
                .help("Topic list")
                .takes_value(true)
                .multiple(true)
                .required(true),
        )
        .get_matches();

    setup_logger(true, matches.value_of("log-conf"));

    let (version_n, version_s) = get_rdkafka_version();
    info!("rd_kafka_version: 0x{:08x}, {}", version_n, version_s);

    let topics = matches.values_of("topics").unwrap().collect::<Vec<&str>>();
    let brokers = matches.value_of("brokers").unwrap();
    let group_id = matches.value_of("group-id").unwrap();

    consume_and_print(brokers, group_id, &topics).await
}
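
Both files declare mod example_utils; and call setup_logger, which this post does not show. A minimal sketch built on env_logger (an assumption; the upstream rdkafka repository ships its own, fuller version):

example_utils.rs:

use env_logger::Env;

// Initialize logging for the examples. `log_conf` takes an env_logger filter
// string such as "rdkafka=trace"; `log_thread` is accepted for parity with
// the upstream helper but ignored in this sketch.
pub fn setup_logger(log_thread: bool, log_conf: Option<&str>) {
    let _ = log_thread;
    let env = Env::default().filter_or("RUST_LOG", log_conf.unwrap_or("info"));
    env_logger::Builder::from_env(env)
        .format_timestamp_millis()
        .init();
}

The consumer then runs the same way as the producer:

cargo run --bin consumer -- --topics test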

Golang

This section uses confluent-kafka-go.
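
confluent-kafka-go wraps librdkafka via cgo; the v1 import path used below is fetched with go get. Note that producer.go and consumer.go both declare package main, so keep them in separate directories or modules:

go mod init kafka-example
go get github.com/confluentinc/confluent-kafka-go/kafka
go run producer.go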

producer.go:

package main

import (
	"fmt"
	"math/rand"
	"os"
	"strconv"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

	config := &kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"client.id":         "go-producer",
	}

	topic := "purchases"
	p, err := kafka.NewProducer(config)

	if err != nil {
		fmt.Printf("Failed to create producer: %s", err)
		os.Exit(1)
	}

	// Goroutine to handle message delivery reports and
	// possibly other event types (errors, stats, etc.)
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Failed to deliver message: %v\n", ev.TopicPartition)
				} else {
					fmt.Printf("Produced event to topic %s: key = %-10s value = %s\n",
						*ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
				}
			}
		}
	}()

	users := [...]string{"eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther"}

	for n := 0; n < 50; n++ {
		key := users[rand.Intn(len(users))]

		// The payload is the send timestamp in milliseconds, so the
		// consumer can compute end-to-end latency.
		currentTimeMillis := time.Now().UnixNano() / 1e6
		data := strconv.FormatInt(currentTimeMillis, 10)

		p.Produce(&kafka.Message{
			TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
			Key:            []byte(key),
			Value:          []byte(data),
		}, nil)
	}

	// Wait for all messages to be delivered
	p.Flush(15 * 1000)
	p.Close()
}

consumer.go:

package main

import (
	"fmt"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

	conf := kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"client.id":         "go-consumer",
		"group.id":          "kafka-go-getting-started",
		"auto.offset.reset": "earliest",
	}

	c, err := kafka.NewConsumer(&conf)

	if err != nil {
		fmt.Printf("Failed to create consumer: %s", err)
		os.Exit(1)
	}

	topic := "purchases"
	err = c.SubscribeTopics([]string{topic}, nil)
	// Set up a channel for handling Ctrl-C, etc
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	// Process messages
	run := true
	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			ev, err := c.ReadMessage(100 * time.Millisecond)
			if err != nil {
				// Errors are informational and automatically handled by the consumer
				continue
			}
			fmt.Printf("Consumed event from topic %s: key = %-10s value = %s\n",
				*ev.TopicPartition.Topic, string(ev.Key), string(ev.Value))
			// The payload is the send timestamp in milliseconds.
			timestampStr := string(ev.Value)

			// Convert the string timestamp to an integer
			sentTimestamp, err := strconv.ParseInt(timestampStr, 10, 64)
			if err != nil {
				fmt.Println("Could not parse timestamp. Skipping.")
				continue
			}

			receivedTimestamp := time.Now().UnixNano() / 1e6 // current time in milliseconds

			latency := receivedTimestamp - sentTimestamp
			fmt.Printf("Consumed event from topic %s: key = %-10s  latency = %d ms\n",
				*ev.TopicPartition.Topic, string(ev.Key), latency)
		}
	}

	c.Close()
}

Python

This section uses confluent_kafka.
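
Install the client from PyPI first:

pip install confluent-kafka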

producer.py:

import socket
from typing import Optional

from confluent_kafka import KafkaError, Message, Producer


class KafkaProducer:
    def __init__(self, bootstrap_servers: str, client_id: str) -> None:
        self.conf = {"bootstrap.servers": bootstrap_servers, "client.id": client_id}
        self.producer = Producer(self.conf)

    def acked(self, err: Optional[KafkaError], msg: Message) -> None:
        if err is not None:
            print(f"Failed to deliver message: {msg}: {err}")
        else:
            print(f"Message produced: {msg}")

    def produce(
        self, topic: str, key: Optional[str] = None, value: Optional[str] = None
    ) -> None:
        self.producer.produce(topic, key=key, value=value, callback=self.acked)

    def poll(self, timeout: float) -> None:
        self.producer.poll(timeout)

    def flush(self) -> None:
        self.producer.flush()


if __name__ == "__main__":
    producer = KafkaProducer(
        bootstrap_servers="localhost:9092", client_id=socket.gethostname()
    )
    topic = "test"
    producer.produce(topic, key="key", value="value")
    producer.poll(1)
    # Block until all outstanding messages are delivered before exiting.
    producer.flush()

consumer.py:

import sys
from typing import List, Optional

from confluent_kafka import Consumer, KafkaError, KafkaException


class KafkaConsumer:
    def __init__(self, bootstrap_servers: str, group_id: str, auto_offset_reset: str = 'smallest') -> None:
        self.conf = {
            'bootstrap.servers': bootstrap_servers,
            'group.id': group_id,
            'auto.offset.reset': auto_offset_reset
        }
        self.consumer = Consumer(self.conf)
        self.running = True

    def basic_consume_loop(self, topics: List[str], timeout: float = 1.0) -> None:
        try:
            self.consumer.subscribe(topics)

            while self.running:
                msg = self.consumer.poll(timeout=timeout)
                if msg is None:
                    continue

                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition event
                        sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                         (msg.topic(), msg.partition(), msg.offset()))
                    else:
                        raise KafkaException(msg.error())
                else:
                    print(msg.value().decode('utf-8'))
                    # do something with msg.value()
        finally:
            # Close down consumer to commit final offsets.
            self.consumer.close()

    def shutdown(self) -> None:
        self.running = False


if __name__ == "__main__":
    consumer = KafkaConsumer(
        bootstrap_servers="localhost:9092", group_id="foo", auto_offset_reset="smallest"
    )
    topics = ["test"]
    consumer.basic_consume_loop(topics)
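
The class exposes shutdown(), but main never calls it, so Ctrl-C kills the process instead of letting basic_consume_loop exit and close the consumer cleanly. One way to wire it up (a sketch, assuming a single-threaded main where the poll loop sees the flag within one timeout):

import signal

consumer = KafkaConsumer(bootstrap_servers="localhost:9092", group_id="foo")
# On SIGINT, flip the running flag; the loop exits after the current poll()
# returns, and the finally block then closes the consumer to commit offsets.
signal.signal(signal.SIGINT, lambda sig, frame: consumer.shutdown())
consumer.basic_consume_loop(["test"])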