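//! Lua bindings that expose a Kafka producer built on rdkafka.
//! `register` adds a `kafka` sub-module with a `build_producer`
//! constructor; the returned userdata offers an async `send` method
//! and a `close` method.
//!
//! A minimal usage sketch, assuming the module is reachable from Lua
//! as `kumo.kafka` and using placeholder broker and topic names:
//!
//! ```lua
//! local producer = kumo.kafka.build_producer {
//!     ["bootstrap.servers"] = "localhost:9092",
//! }
//! local partition, offset = producer:send {
//!     topic = "my.topic",
//!     payload = "hello",
//! }
//! producer:close()
//! ```
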
use config::{any_err, get_or_create_sub_module};
use mlua::prelude::LuaUserData;
use mlua::{Lua, LuaSerdeExt, UserDataMethods, Value};
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;
use rdkafka::ClientConfig;
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;

/// Lua userdata wrapping an rdkafka `FutureProducer`.
/// The producer is kept in an `Arc<Mutex<Option<..>>>` so that `close`
/// can drop it even while Lua code still holds references to this value.
#[derive(Clone)]
struct Producer {
    producer: Arc<Mutex<Option<Arc<FutureProducer>>>>,
}

impl Producer {
    /// Return the live producer, or an error if `close` has already been called.
    fn get_producer(&self) -> mlua::Result<Arc<FutureProducer>> {
        self.producer
            .lock()
            .unwrap()
            .as_ref()
            .map(Arc::clone)
            .ok_or_else(|| mlua::Error::external("client was closed"))
    }
}

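/// Argument to the producer's `send` method, deserialized from a Lua
/// table. A sketch of the accepted shape with placeholder values; the
/// human-readable duration string for `timeout` is an assumption about
/// what `duration_serde` accepts:
///
/// ```lua
/// {
///     topic = "my.topic",
///     partition = 0,
///     payload = "hello",
///     key = "my-key",
///     headers = { ["content-type"] = "text/plain" },
///     timeout = "1 minute",
/// }
/// ```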
#[derive(Deserialize, Debug)]
struct Record {
    /// Required destination topic
    topic: String,
    /// Optional destination partition
    #[serde(default)]
    partition: Option<i32>,
    /// Optional payload
    #[serde(default)]
    payload: Option<String>,
    /// Optional key
    #[serde(default)]
    key: Option<String>,

    /// Optional headers
    #[serde(default)]
    headers: HashMap<String, String>,

    /// Optional timeout, defaulting to 1 minute if not provided.
    /// The timeout is how long to keep retrying to submit to Kafka
    /// before giving up on this attempt.
    /// Note that the underlying library supports retrying forever,
    /// but in KumoMTA we don't allow that; we can retry later without
    /// keeping the system occupied for an indefinite time.
    #[serde(default)]
    #[serde(with = "duration_serde")]
    timeout: Option<Duration>,
}

impl LuaUserData for Producer {
    fn add_methods<'lua, M: UserDataMethods<'lua, Self>>(methods: &mut M) {
        methods.add_async_method("send", |lua, this, value: Value| async move {
            let record: Record = lua.from_value(value)?;

            let headers = if record.headers.is_empty() {
                None
            } else {
                let mut headers = OwnedHeaders::new();
                for (key, v) in &record.headers {
                    headers = headers.insert(Header {
                        key,
                        value: Some(v),
                    });
                }
                Some(headers)
            };

            let future_record = FutureRecord {
                topic: &record.topic,
                partition: record.partition,
                payload: record.payload.as_ref(),
                key: record.key.as_ref(),
                headers,
                timestamp: None,
            };

            let (partition, offset) = this
                .get_producer()?
                .send(
                    future_record,
                    Timeout::After(record.timeout.unwrap_or(Duration::from_secs(60))),
                )
                .await
                .map_err(|(code, _msg)| any_err(code))?;

            Ok((partition, offset))
        });

        methods.add_method("close", |_lua, this, _: ()| {
            this.producer.lock().unwrap().take();
            Ok(())
        });
    }
}

/// Register the `kafka` Lua sub-module and its `build_producer` constructor.
pub fn register(lua: &Lua) -> anyhow::Result<()> {
    let kafka_mod = get_or_create_sub_module(lua, "kafka")?;

    kafka_mod.set(
        "build_producer",
        lua.create_async_function(|_, config: HashMap<String, String>| async move {
            // Key/value pairs are passed through verbatim to rdkafka's ClientConfig.
            let mut builder = ClientConfig::new();
            for (k, v) in config {
                builder.set(k, v);
            }

            let producer = builder.create().map_err(any_err)?;

            Ok(Producer {
                producer: Arc::new(Mutex::new(Some(Arc::new(producer)))),
            })
        })?,
    )?;

    Ok(())
}