kumo.kafka.build_producer({PARAMS})

Constructs a Kafka producer client object.

PARAMS is an object style table of Kafka client configuration properties. For example, bootstrap.servers names the Kafka broker(s) to which you want to connect.

local producer = kumo.kafka.build_producer {
  ['bootstrap.servers'] = 'localhost:9092',
}

Client Methods

The returned client object has the following methods:

client:send({PARAMS})

Sends a message. PARAMS is an object style table with the following keys:

  • topic - required string; the name of the topic to which to send the message
  • payload - required string; the message to send
  • timeout - optional; how long to keep trying to submit the message to Kafka before a Lua error is raised. Defaults to '1 minute'.

send returns two values, the partition and the offset: local partition, offset = producer:send {...}.

local producer = kumo.kafka.build_producer {
  ['bootstrap.servers'] = 'localhost:9092',
}

producer:send {
  topic = 'my.topic',
  payload = message:get_data(),
  -- how long to keep trying to submit to kafka
  -- before a lua error will be raised.
  -- This is the default.
  timeout = '1 minute',
}
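
Because send raises a Lua error once the timeout elapses, a caller that wants to continue after a failure may wrap the call in pcall. The following is a minimal sketch rather than part of the KumoMTA API: try_send is a hypothetical helper, and stub_producer is a stand-in that exists only so the snippet runs outside KumoMTA. In a real policy you would pass the object returned by kumo.kafka.build_producer instead.

```lua
-- Hypothetical helper (not a KumoMTA function): wraps producer:send in
-- pcall so that a failure is returned as a value instead of being raised.
local function try_send(producer, params)
  local ok, partition, offset = pcall(function()
    return producer:send(params)
  end)
  if not ok then
    -- when pcall fails, its second return value is the error
    return nil, partition
  end
  return { partition = partition, offset = offset }
end

-- Stand-in producer for illustration only; it is an assumption made so
-- that this snippet is runnable without a Kafka broker.
local stub_producer = {}
function stub_producer:send(params)
  if params.topic == '' then
    error 'topic must not be empty'
  end
  return 0, 42
end

local result, err = try_send(stub_producer, {
  topic = 'my.topic',
  payload = 'hello',
})
if result then
  print(string.format('sent to partition %d', result.partition))
else
  print('send failed: ' .. tostring(err))
end
```

Returning the error as a second value follows the common Lua convention of nil plus a message, which lets the caller decide whether to log, retry, or re-raise.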

client:send_batch({PARAMS})

Since: Dev Builds Only

The functionality described in this section requires a dev build of KumoMTA. You can obtain a dev build by following the instructions in the Installation section.

Sends a batch of messages. PARAMS is an array style table of object style tables, each with the following keys:

  • topic - required string; the name of the topic to which to send the message
  • payload - required string; the message to send
  • timeout - how long to keep trying to submit the message to Kafka.

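send_batch returns two tables: local failed_items, errors = producer:send_batch(...). failed_items lists the indexes of the items that failed, and errors holds the corresponding error for each entry at the same position.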

local producer = kumo.kafka.build_producer {
  ['bootstrap.servers'] = 'localhost:9092',
}

local failed_items, errors = producer:send_batch {
  {
    topic = 'my.topic',
    payload = 'payload 1',
    timeout = '1 minute',
  },
  {
    topic = 'my.other.topic',
    payload = 'payload 2',
    timeout = '1 minute',
  },
}
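if #failed_items > 0 then
  -- some items failed; errors[i] describes the failure of failed_items[i]
  for i, item_idx in ipairs(failed_items) do
    -- named err to avoid shadowing Lua's built-in error function
    local err = errors[i]
    print(string.format('item idx %d failed: %s', item_idx, err))
  end
end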
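
When every message in a batch goes to the same topic, the array of items can be assembled programmatically. The sketch below assumes nothing about KumoMTA beyond the item shape described above: build_batch is a hypothetical helper, and its fallback timeout of '1 minute' simply mirrors the value used in the example.

```lua
-- Hypothetical helper (not a KumoMTA function): builds the array style
-- table of items that send_batch expects, one item per payload.
local function build_batch(topic, payloads, timeout)
  local batch = {}
  for _, payload in ipairs(payloads) do
    table.insert(batch, {
      topic = topic,
      payload = payload,
      -- fallback chosen for this sketch, mirroring the example above
      timeout = timeout or '1 minute',
    })
  end
  return batch
end

local batch = build_batch('my.topic', { 'payload 1', 'payload 2' })
print(#batch, batch[1].topic, batch[2].payload)
```

The resulting table can then be passed directly, as in producer:send_batch(batch).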

client:close()

Since: Version 2024.09.02-c5476b89

The functionality described in this section requires version 2024.09.02-c5476b89 of KumoMTA, or a more recent version.

Explicitly closes the client object and its associated connection.
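
One way to make sure close is always called is to wrap the work in a helper that closes the producer whether or not the work succeeds. This is a sketch under stated assumptions, not a KumoMTA facility: with_producer is a hypothetical helper, and stub is a stand-in object used only so that the snippet runs outside KumoMTA.

```lua
-- Hypothetical helper (not a KumoMTA function): runs fn(producer), closes
-- the producer afterwards, and re-raises any error that fn raised.
local function with_producer(producer, fn)
  local results = table.pack(pcall(fn, producer))
  producer:close()
  if not results[1] then
    -- level 0 re-raises the original error without adding position info
    error(results[2], 0)
  end
  return table.unpack(results, 2, results.n)
end

-- Stand-in producer for illustration only; an assumption made so that
-- this snippet is runnable without a Kafka broker.
local stub = { closed = false }
function stub:send(params)
  return 0, 1
end
function stub:close()
  self.closed = true
end

local partition, offset = with_producer(stub, function(p)
  return p:send { topic = 'my.topic', payload = 'hello' }
end)
print(partition, offset, stub.closed)
```

In a real policy the first argument would be the object returned by kumo.kafka.build_producer.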