Producer

This example shows how to use the producer.

Note:

Since version 4.0, proper shutdown (using flush) is the responsibility of the application; see the example below. Without a proper shutdown, messages can be lost.

Example #1 Producer example

<?php

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092');

// If you need to produce exactly once and want to keep the original produce order, uncomment the line below
//$conf->set('enable.idempotence', 'true');

$producer = new RdKafka\Producer($conf);

$topic = $producer->newTopic("test");

for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    $producer->poll(0);
}

// Flush the remaining queued messages, retrying up to 10 times
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}

if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException('Was unable to flush, messages might be lost!');
}

?>
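
The flush retry loop in the example above can be factored into a small helper. The following is only a sketch: flushWithRetries and $stubFlush are hypothetical names that are not part of the extension, and the helper assumes that a return value of 0 corresponds to RD_KAFKA_RESP_ERR_NO_ERROR. It takes a callable instead of a producer so that it can be run without a broker.

```php
<?php

/**
 * Calls $flush until it reports success or the attempts are exhausted.
 * (Hypothetical helper, not part of the rdkafka extension.)
 *
 * @param callable $flush       Takes a timeout in ms and returns a response code.
 * @param int      $maxAttempts Maximum number of flush calls (at least 1).
 * @param int      $timeoutMs   Timeout passed to each flush call.
 * @return int The last response code; 0 is assumed to mean success.
 */
function flushWithRetries(callable $flush, int $maxAttempts = 10, int $timeoutMs = 10000): int
{
    if ($maxAttempts < 1) {
        throw new \InvalidArgumentException('maxAttempts must be at least 1');
    }

    $result = null;
    for ($attempt = 0; $attempt < $maxAttempts; $attempt++) {
        $result = $flush($timeoutMs);
        if (0 === $result) {
            break; // flushed successfully, stop retrying
        }
    }

    return $result;
}

// Stub standing in for a producer: reports a failure twice, then succeeds.
// -185 is only used here as an arbitrary non-zero failure code.
$calls = 0;
$stubFlush = function (int $timeoutMs) use (&$calls): int {
    $calls++;
    return $calls < 3 ? -185 : 0;
};

$result = flushWithRetries($stubFlush, 10, 100);
```

With a real producer, the callable would wrap the flush call, for example function (int $ms) use ($producer) { return $producer->flush($ms); }, and the application would still throw an exception if the returned code is not RD_KAFKA_RESP_ERR_NO_ERROR.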

If for some reason you want to purge messages that are still queued in the producer or in flight to the broker, you can use purge to do so.

Example #2 Purge example

<?php

// $producer is an RdKafka\Producer instance, as created in Example #1

// Purge messages not yet sent to the broker (this is a safe operation to do)
$producer->purge(RD_KAFKA_PURGE_F_QUEUE);

// You can also purge messages that are in flight to or from the broker,
// but note that this poses risks.
// Purging these messages will void any future acknowledgements from the
// broker, making it impossible for the application to know if these
// messages were successfully delivered or not.
// Retrying these messages may lead to duplicates.
$producer->purge(RD_KAFKA_PURGE_F_INFLIGHT);
?>