(PECL rdkafka >= 1.0.0, librdkafka >= 0.9)

RdKafka\Conf::setRebalanceCbSet rebalance callback


public RdKafka\Conf::setRebalanceCb ( callable $callback ) : void

Set rebalance callback for use with coordinated consumer group balancing.

Registering a rebalance_cb turns off librdkafka's automatic partition assignment/revocation and instead delegates that responsibility to the application's rebalance_cb.

The rebalance callback is responsible for updating librdkafka's assignment set based on the two events RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS and RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS but should also be able to handle arbitrary rebalancing failures where err is neither of those.


In this latter case (arbitrary error), the application must $kafka->assign(NULL) to synchronize state.

Without a rebalance callback this is done automatically by librdkafka but registering a rebalance callback gives the application flexibility in performing other operations along with the assinging/revocation, such as fetching offsets from an alternate location (on assign) or manually committing offsets (on revoke).

The example below shows the application's responsibilities.


callback (callable)

A callable with the following signature:

 * @param RdKafka\KafkaConsumer $kafka
 * @param int $err
 * @param RdKafka\TopicPartition[] $partitions
function (RdKafka\KafkaConsumer $kafkaint $err, array $partitions);
The err parameter is set to either RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS or RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS (or an unexpected error). The partitions parameter is an array of RdKafka\TopicPartition, representing the full partition set that was either assigned or revoked.

Return Values

Returns no value.


Example #1 RdKafka\Conf::setRebalanceCb() example

->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka$err, array $partitions null) {
    switch (
$err) {
// application may load offets from arbitrary external
            // storage here and update partitions

             if (
$manual_commits) {
// Optional explicit manual commit

$kafka->assign(NULL); // sync state