kumo.jsonl.new_multi_tailer
local tailer = kumo.jsonl.new_multi_tailer {
  directory = '/path/to/logs',
  consumers = {
    {
      name = 'deliveries',
      checkpoint_name = 'cp-deliveries',
      max_batch_size = 100,
      max_batch_latency = '1s',
      filter = function(record)
        return record.type == 'Delivery'
      end,
    },
    {
      name = 'bounces',
      checkpoint_name = 'cp-bounces',
      max_batch_latency = '5s',
      filter = function(record)
        return record.type == 'Bounce'
      end,
    },
  },
}
Since: Dev Builds Only
The functionality described in this section requires a dev build of KumoMTA. You can obtain a dev build by following the instructions in the Installation section.
Creates a multi-consumer log tailer that reads each segment file exactly once and distributes records to multiple independent consumers. Each consumer has its own filter, batch parameters, and optional checkpoint.
This is more efficient than running multiple single-consumer tailers over the same directory, because decompression and JSON parsing happen once per record regardless of how many consumers are configured.
Top-level Configuration Parameters
directory
String. Required. The directory containing the segment files to read.
pattern
String (glob). Optional. A glob pattern to match segment files within
directory. Defaults to "*".
poll_watcher
Duration string (e.g., "500ms"). Optional. When set, uses a polling-based
filesystem watcher with the given interval instead of the platform's native
filesystem notification mechanism.
tail
Boolean. Optional. When true, ignores any existing checkpoints and starts
reading from the most recent segment only. Defaults to false.
consumers
Array of consumer config tables. Required. Each entry defines one consumer. See Consumer Configuration below.
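As a minimal sketch of how the top-level parameters combine, the following configures a tailer that matches only zstd-compressed segments, polls the directory every 500ms, and starts from the most recent segment. The directory, pattern, and consumer name are placeholder values chosen for illustration, and the snippet is meant to run inside KumoMTA's Lua environment.

```lua
local kumo = require 'kumo'

-- Illustrative values only; adjust them to your deployment.
local tailer = kumo.jsonl.new_multi_tailer {
  directory = '/var/log/kumomta',
  pattern = '*.zst', -- only consider segment files matching this glob
  poll_watcher = '500ms', -- poll instead of using native notifications
  tail = true, -- ignore existing checkpoints, start at the newest segment
  consumers = {
    -- Assumption: a consumer without a filter receives every record.
    { name = 'everything' },
  },
}
```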
Consumer Configuration
Each entry in the consumers array is a table with the following fields.
name
String. Required. A unique name for this consumer. Returned by
batch:consumer_name() so that the caller
can route batches correctly.
max_batch_size
Integer. Optional. Maximum number of records per batch for this consumer.
Defaults to 100.
max_batch_latency
Duration string. Optional. Maximum time to wait for a partial batch to fill
before yielding it. Defaults to "1s".
Consumers with different latencies will have their batches yielded independently: a consumer with a short latency does not delay a consumer with a longer one.
checkpoint_name
String. Optional. When set, enables checkpoint persistence for this consumer.
The checkpoint is stored as a hidden file .<checkpoint_name> inside directory.
Each consumer has its own independent checkpoint.
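To make the naming rule concrete, here is a small sketch that computes where a consumer's checkpoint file would be expected to live. The checkpoint_path helper is hypothetical and not part of the kumo API; it only applies the ".<checkpoint_name> inside directory" rule described above.

```lua
-- Hypothetical helper: derive the expected checkpoint file location from
-- the ".<checkpoint_name> inside directory" rule.
local function checkpoint_path(directory, checkpoint_name)
  -- Strip a single trailing slash so the result does not contain "//".
  local dir = directory:gsub('/$', '')
  return dir .. '/.' .. checkpoint_name
end

print(checkpoint_path('/var/log/kumomta', 'cp-deliveries'))
-- prints /var/log/kumomta/.cp-deliveries
```

This can be handy when inspecting or cleaning up checkpoints by hand.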
filter
Function. Optional. A function that receives each parsed record as a Lua
value and returns true to include it in this consumer's batch or false to
discard it. If the function raises an error, the error is propagated to the
caller.
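Because an error raised by a filter propagates to the caller, it can help to keep filters small and defensive. The sketch below builds a filter from a list of record types; make_type_filter is a hypothetical helper, and it assumes records arrive as plain Lua tables with a type field, as in the examples on this page.

```lua
-- Hypothetical helper: build a filter that accepts any of the given types.
local function make_type_filter(types)
  local wanted = {}
  for _, t in ipairs(types) do
    wanted[t] = true
  end
  return function(record)
    -- Guard against non-table values so the filter returns false
    -- instead of raising an error that would propagate to the caller.
    if type(record) ~= 'table' then
      return false
    end
    return wanted[record.type] == true
  end
end

local is_failure = make_type_filter { 'Bounce', 'TransientFailure' }
```

The resulting function can be passed as the filter field of a consumer, for example filter = is_failure.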
Methods
tailer:batches
for batches in tailer:batches() do
  for _, batch in ipairs(batches) do
    print(batch:consumer_name())
  end
end
Returns an iterator that yields a Lua table (sequence) of
LogBatch objects on each call, with one entry per consumer whose
batch is ready, or nil when the stream is exhausted.
A batch is ready when it is full (max_batch_size reached) or its
max_batch_latency has expired. Consumers are yielded independently: a slow
consumer does not prevent a fast consumer's batch from being delivered.
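Since each yielded table can contain batches for any subset of consumers, one way to route them is a handler table keyed by consumer name. This is a sketch only: route and fake_batch are hypothetical names, and fake_batch is a stand-in so the snippet runs without a real tailer.

```lua
-- Hypothetical router: look up a handler by consumer name and call it.
-- Returns the number of batches that had a matching handler.
local function route(batches, handlers)
  local handled = 0
  for _, batch in ipairs(batches) do
    local handler = handlers[batch:consumer_name()]
    if handler then
      handler(batch)
      handled = handled + 1
    end
  end
  return handled
end

-- Stand-in batch object for illustration; real batches are LogBatch
-- objects yielded by tailer:batches().
local function fake_batch(name)
  return {
    consumer_name = function()
      return name
    end,
  }
end

local seen = {}
local handlers = {
  deliveries = function(batch)
    table.insert(seen, 'D')
  end,
  bounces = function(batch)
    table.insert(seen, 'B')
  end,
}

local count =
  route({ fake_batch 'bounces', fake_batch 'deliveries' }, handlers)
```

With a real tailer, the call to route would sit inside the for batches in tailer:batches() loop, and each handler would call batch:commit() once it has finished with the batch.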
tailer:close
Signals the tailer to stop. Any pending or subsequent poll returns nil.
Note: :close() does not write a checkpoint. Call
batch:commit() on each batch after processing it
to advance that consumer's checkpoint.
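One way to make sure the checkpoint only advances after successful processing is to wrap the handler in pcall and commit afterwards. The sketch below uses hypothetical names (process_and_commit, fake_batch); the stand-in batch only counts commit() calls so the snippet can run without a real tailer.

```lua
-- Hypothetical wrapper: run handler(batch) and commit only if it succeeds.
-- Returns true on success, or false plus the error value on failure.
local function process_and_commit(batch, handler)
  local ok, err = pcall(handler, batch)
  if not ok then
    return false, err
  end
  batch:commit()
  return true
end

-- Stand-in batch for illustration; it only counts commit() calls.
local function fake_batch()
  local b = { commits = 0 }
  function b:commit()
    self.commits = self.commits + 1
  end
  return b
end

local good, bad = fake_batch(), fake_batch()
local ok1 = process_and_commit(good, function() end)
local ok2, err2 = process_and_commit(bad, function()
  error 'boom'
end)
```

Whether to leave a failed batch uncommitted or to drop it and commit anyway is a policy decision; the Per-Customer Batched Webhook Example discusses that trade-off.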
Example
local kumo = require 'kumo'

local tailer = kumo.jsonl.new_multi_tailer {
  directory = '/var/log/kumomta',
  pattern = '*.zst',
  consumers = {
    {
      name = 'deliveries',
      checkpoint_name = 'cp-del',
      max_batch_size = 200,
      max_batch_latency = '500ms',
      filter = function(record)
        return record.type == 'Delivery'
      end,
    },
    {
      name = 'bounces',
      checkpoint_name = 'cp-bounce',
      max_batch_latency = '5s',
      filter = function(record)
        return record.type == 'Bounce' or record.type == 'TransientFailure'
      end,
    },
  },
}

for batches in tailer:batches() do
  for _, batch in ipairs(batches) do
    local name = batch:consumer_name()
    for record in batch:iter_records() do
      print(name .. ': ' .. record.id)
    end
    batch:commit()
  end
end

tailer:close()
Per-Customer Batched Webhook Example
This example models a multi-tenant scenario where each customer has their own
egress pool and their own HTTP endpoint to receive log events. A consumer is
configured for each customer, filtering on the egress_pool field of the
record. Each consumer has its own checkpoint, so progress is tracked
independently per customer. However, there is a trade-off in error
handling if just one of those consumers has a persistent issue;
see the comments in the example below.
If you need fully independent per-customer retry behaviour with no risk of
dropping records, consider the alternative approach shown in
Per-Customer Webhook Example with main Parameters,
which runs one kumo.jsonl.new_tailer process per customer.
Save the script as e.g. /path/to/webhook.lua and run it as a standalone
script:
local kumo = require 'kumo'

kumo.on('main', function()
  -- Map each customer (egress pool name) to their webhook endpoint.
  local endpoints = {
    ['customer-a'] = 'http://customer-a.example.com/log',
    ['customer-b'] = 'http://customer-b.example.com/log',
    ['customer-c'] = 'http://customer-c.example.com/log',
  }

  local client = kumo.http.build_client {}
  local max_retries = 5
  local retry_delay_seconds = 2

  -- Perform the HTTP POST to the given url and return the response.
  local function post_batch(url, payload)
    return client
      :post(url)
      :header('Content-Type', 'application/json')
      :body(payload)
      :send()
  end

  -- Encode the records as a JSON array, then post to url with retries.
  -- On unrecoverable failure, logs the error and commits anyway so that
  -- the other consumers can continue to make progress; this means the
  -- batch is dropped with no retry for this customer.
  --
  -- This is an inherent trade-off with the multi-consumer tailer: all
  -- consumers share a single read position, so a failure on one cannot
  -- block the others without stalling all consumers.
  --
  -- If you need robust per-consumer delivery with independent retry
  -- behaviour that never drops records and never blocks other consumers,
  -- run a separate kumo.jsonl.new_tailer (single-consumer) instance per
  -- customer instead.
  local function process_batch(url, records)
    local payload = kumo.serde.json_encode(records)

    for attempt = 1, max_retries do
      local response = post_batch(url, payload)
      if response:status_is_success() then
        return
      end

      local status = response:status_code()
      local disposition = string.format(
        '%d %s: %s',
        status,
        response:status_reason(),
        response:text()
      )

      -- 429 Too Many Requests and 5xx server errors are transient
      -- and worth retrying. Other 4xx errors are not.
      local retryable = status == 429 or status >= 500

      if not retryable or attempt == max_retries then
        -- Drop this batch and commit so other consumers are not stalled.
        kumo.log_error(
          string.format(
            'webhook post to %s failed (attempt %d/%d), dropping batch: %s',
            url,
            attempt,
            max_retries,
            disposition
          )
        )
        return
      end

      -- Transient failure: wait before retrying.
      kumo.log_error(
        string.format(
          'webhook post to %s failed (attempt %d/%d), retrying in %ds: %s',
          url,
          attempt,
          max_retries,
          retry_delay_seconds,
          disposition
        )
      )
      kumo.time.sleep(retry_delay_seconds)
    end
  end

  -- Build one consumer per customer, filtering on egress_pool.
  local consumers = {}
  for name, _ in pairs(endpoints) do
    local pool = name
    table.insert(consumers, {
      name = pool,
      checkpoint_name = 'webhook-' .. pool,
      max_batch_size = 100,
      max_batch_latency = '1s',
      filter = function(record)
        return record.egress_pool == pool
      end,
    })
  end

  local tailer = kumo.jsonl.new_multi_tailer {
    directory = '/var/log/kumomta',
    consumers = consumers,
  }

  for batches in tailer:batches() do
    for _, batch in ipairs(batches) do
      local url = endpoints[batch:consumer_name()]
      process_batch(url, batch:records())
      batch:commit()
    end
  end

  tailer:close()
  client:close()
end)
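The retry decision in the example can be pulled out into small pure functions, which makes the policy easier to test in isolation. The sketch below is not part of the example above: is_retryable mirrors its status check, while backoff_delay is a swapped-in exponential backoff offered as an alternative to the fixed 2 second delay. Both names and the 30 second cap are hypothetical.

```lua
-- Hypothetical helpers sketching the retry policy as pure functions.

-- 429 Too Many Requests and 5xx responses are treated as transient,
-- matching the check in the webhook example.
local function is_retryable(status)
  return status == 429 or status >= 500
end

-- Exponential backoff (the example itself sleeps a fixed delay):
-- base * 2^(attempt - 1) seconds, capped at max_delay seconds.
local function backoff_delay(attempt, base, max_delay)
  local delay = base * 2 ^ (attempt - 1)
  return math.min(delay, max_delay)
end

-- Delays for attempts 1..5 with a 2 second base and a 30 second cap.
for attempt = 1, 5 do
  print(attempt, backoff_delay(attempt, 2, 30))
end
```

Keep in mind that a longer backoff on one consumer delays the others too, since the example processes batches sequentially in a single loop.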