Once you’ve installed the php-rdkafka extension, you’re ready to write PHP scripts that consume messages. The extension provides more than one way to consume them.

Low level

This approach has no concept of a consumer group: the script itself chooses which partition to read and at which offset to start.

<?php
$rk = new RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG);
// Address of the Kafka broker
$rk->addBrokers("192.168.33.1:9092");

$topic = $rk->newTopic("test");
// Start reading partition 0 from the beginning of the log
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

while (true) {
    // Wait up to 1000 ms for a message from partition 0
    $msg = $topic->consume(0, 1000);
    if ($msg === null) {
        // Nothing arrived within the timeout; poll again
        continue;
    }
    if ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}
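The example above treats any non-zero `err` as a reason to stop, yet reaching the end of a partition or timing out is not a real failure. A minimal sketch of a helper that separates those cases follows. `classifyMessage` is a hypothetical name that is not part of the extension, and the fallback `define` calls are an assumption added only so that the snippet runs without the extension loaded; their values mirror librdkafka's error codes.

```php
<?php
// Fallback definitions so this sketch runs even when the rdkafka extension
// is not loaded; with the extension these constants already exist.
defined('RD_KAFKA_RESP_ERR_NO_ERROR') || define('RD_KAFKA_RESP_ERR_NO_ERROR', 0);
defined('RD_KAFKA_RESP_ERR__PARTITION_EOF') || define('RD_KAFKA_RESP_ERR__PARTITION_EOF', -191);
defined('RD_KAFKA_RESP_ERR__TIMED_OUT') || define('RD_KAFKA_RESP_ERR__TIMED_OUT', -185);

/**
 * Hypothetical helper: decide what the consume loop should do next.
 * $msg is whatever consume() returned: a message object or null.
 * Returns 'process', 'wait' or 'fail'.
 */
function classifyMessage(?object $msg): string
{
    if ($msg === null) {
        // consume() timed out without returning a message
        return 'wait';
    }
    switch ($msg->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            return 'process';
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            // Not failures: simply no new data yet
            return 'wait';
        default:
            return 'fail';
    }
}
```

Inside the loop, a script could echo `$msg->payload` on `'process'`, poll again on `'wait'`, and stop only on `'fail'`.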

High level

This approach lets you specify a consumer group. Within a group, each partition is read by only one consumer process at a time.

<?php
$conf = new RdKafka\Conf();

// Set a rebalance callback to log partition assignments (optional).
// Kafka automatically reassigns partitions when consumer processes join or
// leave the consumer group; this callback is triggered on each reassignment.
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            echo "Assign: ";
            var_dump($partitions);
            $kafka->assign($partitions);
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            echo "Revoke: ";
            var_dump($partitions);
            $kafka->assign(NULL);
            break;

        default:
            throw new \Exception($err);
    }
});

// Configure group.id. Consumers with the same group.id share the topic's
// partitions among themselves, so running more consumer processes in a group
// than the topic has partitions gains nothing: the extra ones get no partition.
$conf->set('group.id', 'myConsumerGroup1');

// Add the kafka cluster server address
$conf->set('metadata.broker.list', '192.168.33.1:9092');

$topicConf = new RdKafka\TopicConf();


// Set where to start consuming when there is no initial offset in the
// offset store or the stored offset is out of range.
// 'smallest': start from the beginning
$topicConf->set('auto.offset.reset', 'smallest');


// Set the configuration to use for subscribed/assigned topics
$conf->setDefaultTopicConf($topicConf);

$consumer = new RdKafka\KafkaConsumer($conf);

// Let the consumer subscribe to the log topic
$consumer->subscribe(['log']);


while (true) {
    $message = $consumer->consume(120*1000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            var_dump($message);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
    }
}
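To make the point about group size concrete, here is a minimal sketch that spreads partitions over the members of a group. It is an illustration only: `assignPartitions` is a hypothetical function, and the plain round-robin rule is an assumption, not the assignor that Kafka or librdkafka actually runs. It needs neither the extension nor a broker.

```php
<?php
/**
 * Hypothetical illustration, not Kafka's real assignor: hand out
 * partition ids to group members in round-robin order.
 *
 * @param int      $partitionCount number of partitions in the topic
 * @param string[] $consumers      member names in the group (list-indexed)
 * @return array<string, int[]>    member name => assigned partition ids
 */
function assignPartitions(int $partitionCount, array $consumers): array
{
    // Every member starts with an empty assignment
    $assignment = array_fill_keys($consumers, []);
    if ($consumers === []) {
        return $assignment;
    }
    for ($p = 0; $p < $partitionCount; $p++) {
        // Each partition gets exactly one owner
        $owner = $consumers[$p % count($consumers)];
        $assignment[$owner][] = $p;
    }
    return $assignment;
}

// A topic with 3 partitions and a group of 4 consumers
$result = assignPartitions(3, ['c1', 'c2', 'c3', 'c4']);
foreach ($result as $name => $partitions) {
    echo $name, ': ', $partitions ? implode(',', $partitions) : 'idle', "\n";
}
```

With three partitions and four consumers, the fourth consumer ends up with an empty assignment, which is why adding processes beyond the partition count does not increase throughput.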