(PECL rdkafka >= 3.1.0, librdkafka >= 0.11.4)
RdKafka\ProducerTopic::producev — Produce and send a single message with headers
public RdKafka\ProducerTopic::producev ( integer $partition , integer $msgflags , string $payload [, string $key = NULL [, array $headers = NULL [, integer $timestamp_ms = NULL [, string $opaque = NULL ]]]] ) : void
Produce and send a single message to the broker, optionally with headers and a timestamp.
The call is asynchronous and non-blocking.
Note:
Since producing is asynchronous, you should call RdKafka\Producer::flush() before you destroy the producer. Otherwise, any outstanding messages will be silently discarded.
partition
Can be either RD_KAFKA_PARTITION_UA (unassigned) for automatic partitioning using the topic's partitioner function (see RdKafka\TopicConf::setPartitioner()), or a fixed partition (0..N).
msgflags
Must be either 0 or RD_KAFKA_MSG_F_BLOCK. With RD_KAFKA_MSG_F_BLOCK, the call blocks while the producer queue is full.
payload
Payload string
key
Optional message key. If non-NULL, it is passed to the topic partitioner, sent with the message to the broker, and passed on to the consumer.
headers
Accepts only an associative array (header name => value) or NULL.
timestamp_ms
Timestamp to set on the message, in milliseconds since the Unix epoch. If not set, the broker will set it.
opaque
An optional application-provided per-message opaque string that will be provided in the message's delivery callback (see RdKafka\Conf::setDrMsgCb()). (Since PECL rdkafka 5.0.0 with librdkafka >= 1.0.0.)
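The optional parameters described above can be combined as in the following sketch. It is illustrative only: the helpers toTimestampMs() and buildHeaders(), the broker address, the topic name, and the opaque value are assumptions and not part of the extension. It also assumes that the delivery report message exposes the opaque string as $message->opaque.

```php
<?php
// Hypothetical helper (not part of the extension): convert a date to
// milliseconds since the Unix epoch for use as $timestamp_ms.
function toTimestampMs(DateTimeInterface $when): int
{
    // 'U' is whole seconds since the epoch, 'v' is the millisecond part.
    return (int) $when->format('U') * 1000 + (int) $when->format('v');
}

// Hypothetical helper: cast every header value to a string so that
// $headers is a plain associative array of name => string value.
function buildHeaders(array $pairs): array
{
    $headers = [];
    foreach ($pairs as $name => $value) {
        $headers[$name] = (string) $value;
    }
    return $headers;
}

$headers = buildHeaders(['source' => 'signup-service', 'schema-version' => 2]);
$timestampMs = toTimestampMs(new DateTimeImmutable('2020-01-01T00:00:00.250+00:00'));

// The producing part only runs when the rdkafka extension is loaded.
if (extension_loaded('rdkafka')) {
    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list', 'localhost:9092'); // placeholder broker address

    // Assumption: the delivery report message carries the opaque as $message->opaque.
    $conf->setDrMsgCb(function ($kafka, $message) {
        error_log(sprintf('delivery report: err=%d opaque=%s', $message->err, $message->opaque ?? ''));
    });

    $producer = new RdKafka\Producer($conf);
    $topic = $producer->newTopic('accounts'); // placeholder topic name

    // Passing $opaque requires PECL rdkafka >= 5.0.0 with librdkafka >= 1.0.0.
    $topic->producev(
        RD_KAFKA_PARTITION_UA,
        0,
        '{"type":"account-created"}',
        'account-1234',
        $headers,
        $timestampMs,
        'request-42' // hypothetical opaque value
    );

    // flush() also serves delivery callbacks; 1000 ms is an arbitrary timeout.
    $producer->flush(1000);
}
```

Casting header values up front keeps the array in the documented shape, and computing the timestamp from a date object avoids mixing seconds with milliseconds.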
Returns no value.
Version | Description |
---|---|
5.0.0 | Added optional opaque parameter. |
Example #1 RdKafka\ProducerTopic::producev() example
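<?php
// Producer and topic setup; the broker address and topic name are placeholders.
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'localhost:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic('accounts');

$accountId = '1234'; // example value; the message key must be a string

$message = [
    'type' => 'account-created',
    'id' => $accountId,
    'date' => date(DATE_W3C),
];
$headers = [
    'SomeKey' => 'SomeValue',
    'AnotherKey' => 'AnotherValue',
];
$payload = json_encode($message);
$topic->producev(RD_KAFKA_PARTITION_UA, 0, $payload, $accountId, $headers);

// Send outstanding messages before the producer is destroyed (flush() needs PECL rdkafka >= 4.0.0).
$producer->flush(10000);
?>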
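Example #2 Flushing after producev() (illustrative sketch)

Because producing is asynchronous, a single flush may time out before every message has been delivered. The sketch below retries the flush a bounded number of times. The helper flushWithRetries(), the broker address, the topic name, the attempt count, and the timeout are assumptions chosen for illustration, not part of the extension.

```php
<?php
// Hypothetical helper: call $flush (expected to wrap RdKafka\Producer::flush())
// up to $maxAttempts times and report whether it returned "no error".
function flushWithRetries(callable $flush, int $maxAttempts = 10): bool
{
    // RD_KAFKA_RESP_ERR_NO_ERROR is 0; fall back to 0 when the extension is not loaded.
    $noError = defined('RD_KAFKA_RESP_ERR_NO_ERROR') ? RD_KAFKA_RESP_ERR_NO_ERROR : 0;

    for ($attempt = 0; $attempt < $maxAttempts; $attempt++) {
        if ($flush() === $noError) {
            return true;
        }
    }
    return false;
}

// The producing part only runs when the rdkafka extension is loaded.
if (extension_loaded('rdkafka')) {
    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list', 'localhost:9092'); // placeholder broker address
    $producer = new RdKafka\Producer($conf);
    $topic = $producer->newTopic('accounts'); // placeholder topic name

    $topic->producev(RD_KAFKA_PARTITION_UA, 0, 'payload', 'key', ['SomeKey' => 'SomeValue']);

    // Three attempts of 1000 ms each are arbitrary values for this sketch.
    $flushed = flushWithRetries(function () use ($producer) {
        return $producer->flush(1000);
    }, 3);

    if (!$flushed) {
        error_log('Unable to flush, messages might be lost');
    }
}
```

Passing the flush as a callable keeps the retry logic independent of the producer, so it can be exercised without a broker.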