RdKafka\Conf::setRebalanceCb

(PECL rdkafka >= 1.0.0, librdkafka >= 0.9)

RdKafka\Conf::setRebalanceCb - Set rebalance callback

Description

public RdKafka\Conf::setRebalanceCb ( callable $callback ) : void

Set rebalance callback for use with coordinated consumer group balancing.

Registering a rebalance_cb turns off librdkafka's automatic partition assignment/revocation and instead delegates that responsibility to the application's rebalance_cb.

The rebalance callback is responsible for updating librdkafka's assignment set based on the two events RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS and RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, but it should also handle arbitrary rebalancing failures where err is neither of those.

Note:

In this latter case (arbitrary error), the application must call $kafka->assign(NULL) to synchronize state.

Without a rebalance callback this is done automatically by librdkafka, but registering a rebalance callback gives the application flexibility to perform other operations along with the assignment/revocation, such as fetching offsets from an alternate location (on assign) or manually committing offsets (on revoke).

The example below shows the application's responsibilities.

Parameters

callback (callable)

A callable with the following signature:

<?php
/**
 * @param RdKafka\KafkaConsumer $kafka
 * @param int $err
 * @param RdKafka\TopicPartition[] $partitions
 */
function (RdKafka\KafkaConsumer $kafka, int $err, array $partitions);
?>

The err parameter is set to either RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS or RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS (or an unexpected error). The partitions parameter is an array of RdKafka\TopicPartition, representing the full partition set that was either assigned or revoked.
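
To illustrate working with the partitions array, here is a minimal sketch of the "fetching offsets from an alternate location (on assign)" case. The helper name apply_stored_offsets and the layout of $storedOffsets are assumptions made for illustration and are not part of rdkafka. The sketch relies only on the getTopic(), getPartition() and setOffset() methods of RdKafka\TopicPartition.

```php
<?php
/**
 * Set the start offset on each assigned partition from an external store.
 *
 * @param array $partitions    Objects exposing getTopic(), getPartition() and
 *                             setOffset(), such as RdKafka\TopicPartition.
 * @param array $storedOffsets Hypothetical store layout (an assumption):
 *                             ['topic' => [partition => nextOffset]].
 * @return array The same partition objects, with offsets applied where known.
 */
function apply_stored_offsets(array $partitions, array $storedOffsets): array
{
    foreach ($partitions as $partition) {
        $topic = $partition->getTopic();
        $id = $partition->getPartition();
        if (isset($storedOffsets[$topic][$id])) {
            // Resume from the externally stored position.
            $partition->setOffset($storedOffsets[$topic][$id]);
        }
        // Partitions without a stored offset are left untouched, so the
        // consumer's configured default applies to them.
    }
    return $partitions;
}
```

Inside the RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS branch of a rebalance callback, the result could then be passed on with $kafka->assign(apply_stored_offsets($partitions, $offsets)), where $offsets has been loaded by the application beforehand.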

Return Values

Returns no value.

Examples

Example #1 RdKafka\Conf::setRebalanceCb() example

<?php
$conf = new RdKafka\Conf();

// Application-defined flag: commit offsets explicitly before partitions are revoked.
$manual_commits = false;

$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, ?array $partitions = null) use ($manual_commits) {
    switch ($err) {
        case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
            // application may load offsets from arbitrary external
            // storage here and update partitions
            $kafka->assign($partitions);
            break;

        case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
            if ($manual_commits) {
                // Optional explicit manual commit
                $kafka->commit($partitions);
            }
            $kafka->assign(NULL);
            break;

        default:
            // handle_unlikely_error() is an application-defined handler, not part of rdkafka
            handle_unlikely_error($err);
            $kafka->assign(NULL); // sync state
            break;
    }
});
?>
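
Example #1 only registers the callback. As a rough sketch of how such a configuration might be used, the following wires a rebalance callback into an RdKafka\KafkaConsumer and runs a consume loop. The broker address, group id and topic name are placeholder assumptions, and the simplified callback is one possible arrangement rather than the required one.

```php
<?php
// Placeholder values, assumed for illustration only.
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', '127.0.0.1:9092');
$conf->set('group.id', 'example-group');

$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, ?array $partitions = null) {
    if ($err === RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
        $kafka->assign($partitions);
    } else {
        // Revocation and unexpected errors both clear the assignment.
        $kafka->assign(NULL);
    }
});

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['example-topic']);

while (true) {
    // Wait up to 10 seconds for a message; callbacks such as the
    // rebalance callback are expected to be served during this call.
    $message = $consumer->consume(10 * 1000);

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            echo $message->payload, PHP_EOL;
            break;

        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            // Nothing to read yet; keep polling.
            break;

        default:
            throw new Exception($message->errstr(), $message->err);
    }
}
```

Running this sketch requires the rdkafka extension and a reachable broker. The callback must be set on the RdKafka\Conf object before it is passed to the RdKafka\KafkaConsumer constructor, since the consumer is created from that configuration.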