Transactional producer

This example shows how to use the producer with transactions.

Note:

Since version 4.0, proper shutdown (using flush) is the responsibility of the application. Please check the example below. Without proper shutdown, messages can get lost.

Example #1 Transactional producer example

<?php

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092');
$conf->set('transactional.id', 'some-id');

$producer = new RdKafka\Producer($conf);

$topic = $producer->newTopic("test");

$producer->initTransactions(10000);
$producer->beginTransaction();

for ($i = 0; $i < 10; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
    $producer->poll(0);
}

// Any outstanding messages will be flushed (delivered) before actually committing the transaction.
$error = $producer->commitTransaction(10000);

if (RD_KAFKA_RESP_ERR_NO_ERROR !== $error) {
    // Check what kind of error it was, e.g. $error->isFatal(), etc., and act accordingly (retry, abort, ...).
}

?>
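
The note at the top of this page mentions flushing on shutdown. The transactional example above relies on commitTransaction() to deliver outstanding messages, but for a plain (non-transactional) producer a shutdown sketch could look as follows; the broker address, topic name, and retry count are illustrative assumptions:

<?php

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092'); // assumed broker address

$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test"); // assumed topic name

$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");
$producer->poll(0);

// Proper shutdown: flush() blocks until outstanding messages are delivered
// or the timeout expires; retry a few times before giving up.
for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(1000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}

if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
    throw new \RuntimeException('Was unable to flush, messages might be lost!');
}

?>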

The transactional producer operates on top of the idempotent producer, and provides full exactly-once semantics (EOS) for Apache Kafka when used with the transaction-aware consumer (isolation.level=read_committed, which is the default).
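
As a hedged illustration of the consuming side (the broker address, group id, and topic name are assumptions), a transaction-aware consumer only needs isolation.level=read_committed, which is already the default:

<?php

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092'); // assumed broker address
$conf->set('group.id', 'example-group');              // assumed group id
$conf->set('auto.offset.reset', 'earliest');
// read_committed is the default; set explicitly here for clarity.
$conf->set('isolation.level', 'read_committed');

$consumer = new RdKafka\KafkaConsumer($conf);
$consumer->subscribe(['test']); // assumed topic name

while (true) {
    $message = $consumer->consume(1000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $message->err) {
        // Only messages from committed transactions are ever returned here.
        echo $message->payload, PHP_EOL;
    }
}

?>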

A producer instance is configured for transactions by setting the transactional.id to an identifier unique for the application. This id will be used to fence stale transactions from previous instances of the application, typically following an outage or crash.

After creating the producer instance, the transactional state must be initialized by calling RdKafka\Producer::initTransactions(). This is a blocking call that will acquire a runtime producer id from the transaction coordinator broker, as well as abort any stale transactions and fence any still-running producer instances with the same transactional.id.
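
A minimal initialization sketch, assuming the transactional calls report failures by throwing RdKafka\KafkaErrorException (as recent php-rdkafka versions do); the broker address, transactional.id, and timeout are illustrative:

<?php

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092'); // assumed broker address
$conf->set('transactional.id', 'example-app-1');      // assumed id, unique per application instance

$producer = new RdKafka\Producer($conf);

// Sketch only: assumes transactional calls throw RdKafka\KafkaErrorException on failure.
try {
    // Blocks for up to 10 seconds while acquiring a producer id from the
    // transaction coordinator and fencing older instances with the same transactional.id.
    $producer->initTransactions(10000);
} catch (RdKafka\KafkaErrorException $e) {
    if ($e->isRetriable()) {
        // Temporary problem (e.g. coordinator not reachable yet): retry initTransactions().
    } else {
        // Fatal for this producer instance: recreate the producer before trying again.
    }
}

?>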

Once transactions are initialized, the application may begin a new transaction by calling RdKafka\Producer::beginTransaction(). A producer instance may only have a single ongoing transaction at a time.

Any messages produced after the transaction has been started will belong to the ongoing transaction and will be committed or aborted atomically. It is not permitted to produce messages outside a transaction boundary, e.g., before RdKafka\Producer::beginTransaction() or after RdKafka\Producer::commitTransaction(), RdKafka\Producer::abortTransaction(), or after the current transaction has failed.

To commit the produced messages, and any consumed offsets, to the current transaction, call RdKafka\Producer::commitTransaction(). This call will block until the transaction has been fully committed or failed (typically due to fencing by a newer producer instance).

Alternatively, if processing fails, or an abortable transaction error is raised, the transaction needs to be aborted by calling RdKafka\Producer::abortTransaction(), which marks any produced messages and offset commits as aborted.

After the current transaction has been committed or aborted, a new transaction may be started by calling RdKafka\Producer::beginTransaction() again.
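
Putting the lifecycle together, the following sketch commits one transaction per batch and aborts on abortable errors. It again assumes exception-based error reporting via RdKafka\KafkaErrorException; the broker address, transactional.id, topic name, and batch contents are illustrative:

<?php

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092'); // assumed broker address
$conf->set('transactional.id', 'example-app-1');      // assumed id

$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic('test'); // assumed topic name

$producer->initTransactions(10000);

$batches = [['a', 'b'], ['c', 'd']]; // illustrative payload batches

foreach ($batches as $batch) {
    // Only one transaction may be ongoing per producer instance.
    $producer->beginTransaction();

    foreach ($batch as $payload) {
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
        $producer->poll(0);
    }

    try {
        // Flushes and atomically commits everything produced since beginTransaction().
        $producer->commitTransaction(10000);
    } catch (RdKafka\KafkaErrorException $e) {
        if ($e->transactionRequiresAbort()) {
            // Abortable error: roll back this batch; the next iteration starts a fresh transaction.
            $producer->abortTransaction(10000);
        } else {
            // Fatal or retriable errors need application-specific handling.
            throw $e;
        }
    }
}

?>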

Note:

If you have a local Docker setup with just one broker, be sure to set these environment variables for the broker in your docker-compose.yml:

KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1