kumo.kafka.build_producer(URI)
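Constructs a Kafka producer client object.
URI
is the configuration that identifies the Kafka cluster to which you want to connect. In the examples on this page it is passed as a table of client properties such as bootstrap.servers.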
Client Methods
The returned client object has the following methods:
client:send({PARAMS})
Sends a message. PARAMS
is an object style table with the
following keys:
topic
- required string; the name of the topic to which to send the message
payload
- required string; the message to send
timeout
- how long to wait for a response.
The result from send is a tuple: local partition, offset = producer:send {...}.
local producer = kumo.kafka.build_producer {
  ['bootstrap.servers'] = 'localhost:9092',
}
producer:send {
  topic = 'my.topic',
  payload = message:get_data(),
  -- how long to keep trying to submit to kafka
  -- before a lua error will be raised.
  -- This is the default.
  timeout = '1 minute',
}
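Because send raises a Lua error when the message cannot be submitted within the timeout, a caller that wants to carry on after a failure can wrap the call in pcall. The following is a minimal sketch of that pattern, not part of the KumoMTA API: try_send is a hypothetical helper, and stub_producer is a hypothetical stand-in for the object returned by kumo.kafka.build_producer so that the snippet runs in a plain Lua interpreter.

```lua
-- Hypothetical stand-in for a producer returned by kumo.kafka.build_producer,
-- so that this sketch can run outside of KumoMTA.
local stub_producer = {}
function stub_producer:send(params)
  if params.topic == '' then
    error 'topic must not be empty'
  end
  -- Pretend the message landed in partition 0 at offset 42
  return 0, 42
end

-- Hypothetical helper: returns partition, offset on success,
-- or nil plus the error message on failure.
local function try_send(producer, params)
  local ok, partition_or_err, offset = pcall(producer.send, producer, params)
  if not ok then
    return nil, partition_or_err
  end
  return partition_or_err, offset
end

local partition, offset = try_send(stub_producer, {
  topic = 'my.topic',
  payload = 'hello',
  timeout = '1 minute',
})
print(partition, offset)
```

With a real producer, the same helper would let a policy script log the error and decide whether to retry, rather than having the error propagate to the caller.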
client:send_batch({PARAMS})
Since: Dev Builds Only
The functionality described in this section requires a dev build of KumoMTA. You can obtain a dev build by following the instructions in the Installation section.
Sends a batch of messages. PARAMS
is an array style table of object style tables, each with the
following keys:
topic
- required string; the name of the topic to which to send the message
payload
- required string; the message to send
timeout
- how long to wait for a response.
The result from send_batch is a tuple of tables: local failed_items, errors = producer:send_batch(...).
local producer = kumo.kafka.build_producer {
  ['bootstrap.servers'] = 'localhost:9092',
}
local failed_items, errors = producer:send_batch {
  {
    topic = 'my.topic',
    payload = 'payload 1',
    timeout = '1 minute',
  },
  {
    topic = 'my.other.topic',
    payload = 'payload 2',
    timeout = '1 minute',
  },
}
if #failed_items > 0 then
  -- some items failed
  for i, item_idx in ipairs(failed_items) do
    -- named err to avoid shadowing the built-in error function
    local err = errors[i]
    print(string.format('item idx %d failed: %s', item_idx, err))
  end
end
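Since failed_items identifies which entries of the batch failed, one possible follow-up is to collect just those entries and submit them again. The sketch below illustrates that idea; collect_failed is a hypothetical helper, the failed_items value is made up for illustration, and the code assumes that failed_items holds 1-based indexes into the submitted table, which this page does not state explicitly.

```lua
-- Hypothetical helper: given the batch that was submitted and the
-- failed_items list returned by send_batch, build a new batch that
-- contains only the entries that failed.
-- Assumes failed_items holds 1-based indexes into `batch`.
local function collect_failed(batch, failed_items)
  local retry = {}
  for _, item_idx in ipairs(failed_items) do
    table.insert(retry, batch[item_idx])
  end
  return retry
end

local batch = {
  { topic = 'my.topic', payload = 'payload 1', timeout = '1 minute' },
  { topic = 'my.other.topic', payload = 'payload 2', timeout = '1 minute' },
  { topic = 'my.topic', payload = 'payload 3', timeout = '1 minute' },
}

-- Illustrative result, as if the second and third entries had failed
local failed_items = { 2, 3 }
local retry = collect_failed(batch, failed_items)
-- `retry` could then be passed to producer:send_batch again
```

Keeping the original batch in a local variable, rather than passing a table literal directly to send_batch, is what makes this kind of lookup possible.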
client:close()
Since: Version 2024.09.02-c5476b89
The functionality described in this section requires version 2024.09.02-c5476b89 of KumoMTA, or a more recent version.
Explicitly close the client object and associated connection.
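When a producer is created for a short-lived task, it may be desirable to close it even if sending fails. The following sketch shows one way to arrange that with pcall; with_producer is a hypothetical helper and stub_producer is a hypothetical stand-in for a real producer, included so that the snippet runs in a plain Lua interpreter.

```lua
-- Hypothetical stand-in for a producer, so the sketch runs outside KumoMTA.
local closed = false
local stub_producer = {}
function stub_producer:send(params)
  error 'broker unavailable'
end
function stub_producer:close()
  closed = true
end

-- Hypothetical helper: run `fn(producer)`, always close the producer
-- afterwards, then re-raise any error that `fn` produced.
local function with_producer(producer, fn)
  local ok, err = pcall(fn, producer)
  producer:close()
  if not ok then
    error(err, 0)
  end
end

local ok, err = pcall(with_producer, stub_producer, function(p)
  p:send { topic = 'my.topic', payload = 'hello' }
end)
```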