This example shows how to use the producer.
Note:
Since version 4.0, the application is responsible for proper shutdown (by calling flush), as the example below shows. Without a proper shutdown, messages that are still queued can be lost.
Example #1 Producer example
<?php
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092');
// If you need to produce exactly once and want to keep the original produce order, uncomment the line below
//$conf->set('enable.idempotence', 'true');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test");
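for ($i = 0; $i < 10; $i++) {
    // RD_KAFKA_PARTITION_UA (unassigned) lets the configured partitioner choose the partition
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    // Serve queued delivery report callbacks without blocking
    $producer->poll(0);
}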
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException('Was unable to flush, messages might be lost!');
}
?>
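The retry loop above can be wrapped in a small reusable function. The following is only a sketch: flushWithRetries is a hypothetical helper and not part of the extension, and it assumes that the callable it receives returns 0 (the value of RD_KAFKA_RESP_ERR_NO_ERROR) on success. Taking a callable instead of a producer keeps the helper usable without a running broker.

```php
<?php
// Hypothetical helper, not part of php-rdkafka.
// $flush is any callable that accepts a timeout in milliseconds and returns
// an error code; 0 (the value of RD_KAFKA_RESP_ERR_NO_ERROR) means success.
function flushWithRetries(callable $flush, int $timeoutMs = 10000, int $maxRetries = 10): void
{
    for ($attempt = 0; $attempt < $maxRetries; $attempt++) {
        if (0 === $flush($timeoutMs)) {
            return; // the flush succeeded, nothing is left in the queue
        }
    }

    // Same failure handling as in the example above
    throw new \RuntimeException('Was unable to flush, messages might be lost!');
}

// Possible usage with the $producer from the example above:
// flushWithRetries(function (int $timeoutMs) use ($producer) {
//     return $producer->flush($timeoutMs);
// });
```

The default timeout and retry count mirror the values used in the example above. Tune them to how long the application can afford to block during shutdown.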
If for some reason you want to purge messages that are still queued in the producer or in flight to the broker, you can use purge to do so.
Example #2 Purge example
<?php
// $rk is the producer instance
// Purge messages not yet sent to the broker (this is a safe operation)
$rk->purge(RD_KAFKA_PURGE_F_QUEUE);
// You can also purge messages that are in flight to or from the broker,
// but note that this poses risks.
// Purging these messages will void any future acknowledgements from the
// broker, making it impossible for the application to know whether these
// messages were successfully delivered or not.
// Retrying these messages may lead to duplicates.
$rk->purge(RD_KAFKA_PURGE_F_INFLIGHT);
?>