(PECL rdkafka >= 1.0.0, librdkafka >= 0.9)
RdKafka\Conf::setRebalanceCb — Set rebalance callback
Set rebalance callback for use with coordinated consumer group balancing.
Registering a rebalance_cb
turns off librdkafka's automatic partition assignment/revocation and instead delegates that responsibility to the application's rebalance_cb
.
The rebalance callback is responsible for updating librdkafka's assignment set based on the two events RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
and RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
but should also be able to handle arbitrary rebalancing failures where err
is neither of those.
Note:
In this latter case (arbitrary error), the application must
$kafka->assign(NULL)
to synchronize state.
Without a rebalance callback this is done automatically by librdkafka but registering a rebalance callback gives the application flexibility in performing other operations along with the assinging/revocation, such as fetching offsets from an alternate location (on assign) or manually committing offsets (on revoke).
The example below shows the application's responsibilities.
callback
(callable)A callable with the following signature:
<?php
/**
* @param RdKafka\KafkaConsumer $kafka
* @param int $err
* @param RdKafka\TopicPartition[] $partitions
*/
function (RdKafka\KafkaConsumer $kafka, int $err, array $partitions);
err
parameter is set to either RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
or RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
(or an unexpected error). The partitions
parameter is an array of RdKafka\TopicPartition, representing the full partition set that was either assigned or revoked.
Returns no value.
Example #1 RdKafka\Conf::setRebalanceCb() example
<?php
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
// application may load offets from arbitrary external
// storage here and update partitions
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
if ($manual_commits) {
// Optional explicit manual commit
$kafka->commit($partitions);
}
$kafka->assign(NULL);
break;
default:
handle_unlikely_error($err);
$kafka->assign(NULL); // sync state
break;
}
}
?>