(PECL rdkafka >= 1.0.0, librdkafka >= 0.9)
RdKafka\KafkaConsumer::consume — Consume message and triggers callbacks
$timeout_ms
) : RdKafka\MessageConsume message or get error event, triggers callbacks.
Will automatically call registered callbacks for any such queued events, including rebalance_cb, event_cb, commit_cb, etc.
Note:
An application should make sure to call consume() at regular intervals, even if no messages are expected, to serve any queued callbacks waiting to be called. This is especially important when a rebalnce_cb has been registered as it needs to be called and handled properly to synchronize internal consumer state.
timeout_ms
(int)Timeout (milliseconds)
Returns a RdKafka\Message. On error or timeout, RdKafka\Message::$err is != RD_KAFKA_ERR_NO_ERROR
, and other properties should be ignored.
Example #1 RdKafka\KafkaConsumer::consume() example
<?php
while (true) {
$message = $kafkaConsumer->consume(3600e3);
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
handle($message);
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timedout\n";
break;
default:
throw new \Exception($message->errstr());
break;
}
}
?>