RdKafka\KafkaConsumer::consume

(PECL rdkafka >= 1.0.0, librdkafka >= 0.9)

RdKafka\KafkaConsumer::consumeConsume message and triggers callbacks

Description

public RdKafka\KafkaConsumer::consume ( int $timeout_ms ) : RdKafka\Message

Consume message or get error event, triggers callbacks.

Will automatically call registered callbacks for any such queued events, including rebalance_cb, event_cb, commit_cb, etc.

Note:

An application should make sure to call consume() at regular intervals, even if no messages are expected, to serve any queued callbacks waiting to be called. This is especially important when a rebalnce_cb has been registered as it needs to be called and handled properly to synchronize internal consumer state.

Parameters

timeout_ms (int)

Timeout (milliseconds)

Return Values

Returns a RdKafka\Message. On error or timeout, RdKafka\Message::$err is != RD_KAFKA_ERR_NO_ERROR, and other properties should be ignored.

Errors/Exceptions

  • Throws RdKafka\Exception on errors.
  • Throws InvalidArgumentException on argument parsing errors.

Examples

Example #1 RdKafka\KafkaConsumer::consume() example

<?php
while (true) {
    
$message $kafkaConsumer->consume(3600e3);
    switch (
$message->err) {
        case 
RD_KAFKA_RESP_ERR_NO_ERROR:
            
handle($message);
            break;
        case 
RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo 
"Timedout\n";
            break;
        default:
            throw new \
Exception($message->errstr());
            break;
    }
}
?>