Kafka is a distributed messaging system originally developed at LinkedIn.

Installation

Go to the download page and download the latest release, which at the time of writing is 2.3.0 (released on 25 June 2019).

❯ wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
❯ tar -xzf kafka_2.12-2.3.0.tgz
❯ cd kafka_2.12-2.3.0

ZooKeeper is a coordination service for distributed applications and an important component of the Hadoop ecosystem (HBase, for example, depends on it). Kafka needs it too, and the Kafka distribution ships a script that quickly starts a single-node ZooKeeper instance:

❯ bin/zookeeper-server-start.sh config/zookeeper.properties  # run it in terminal 1 or a tmux pane

Then start the Kafka server (a Java runtime such as OpenJDK must be installed first):

❯ bin/kafka-server-start.sh config/server.properties

With the Kafka server running, you can use the bundled scripts to create a topic and to publish and consume messages from the terminal:

# Create a topic called "strconv" with one partition and one replica
❯ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic strconv
# List all topics
❯ bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
strconv
test
# Start a producer and enter 2 messages interactively
❯ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic strconv
>Message 1
>Message 2
# Start a consumer that reads from the beginning and dumps the messages to standard output
❯ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic strconv --from-beginning
Message 1
Message 2

I won't demonstrate a multi-broker cluster here; see the official documentation for that.

Next, let's write producers and consumers in Golang. There are currently two major Golang clients, so we'll try each one.

By the way, LinkedIn open-sourced Kafka, but its core language is Java and it has few Go applications, so it doesn't maintain a Golang client of its own.

confluent-kafka-go

confluent-kafka-go is Confluent's open source Golang client. It is a Go wrapper around librdkafka, the C/C++ Kafka client. Install it first:

❯ brew install librdkafka pkg-config
❯ go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka

The project's examples directory contains many examples; I wrote my own based on them. First, the producer:

package main

import (
	"fmt"
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
	"os"
)

func main() {

	if len(os.Args) != 3 {
		fmt.Fprintf(os.Stderr, "Usage: %s <broker> <topic>\n",
			os.Args[0])
		os.Exit(1)
	}

	broker := os.Args[1]
	topic := os.Args[2]

	p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": broker})

	if err != nil {
		fmt.Printf("Failed to create producer: %s\n", err)
		os.Exit(1)
	}

	fmt.Printf("Created Producer %v\n", p)
	deliveryChan := make(chan kafka.Event)

	value := "Hello Go!"
	err = p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte(value),
		Headers:        []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
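	}, deliveryChan)
	if err != nil {
		// Produce fails immediately if the message cannot be enqueued
		fmt.Printf("Failed to produce message: %s\n", err)
		os.Exit(1)
	}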

	e := <-deliveryChan
	m := e.(*kafka.Message)

	if m.TopicPartition.Error != nil {
		fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
	} else {
		fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
			*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
	}

	close(deliveryChan)
	p.Close()
}

The example uses os.Args to read positional command-line arguments: confluent_producer.go takes two, the broker and the topic. The Produce method publishes a message whose value is "Hello Go!" and which carries a header with the key myTestHeader and the value "header values are binary". Finally, the delivery report received on deliveryChan tells us whether delivery succeeded; on success, the message's partition and offset are printed. Next, the consumer:

package main

import (
	"fmt"
	"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
	"os"
	"os/signal"
	"syscall"
)

func main() {

	if len(os.Args) < 4 {
		fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n",
			os.Args[0])
		os.Exit(1)
	}

	broker := os.Args[1]
	group := os.Args[2]
	topics := os.Args[3:]
	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":     broker,
		"broker.address.family": "v4",
		"group.id":              group,
		"session.timeout.ms":    6000,
		"auto.offset.reset":     "earliest"})

	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		os.Exit(1)
	}

	fmt.Printf("Created Consumer %v\n", c)

	err = c.SubscribeTopics(topics, nil)
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to subscribe: %s\n", err)
		os.Exit(1)
	}

	run := true

	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			ev := c.Poll(100)
			if ev == nil {
				continue
			}

			switch e := ev.(type) {
			case *kafka.Message:
				fmt.Printf("%% Message on %s:\n%s\n",
					e.TopicPartition, string(e.Value))
				if e.Headers != nil {
					fmt.Printf("%% Headers: %v\n", e.Headers)
				}
			case kafka.Error:
				fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
				if e.Code() == kafka.ErrAllBrokersDown {
					run = false
				}
			default:
				fmt.Printf("Ignored %v\n", e)
			}
		}
	}

	fmt.Printf("Closing consumer\n")
	c.Close()
}

The consumer takes three arguments: the broker address, the group ID, and the topic name(s). The group ID identifies a consumer group: consumers that share a group ID cooperate to consume all partitions of the subscribed topics, with each partition assigned to exactly one member. Each group tracks its own progress, so a consumer with a new group ID reads every partition of the topic again. First, produce two messages:

❯ go run confluent_producer.go localhost:9092 strconv
Created Producer rdkafka#producer-1
Delivered message to topic strconv [0] at offset 0
❯ go run confluent_producer.go localhost:9092 strconv
Created Producer rdkafka#producer-1
Delivered message to topic strconv [0] at offset 1

Since strconv has only one partition, the partition is always [0], and the offset increases from 0. Then start the consumers:

# Terminal 1
❯ go run confluent_consumer.go localhost:9092 1 strconv
Created Consumer rdkafka#consumer-1
% Message on strconv[0]@0:
Hello Go!
% Headers: [myTestHeader="header values are binary"]
% Message on strconv[0]@1:
Hello Go!
% Headers: [myTestHeader="header values are binary"]
Ignored OffsetsCommitted (<nil>, [strconv[0]@2])
# Terminal 2
❯ go run confluent_consumer.go localhost:9092 2 strconv
Created Consumer rdkafka#consumer-1
% Message on strconv[0]@0:
Hello Go!
% Headers: [myTestHeader="header values are binary"]
% Message on strconv[0]@1:
Hello Go!
% Headers: [myTestHeader="header values are binary"]
Ignored OffsetsCommitted (<nil>, [strconv[0]@2])

Because the two terminals use different group IDs, each of them consumes all (both) messages.
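To make the per-group semantics concrete, here is a toy, in-memory model of a single-partition topic. It is a sketch under simplifying assumptions, not how a broker works: a real broker stores committed offsets per group and per partition (in the __consumer_offsets topic), and toyTopic, produce, and consume are hypothetical names. A group seen for the first time starts at offset 0, which mimics auto.offset.reset set to earliest.

```go
package main

import "fmt"

// toyTopic is a hypothetical in-memory stand-in for a single-partition topic:
// an append-only log plus one committed offset per group ID.
type toyTopic struct {
	log       []string
	committed map[string]int // group ID -> next offset to read
}

func newToyTopic() *toyTopic {
	return &toyTopic{committed: make(map[string]int)}
}

// produce appends a message and returns its offset (0, 1, 2, ...).
func (t *toyTopic) produce(msg string) int {
	t.log = append(t.log, msg)
	return len(t.log) - 1
}

// consume returns every message the group has not read yet and commits the
// new position. A group seen for the first time starts at offset 0
// (the map's zero value), mimicking auto.offset.reset=earliest.
func (t *toyTopic) consume(group string) []string {
	start := t.committed[group]
	msgs := append([]string(nil), t.log[start:]...)
	t.committed[group] = len(t.log)
	return msgs
}

func main() {
	topic := newToyTopic()
	topic.produce("Hello Go!")
	topic.produce("Hello Go!")

	fmt.Println(len(topic.consume("1"))) // 2: group 1 reads both messages
	fmt.Println(len(topic.consume("1"))) // 0: nothing new for group 1
	fmt.Println(len(topic.consume("2"))) // 2: group 2 has its own offset
}
```

The key point is that the offset belongs to the group, not to the topic, so adding a group never affects what existing groups see.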

Throughout this article, the producers take two arguments (the broker address and the topic name) and the consumers take three (the broker address, the group ID, and the topic name).

Sarama

Sarama is Shopify’s open source Golang client. The first step is to install it:

❯ go get -u github.com/Shopify/sarama

To demonstrate multiple partitions, this time create a new topic called sarama with two partitions:

❯ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic sarama

Now let's look at the producer:

package main

import (
	"fmt"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"strconv"
	"time"

	"github.com/Shopify/sarama"
)

func main() {
	if len(os.Args) != 3 {
		fmt.Fprintf(os.Stderr, "Usage: %s <broker> <topic>\n",
			os.Args[0])
		os.Exit(1)
	}

	broker := os.Args[1]
	topic := os.Args[2]

	config := sarama.NewConfig()
	config.Producer.Retry.Max = 5
	config.Producer.RequiredAcks = sarama.WaitForAll
	producer, err := sarama.NewAsyncProducer([]string{broker}, config)
	if err != nil {
		panic(err)
	}

	defer func() {
		if err := producer.Close(); err != nil {
			panic(err)
		}
	}()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	chars := []byte("ABCDEFGHIJKLMNOPQRSTUVWXYZ")

	var enqueued, errors int
	doneCh := make(chan struct{})
	go func() {
		for {

			time.Sleep(1 * time.Second)

			buf := make([]byte, 4)
			for i := 0; i < 4; i++ {
				buf[i] = chars[rand.Intn(len(chars))]
			}

			strTime := strconv.Itoa(int(time.Now().Unix()))
			msg := &sarama.ProducerMessage{
				Topic: topic,
				Key:   sarama.StringEncoder(strTime),
				Value: sarama.StringEncoder(buf),
			}
			select {
			case producer.Input() <- msg:
				enqueued++
				fmt.Printf("Produce message: %s\n", buf)
			case err := <-producer.Errors():
				errors++
				fmt.Println("Failed to produce message:", err)
			case <-signals:
				doneCh <- struct{}{}
				return // stop producing once main has been notified
			}
		}
	}()

	<-doneCh
	log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
}

Sarama offers two producers, AsyncProducer and SyncProducer; this example uses the asynchronous one. Once per second it publishes, through the producer.Input() channel, a message whose key is the current Unix timestamp and whose value is four random uppercase letters.

The consumer also uses a consumer group:

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"

	"github.com/Shopify/sarama"
)

type Consumer struct {
	ready chan bool
}

func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
	close(consumer.ready)
	return nil
}

func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
	return nil
}

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for message := range claim.Messages() {
		log.Printf("Message claimed: key = %s, value = %v, topic = %s, partition = %v, offset = %v", string(message.Key), string(message.Value), message.Topic, message.Partition, message.Offset)
		session.MarkMessage(message, "")
	}
	return nil
}

func main() {
	if len(os.Args) < 4 {
		fmt.Fprintf(os.Stderr, "Usage: %s <broker> <group> <topics..>\n",
			os.Args[0])
		os.Exit(1)
	}

	broker := os.Args[1]
	group := os.Args[2]
	topics := os.Args[3:]

	version, err := sarama.ParseKafkaVersion("2.3.0")
	if err != nil {
		log.Panicf("Error parsing Kafka version: %v", err)
	}

	config := sarama.NewConfig()
	config.Version = version
	consumer := Consumer{
		ready: make(chan bool),
	}

	ctx, cancel := context.WithCancel(context.Background())
	client, err := sarama.NewConsumerGroup([]string{broker}, group, config)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}

	wg := &sync.WaitGroup{}
	wg.Add(1) // must happen before the goroutine starts, or wg.Wait may return early
	go func() {
		defer wg.Done()
		for {
			if err := client.Consume(ctx, topics, &consumer); err != nil {
				log.Panicf("Error from consumer: %v", err)
			}
			if ctx.Err() != nil {
				return
			}
			consumer.ready = make(chan bool)
		}
	}()

	<-consumer.ready

	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
	select {
	case <-ctx.Done():
		log.Println("terminating: context cancelled")
	case <-sigterm:
		log.Println("terminating: via signal")
	}
	cancel()
	wg.Wait()
	if err = client.Close(); err != nil {
		log.Panicf("Error closing client: %v", err)
	}
}

The consumer is a bit more complicated. First it declares a Consumer struct with Setup, Cleanup, and ConsumeClaim methods; together they implement the sarama.ConsumerGroupHandler interface, which the consumer group calls while processing messages. Consumer groups also require the Kafka version to be set in the config, here via sarama.ParseKafkaVersion("2.3.0"). Finally, the program listens for signals and reports why it terminated: if it was stopped by a signal such as Ctrl+C, it prints "terminating: via signal".
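The order in which Sarama calls the three handler methods is easy to miss, so here is a toy driver that mimics it using only the standard library. The handler interface below is a hypothetical, simplified mirror of sarama.ConsumerGroupHandler (the real methods receive a ConsumerGroupSession and a ConsumerGroupClaim), and runSession and recorder are illustrative names. The order follows Sarama's documentation: Setup once per session, one ConsumeClaim goroutine per claimed partition, and Cleanup after all of them have returned. That is why the example closes consumer.ready in Setup: it signals that the first session has started.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// handler is a hypothetical, simplified mirror of sarama.ConsumerGroupHandler.
// The real methods receive a sarama.ConsumerGroupSession and ConsumerGroupClaim.
type handler interface {
	Setup() error
	ConsumeClaim(partition int32, messages <-chan string) error
	Cleanup() error
}

// runSession is a toy driver following the order Sarama documents:
// Setup once, one ConsumeClaim goroutine per claimed partition,
// then Cleanup after every ConsumeClaim has returned.
func runSession(h handler, claims map[int32][]string) error {
	if err := h.Setup(); err != nil {
		return err
	}
	var wg sync.WaitGroup
	for p, msgs := range claims {
		ch := make(chan string, len(msgs))
		for _, m := range msgs {
			ch <- m
		}
		close(ch) // in this toy, a claim ends once its messages are drained
		wg.Add(1)
		go func(p int32, ch <-chan string) {
			defer wg.Done()
			_ = h.ConsumeClaim(p, ch) // errors ignored in this sketch
		}(p, ch)
	}
	wg.Wait()
	return h.Cleanup()
}

// recorder logs every callback so the call order can be checked.
type recorder struct {
	mu     sync.Mutex
	events []string
}

func (r *recorder) add(e string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.events = append(r.events, e)
}

func (r *recorder) Setup() error {
	r.add("setup")
	return nil
}

func (r *recorder) Cleanup() error {
	r.add("cleanup")
	return nil
}

func (r *recorder) ConsumeClaim(p int32, msgs <-chan string) error {
	for m := range msgs {
		r.add(fmt.Sprintf("p%d:%s", p, m))
	}
	return nil
}

func main() {
	r := &recorder{}
	if err := runSession(r, map[int32][]string{0: {"BZGB"}, 1: {"CTCU"}}); err != nil {
		panic(err)
	}
	// Partition goroutines interleave nondeterministically, so sort the middle.
	middle := append([]string(nil), r.events[1:len(r.events)-1]...)
	sort.Strings(middle)
	fmt.Println(r.events[0], middle, r.events[len(r.events)-1])
	// prints: setup [p0:BZGB p1:CTCU] cleanup
}
```

After a rebalance, Consume starts a new session and Setup runs again, which is why the loop in main recreates consumer.ready before calling Consume again.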

The consumption loop runs in a goroutine, and a sync.WaitGroup ensures that the goroutine has finished before the client is closed. Note also that context.WithCancel returns a cancel function along with the context; the context must be cancelled on shutdown, otherwise client.Consume never returns, wg.Wait() blocks forever, and the signal cannot terminate the program.

Run it:

❯ go run sarama_producer.go localhost:9092 sarama
Produce message: BZGB
Produce message: CTCU
Produce message: SJFB
Produce message: DNJO
Produce message: EZQL
Produce message: JZPF
Produce message: SBZR
Produce message: FDZD
❯ go run sarama_consumer.go localhost:9092 1 sarama
2019/07/19 13:01:07 Message claimed: key = 1563512466, value = BZGB, topic = sarama, partition = 0, offset = 0
2019/07/19 13:01:07 Message claimed: key = 1563512467, value = CTCU, topic = sarama, partition = 1, offset = 0
2019/07/19 13:01:08 Message claimed: key = 1563512468, value = SJFB, topic = sarama, partition = 0, offset = 1
2019/07/19 13:01:09 Message claimed: key = 1563512469, value = DNJO, topic = sarama, partition = 1, offset = 1
2019/07/19 13:01:10 Message claimed: key = 1563512470, value = EZQL, topic = sarama, partition = 1, offset = 2
2019/07/19 13:01:11 Message claimed: key = 1563512471, value = JZPF, topic = sarama, partition = 0, offset = 2
2019/07/19 13:01:12 Message claimed: key = 1563512472, value = SBZR, topic = sarama, partition = 1, offset = 3
2019/07/19 13:01:13 Message claimed: key = 1563512473, value = FDZD, topic = sarama, partition = 0, offset = 3

The messages are spread fairly evenly across partitions 0 and 1. You can also consume the same messages in another terminal with a different group ID, for example go run sarama_consumer.go localhost:9092 2 sarama (group ID 2).
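The even spread comes from the message keys: every message carries a different timestamp key, and a key-hashing partitioner maps each key to a partition, so different keys tend to land on different partitions. As far as I know, Sarama's default partitioner (sarama.NewHashPartitioner) hashes the key with 32-bit FNV-1a and takes the result modulo the partition count; the stdlib-only sketch below assumes exactly that and uses a hypothetical hashPartition function. It is not guaranteed to reproduce the partitions in the log above.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strconv"
)

// hashPartition sketches key-based partitioning, assuming (as in Sarama's
// default hash partitioner) a 32-bit FNV-1a hash of the key taken modulo
// the partition count. It is an illustration, not Sarama's exact code.
func hashPartition(key []byte, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p // keep the result in [0, numPartitions)
	}
	return p
}

func main() {
	// Keys shaped like the producer's: consecutive Unix timestamps.
	counts := make(map[int32]int)
	for ts := 1563512466; ts <= 1563512473; ts++ {
		key := []byte(strconv.Itoa(ts))
		counts[hashPartition(key, 2)]++
	}
	fmt.Println(counts) // number of keys per partition
}
```

The useful property is determinism: the same key always maps to the same partition, which is what preserves ordering per key.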

If several terminals use the same group ID, each process consumes only the partitions assigned to it, so no message is consumed twice. For example, if two terminals run go run sarama_consumer.go localhost:9092 1 sarama, terminal A consumes partition 0 and terminal B consumes partition 1. With three terminals, however, there are only two partitions, so terminal C is not assigned a partition and consumes nothing.
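The assignment rule can be sketched as the classic range assignor: sort the group members, give each len(partitions)/len(members) consecutive partitions, and hand the remainder to the first members. rangeAssign is a hypothetical helper. Sarama's default BalanceStrategyRange computes the split slightly differently, and real member IDs are generated by the broker, so which terminal ends up idle is not fixed. In every variant, though, three members and two partitions leave one member with nothing.

```go
package main

import (
	"fmt"
	"sort"
)

// rangeAssign is a simplified sketch of range assignment: members are
// sorted, each gets len(partitions)/len(members) consecutive partitions,
// and the first len(partitions)%len(members) members get one extra.
func rangeAssign(partitions []int32, members []string) map[string][]int32 {
	sorted := append([]string(nil), members...)
	sort.Strings(sorted)

	out := make(map[string][]int32, len(sorted))
	if len(sorted) == 0 {
		return out
	}
	per := len(partitions) / len(sorted)
	extra := len(partitions) % len(sorted)
	next := 0
	for i, m := range sorted {
		n := per
		if i < extra {
			n++
		}
		out[m] = append([]int32{}, partitions[next:next+n]...)
		next += n
	}
	return out
}

func main() {
	parts := []int32{0, 1} // the "sarama" topic has two partitions
	fmt.Println(rangeAssign(parts, []string{"A", "B"}))      // map[A:[0] B:[1]]
	fmt.Println(rangeAssign(parts, []string{"A", "B", "C"})) // map[A:[0] B:[1] C:[]]
}
```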

Afterword

Sarama's documentation and API are both quite poor. I don't understand why it has far more GitHub stars than confluent-kafka-go; perhaps it is because Sarama was created earlier.

I have decided to use confluent-kafka-go in future projects. If you have experience running either client in production, please leave a comment and let me know.

Source code

Original address: strconv.com/posts/use-k…

The full code can be found at this address.

Further reading

  1. kafka.apache.org/quickstart