kumo.jsonl.new_tailer
local tailer = kumo.jsonl.new_tailer {
  directory = '/path/to/logs',
  -- optional fields:
  pattern = '*.zst',
  max_batch_size = 100,
  max_batch_latency = '1s',
  checkpoint_name = 'my-consumer',
  poll_watcher = '500ms',
  tail = false,
}

-- optional filter as second argument:
local tailer2 = kumo.jsonl.new_tailer(
  { directory = '/var/log/kumomta' },
  function(record)
    return record.type == 'Delivery'
  end
)
Since: Dev Builds Only
The functionality described in this section requires a dev build of KumoMTA. You can obtain a dev build by following the instructions in the Installation section.
Creates a single-consumer log tailer that reads zstd-compressed JSONL segment
files from directory and yields their records as batches of parsed JSON values.
Progress can be checkpointed so that the tailer resumes where it left off
after a restart. An optional filter function can be supplied to discard
unwanted records before they are added to a batch.
Configuration Parameters
directory
String. Required. The directory containing the segment files to read.
pattern
String (glob). Optional. A glob pattern to match segment files within
directory. Defaults to "*".
max_batch_size
Integer. Optional. Maximum number of records per batch. Defaults to 100.
max_batch_latency
Duration string (e.g., "500ms", "1s"). Optional. Maximum time to wait
for a partial batch to fill before yielding it. Defaults to "1s".
checkpoint_name
String. Optional. When set, enables checkpoint persistence. The checkpoint
is stored as a hidden file named .<checkpoint_name> inside directory. On
the next run, the tailer resumes from the checkpointed position rather than
re-reading from the beginning.
poll_watcher
Duration string (e.g., "500ms"). Optional. When set, uses a polling-based
filesystem watcher with the given interval instead of the platform's native
filesystem notification mechanism. Useful on network filesystems or container
environments where native watchers are unreliable.
tail
Boolean. Optional. When true, ignores any existing checkpoint and starts
reading from the most recent segment only, skipping all older segments.
Defaults to false.
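Because the checkpoint is a plain hidden file, it can be handy to locate it from a script, for example to check whether a given checkpoint_name will resume or start fresh. The sketch below assumes the file is named exactly .<checkpoint_name> directly inside directory, as described above; the helper names are hypothetical, and the code only checks for the file's existence without reading or interpreting its contents.

```lua
-- Hypothetical helper: compute where the tailer keeps its checkpoint,
-- assuming the ".<checkpoint_name>" naming described above.
local function checkpoint_path(directory, checkpoint_name)
  -- Strip a single trailing slash so we do not produce "//".
  local dir = directory:gsub('/$', '')
  return dir .. '/.' .. checkpoint_name
end

-- Hypothetical helper: true if a checkpoint file already exists, meaning
-- the next run with this checkpoint_name should resume from it
-- (unless tail = true, which ignores any existing checkpoint).
local function has_checkpoint(directory, checkpoint_name)
  local f = io.open(checkpoint_path(directory, checkpoint_name), 'r')
  if f then
    f:close()
    return true
  end
  return false
end
```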
Filter function
An optional second argument can be a function that receives each parsed record
as a Lua value and returns true to include it in the batch or false to
discard it. If the function raises an error, the error is propagated to the
caller.
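Since an error raised by the filter propagates to the caller, filters are easiest to reason about when they always return a boolean and index nested fields defensively. The sketch below is plain Lua with hypothetical helper names; the record type names come from this page, while the meta.tenant field is an illustrative assumption about your log records.

```lua
-- Hypothetical helper: build a filter that accepts records whose
-- `type` field is one of the given values.
local function make_type_filter(...)
  local wanted = {}
  for _, t in ipairs({ ... }) do
    wanted[t] = true
  end
  return function(record)
    -- Explicit boolean; records without a `type` field are discarded.
    return wanted[record.type] == true
  end
end

-- Nested fields may be absent; checking the type first means a missing
-- field discards the record instead of raising an error that would be
-- propagated to the caller. (meta.tenant is an assumed field name.)
local function has_tenant(record)
  local meta = record.meta
  return type(meta) == 'table' and meta.tenant ~= nil
end
```

A filter built this way would be passed as the second argument, for example kumo.jsonl.new_tailer({ directory = '/var/log/kumomta' }, make_type_filter('Delivery', 'Bounce')).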
Methods
tailer:batches
Returns an iterator function that yields one LogBatch per
call, or nil when the stream is exhausted. Each call polls the underlying
stream for more data.
tailer:close
Signals the tailer to stop. Any pending or subsequent poll returns nil.
Note: :close() does not write a checkpoint. Call
batch:commit() on each batch after processing it
to advance the checkpoint.
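Because tailer:batches() returns a plain iterator function that yields nil once the stream is exhausted, it can also be called directly, for example to process a bounded number of batches. The sketch below is a hypothetical helper written against any such iterator; it assumes only that each batch has a commit method, and it commits a batch only after the handler returns so that a failure leaves that batch uncommitted.

```lua
-- Hypothetical helper: pull up to `limit` batches (all, if nil) from an
-- iterator function such as the one returned by tailer:batches(),
-- calling handle(batch) on each and committing only after it returns.
-- If handle raises, the error propagates and that batch is not committed.
local function drain(next_batch, handle, limit)
  local processed = 0
  while limit == nil or processed < limit do
    local batch = next_batch()
    if batch == nil then
      -- Stream exhausted, or the tailer was closed.
      break
    end
    handle(batch)
    batch:commit()
    processed = processed + 1
  end
  return processed
end
```

With a real tailer this might look like drain(tailer:batches(), function(batch) ... end, 10).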
Example
local kumo = require 'kumo'

local tailer = kumo.jsonl.new_tailer {
  directory = '/var/log/kumomta',
  pattern = '*.zst',
  max_batch_size = 500,
  max_batch_latency = '250ms',
  checkpoint_name = 'delivery-processor',
}

for batch in tailer:batches() do
  for record in batch:iter_records() do
    if record.type == 'Delivery' then
      print('delivered: ' .. record.id)
    end
  end
  batch:commit()
end

tailer:close()
Batched Webhook Example
The following example shows how to use new_tailer to read log records from
disk and post them in batches to an HTTP endpoint. It is equivalent to the
batched webhook approach, but driven from the log files rather than from an
in-process hook.
Each batch is encoded as a JSON array and posted as the request body, for
example: [{"type": "Delivery", ...}, {"type": "Reception", ...}].
Save the script below as, for example, /path/to/webhook.lua and run it as a
standalone script with /opt/kumomta/sbin/kumod --script --policy /path/to/webhook.lua:
local kumo = require 'kumo'

kumo.on('main', function()
  local client = kumo.http.build_client {}
  local max_retries = 5
  local retry_delay_seconds = 2

  -- Perform the HTTP POST and return the response.
  local function post_batch(payload)
    return client
      :post('http://10.0.0.1:4242/log')
      :header('Content-Type', 'application/json')
      :body(payload)
      :send()
  end

  -- Encode the records as a JSON array, then post with retries.
  -- Returns only if the post succeeded; raises an error otherwise,
  -- leaving the batch uncommitted so the next run will retry it.
  local function process_batch(records)
    local payload = kumo.serde.json_encode(records)
    for attempt = 1, max_retries do
      local response = post_batch(payload)
      if response:status_is_success() then
        return
      end

      local status = response:status_code()
      local disposition = string.format(
        '%d %s: %s',
        status,
        response:status_reason(),
        response:text()
      )

      -- 429 Too Many Requests and 5xx server errors are
      -- transient and worth retrying. 4xx client errors
      -- (other than 429) indicate a permanent problem with
      -- the request itself and are not worth retrying.
      local retryable = status == 429 or status >= 500
      if not retryable or attempt == max_retries then
        -- We did not commit, so the next run will retry this batch.
        -- Sleep briefly before failing out so that a supervising
        -- process does not restart us too quickly.
        kumo.log_error(
          string.format(
            'webhook post failed (attempt %d/%d), giving up: %s',
            attempt,
            max_retries,
            disposition
          )
        )
        kumo.time.sleep(5)
        error('webhook post failed: ' .. disposition)
      end

      -- Transient failure: wait before retrying.
      kumo.log_error(
        string.format(
          'webhook post failed (attempt %d/%d), retrying in %ds: %s',
          attempt,
          max_retries,
          retry_delay_seconds,
          disposition
        )
      )
      kumo.time.sleep(retry_delay_seconds)
    end
  end

  local tailer = kumo.jsonl.new_tailer(
    {
      directory = '/var/log/kumomta',
      max_batch_size = 100,
      max_batch_latency = '1s',
      checkpoint_name = 'webhook-poster',
    },
    -- Only forward records for the 'customer-a' egress pool.
    function(record)
      return record.egress_pool == 'customer-a'
    end
  )

  for batch in tailer:batches() do
    process_batch(batch:records())
    batch:commit()
  end

  tailer:close()
  client:close()
end)
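The retry policy in process_batch can be pulled out into plain functions so that it can be checked without a live endpoint. In the sketch below, which uses hypothetical helper names, attempt_fn stands in for one POST and returns its HTTP status code, and sleep_fn is injected so the real script could pass kumo.time.sleep while a test passes a stub. Success is taken to mean a 2xx status, and the extra 5-second sleep before giving up in the script above is omitted for brevity.

```lua
-- 429 and 5xx are treated as transient; other statuses are permanent.
local function is_retryable(status)
  return status == 429 or status >= 500
end

-- Hypothetical helper: call attempt_fn() up to max_retries times,
-- sleeping `delay` seconds between transient failures.
-- Returns the number of attempts used on success; raises otherwise.
local function with_retries(max_retries, delay, attempt_fn, sleep_fn)
  for attempt = 1, max_retries do
    local status = attempt_fn()
    if status >= 200 and status < 300 then
      return attempt
    end
    if not is_retryable(status) or attempt == max_retries then
      error(string.format('giving up after attempt %d: status %d', attempt, status))
    end
    sleep_fn(delay)
  end
end
```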
Per-Customer Webhook Example with main Parameters
The main event receives any extra command-line arguments passed after --
as parameters to the handler function. This makes it straightforward to run
one instance of the same script per customer, each operating independently
with its own checkpoint and filter:
$ /opt/kumomta/sbin/kumod --script --policy /path/to/webhook.lua -- customer-a
$ /opt/kumomta/sbin/kumod --script --policy /path/to/webhook.lua -- customer-b
$ /opt/kumomta/sbin/kumod --script --policy /path/to/webhook.lua -- customer-c
Each process reads only the records for its own pool, maintains its own
checkpoint, and retries failures independently without affecting any other
customer's delivery. This is a more robust alternative to the multi-consumer
tailer approach described in kumo.jsonl.new_multi_tailer.
local kumo = require 'kumo'

-- Map each customer (egress pool name) to their webhook endpoint.
local endpoints = {
  ['customer-a'] = 'http://customer-a.example.com/log',
  ['customer-b'] = 'http://customer-b.example.com/log',
  ['customer-c'] = 'http://customer-c.example.com/log',
}

kumo.on('main', function(pool_name)
  local url = assert(
    endpoints[pool_name],
    string.format("unknown pool '%s'", pool_name)
  )

  local client = kumo.http.build_client {}
  local max_retries = 5
  local retry_delay_seconds = 2

  local function post_batch(payload)
    return client
      :post(url)
      :header('Content-Type', 'application/json')
      :body(payload)
      :send()
  end

  -- Returns only if the post succeeded; raises an error otherwise,
  -- leaving the batch uncommitted so the next run will retry it.
  local function process_batch(records)
    local payload = kumo.serde.json_encode(records)
    for attempt = 1, max_retries do
      local response = post_batch(payload)
      if response:status_is_success() then
        return
      end

      local status = response:status_code()
      local disposition = string.format(
        '%d %s: %s',
        status,
        response:status_reason(),
        response:text()
      )

      local retryable = status == 429 or status >= 500
      if not retryable or attempt == max_retries then
        kumo.log_error(
          string.format(
            'webhook post to %s failed (attempt %d/%d), giving up: %s',
            url,
            attempt,
            max_retries,
            disposition
          )
        )
        kumo.time.sleep(5)
        error('webhook post failed: ' .. disposition)
      end

      kumo.log_error(
        string.format(
          'webhook post to %s failed (attempt %d/%d), retrying in %ds: %s',
          url,
          attempt,
          max_retries,
          retry_delay_seconds,
          disposition
        )
      )
      kumo.time.sleep(retry_delay_seconds)
    end
  end

  local tailer = kumo.jsonl.new_tailer(
    {
      directory = '/var/log/kumomta',
      max_batch_size = 100,
      max_batch_latency = '1s',
      -- Each customer has its own independent checkpoint.
      checkpoint_name = 'webhook-' .. pool_name,
    },
    -- Only process records belonging to this customer's egress pool.
    function(record)
      return record.egress_pool == pool_name
    end
  )

  for batch in tailer:batches() do
    process_batch(batch:records())
    batch:commit()
  end

  tailer:close()
  client:close()
end)
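If the script is started without an argument after --, pool_name is nil and the assert in the script only reports an unknown pool. One option is to gather the per-customer settings in a single helper that distinguishes a missing argument from an unrecognized one. The sketch below is plain Lua; the helper name and the usage message are hypothetical, and the checkpoint naming and filter mirror the script above.

```lua
-- Hypothetical helper: derive the per-instance settings from the first
-- argument passed after `--`, failing clearly if it is missing or unknown.
local function customer_settings(endpoints, pool_name)
  if pool_name == nil or pool_name == '' then
    error('usage: kumod --script --policy webhook.lua -- <pool-name>')
  end
  local url = endpoints[pool_name]
  if url == nil then
    error(string.format("unknown pool '%s'", pool_name))
  end
  return {
    url = url,
    -- Same per-customer checkpoint naming as the script above.
    checkpoint_name = 'webhook-' .. pool_name,
    -- Same per-customer filter as the script above.
    filter = function(record)
      return record.egress_pool == pool_name
    end,
  }
end
```

Inside the main handler, local settings = customer_settings(endpoints, pool_name) would then supply url, checkpoint_name and the filter argument to new_tailer.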