rebterlai/crates/logtail-poster/src/lib.rs

216 lines
5.5 KiB
Rust

use chrono::{DateTime, Utc};
/// This facilitates writing logs to a logtail server. This is a port of
/// [github.com/tailscale/tailscale/logtail](https://github.com/tailscale/tailscale/blob/main/logtail/logtail.go)'s
/// `logtail.go`.
use reqwest::Client;
use std::num::NonZeroUsize;
/// Default base URL that logs are uploaded to when no `base_url` is given to
/// [`Builder`].
const DEFAULT_HOST: &str = "https://log.tailscale.io";
/// Collects the settings from which `Builder::build` creates a connected
/// [`Ingress`] / [`Egress`] pair.
///
/// Every field starts at its `Default` value; unset options are replaced by
/// fallbacks inside `build`.
#[derive(Default)]
pub struct Builder {
/// Collection name; `build` pushes it as a path segment of the upload URL.
collection: String,
/// Private ID whose hex form becomes the last path segment of the upload
/// URL; `build` creates a new one when this is `None`.
private_id: Option<logtail::PrivateID>,
/// User agent for the HTTP client that `build` constructs itself; it has no
/// effect on a caller-supplied `client`.
user_agent: Option<String>,
/// Base URL to upload to; `build` falls back to `DEFAULT_HOST` when `None`.
base_url: Option<String>,
/// HTTP client used for uploads; `build` falls back to a freshly built
/// `reqwest::Client` when `None`.
client: Option<reqwest::Client>,
/// Capacity of the in-memory ring buffer; `build` substitutes 256 when this
/// is 0.
buffer_size: usize,
}
impl Builder {
    /// Ring buffer capacity used when `buffer_size` is left at 0.
    const DEFAULT_BUFFER_SIZE: usize = 256;

    /// Creates a builder with every option unset.
    pub fn make() -> Self {
        Builder::default()
    }

    /// Sets the collection name, which becomes a path segment of the upload URL.
    pub fn collection(mut self, collection: String) -> Self {
        self.collection = collection;
        self
    }

    /// Sets the private ID identifying this log stream. When it is never set,
    /// [`Builder::build`] creates a new one.
    pub fn private_id(mut self, id: logtail::PrivateID) -> Self {
        self.private_id = Some(id);
        self
    }

    /// Sets the user agent of the HTTP client that [`Builder::build`]
    /// constructs. It is ignored when a client is supplied via
    /// [`Builder::client`].
    pub fn user_agent(mut self, ua: String) -> Self {
        self.user_agent = Some(ua);
        self
    }

    /// Sets the base URL to upload to instead of the default host.
    pub fn base_url(mut self, base_url: String) -> Self {
        self.base_url = Some(base_url);
        self
    }

    /// Supplies the HTTP client to use for uploads.
    pub fn client(mut self, client: Client) -> Self {
        self.client = Some(client);
        self
    }

    /// Sets the capacity (in entries) of the in-memory ring buffer.
    ///
    /// `0` keeps the default capacity of 256 entries.
    pub fn buffer_size(mut self, buffer_size: usize) -> Self {
        self.buffer_size = buffer_size;
        self
    }

    /// Consumes the builder and returns the connected producer/consumer pair.
    ///
    /// The upload URL is `<base_url>/c/<collection>/<private id as hex>`.
    ///
    /// # Errors
    ///
    /// * [`Error::URLParseError`] when the base URL cannot be parsed, or is a
    ///   cannot-be-a-base URL (e.g. `mailto:`) that path segments cannot be
    ///   appended to.
    /// * [`Error::ReqwestError`] when no client was supplied and the default
    ///   HTTP client cannot be constructed.
    pub fn build(self) -> Result<(Ingress, Egress), Error> {
        let capacity = match self.buffer_size {
            0 => Self::DEFAULT_BUFFER_SIZE,
            n => n,
        };
        let capacity =
            NonZeroUsize::new(capacity).expect("ring buffer capacity is non-zero by construction");
        let (tx, rx) = ring_channel::ring_channel(capacity);

        // Only create a fresh ID when the caller did not provide one.
        let private_id = self.private_id.unwrap_or_else(logtail::PrivateID::new);

        let mut u = url::Url::parse(self.base_url.as_deref().unwrap_or(DEFAULT_HOST))?;
        u.path_segments_mut()
            // Report cannot-be-a-base URLs as an error instead of panicking.
            .map_err(|_| url::ParseError::RelativeUrlWithCannotBeABaseBase)?
            // Drop a trailing empty segment so a base URL ending in `/` does
            // not yield `//c/...`.
            .pop_if_empty()
            .push("c")
            .push(&self.collection)
            .push(&private_id.as_hex());

        // Build the fallback client lazily so a caller-supplied client never
        // pays for (or panics in) the construction of an unused one.
        let client = match self.client {
            Some(client) => client,
            None => {
                let mut builder = Client::builder();
                if let Some(ua) = self.user_agent {
                    builder = builder.user_agent(ua);
                }
                builder.build()?
            }
        };

        let ing = Ingress { tx };
        let eg = Egress {
            url: u.as_str().to_owned(),
            client,
            rx,
        };
        Ok((ing, eg))
    }
}
/// Errors returned by the builder, ingress and egress operations of this file.
#[derive(thiserror::Error, Debug)]
pub enum Error {
/// The ring buffer sender rejected a value in `Ingress::send`.
#[error("can't put to in-memory buffer")]
TXFail,
/// Receiving from the ring buffer failed; `Egress::pull` returns this when
/// the channel reports `Disconnected`.
#[error("can't get from in-memory buffer: {0}")]
RXFail(#[from] ring_channel::TryRecvError),
/// The base URL could not be parsed.
#[error("can't parse a URL: {0}")]
URLParseError(#[from] url::ParseError),
/// A `reqwest` operation failed, e.g. sending the upload or receiving a
/// non-success response status.
#[error("can't post logs: {0}")]
ReqwestError(#[from] reqwest::Error),
/// `serde_json` failed to convert or serialize a value.
#[error("can't encode to json: {0}")]
JsonError(#[from] serde_json::Error),
/// Compressing the request body failed; the underlying error is discarded.
#[error("can't compress")]
ZstdError,
/// The value passed to `Ingress::send` was not a JSON object.
#[error("must be json object")]
MustBeJsonObject,
}
/// Producer half of the pair returned by `Builder::build`: accepts JSON
/// objects and places them in the in-memory ring buffer.
pub struct Ingress {
/// Sending end of the ring buffer shared with the matching [`Egress`].
tx: ring_channel::RingSender<serde_json::Value>,
}
impl Ingress {
pub fn send(&mut self, val: serde_json::Value) -> Result<(), Error> {
if !val.is_object() {
return Err(Error::MustBeJsonObject);
}
let mut val = val.clone();
let header = LogtailHeader {
client_time: Utc::now(),
};
let obj = val.as_object_mut().unwrap();
obj.insert("logtail".to_string(), serde_json::to_value(header)?);
match self.tx.send(val) {
Ok(_) => Ok(()),
Err(_) => Err(Error::TXFail),
}
}
}
/// Metadata that `Ingress::send` attaches to every log entry under the
/// `"logtail"` key.
#[derive(Clone, serde::Serialize)]
pub struct LogtailHeader {
/// Time at which the entry was handed to `Ingress::send`, in UTC.
pub client_time: DateTime<Utc>,
}
/// Consumer half of the pair returned by `Builder::build`: drains the
/// in-memory ring buffer and uploads the entries over HTTP.
pub struct Egress {
/// Full upload URL (base URL plus `c`, collection and private ID segments).
url: String,
/// HTTP client used to post batches.
client: reqwest::Client,
/// Receiving end of the ring buffer shared with the matching [`Ingress`].
rx: ring_channel::RingReceiver<serde_json::Value>,
}
impl Egress {
fn pull(&mut self) -> Result<Vec<serde_json::Value>, Error> {
let mut values: Vec<serde_json::Value> = vec![]; // self.rx.collect::<Vec<serde_json::Value>>().await;
loop {
match self.rx.try_recv() {
Ok(val) => values.push(val),
Err(why) => {
use ring_channel::TryRecvError::*;
match why {
Empty => break,
Disconnected => return Err(Error::RXFail(why)),
};
}
};
}
Ok(values)
}
pub async fn post(&mut self) -> Result<(), Error> {
let values = self.pull()?;
let bytes = serde_json::to_vec(&values)?;
let orig_len = bytes.len();
let compressed = zstd::block::compress(&bytes, 5).map_err(|_| Error::ZstdError)?;
let resp = self
.client
.post(&self.url)
.header("Content-Encoding", "zstd")
.header("Orig-Content-Length", orig_len)
.body(compressed)
.timeout(std::time::Duration::from_secs(1))
.send()
.await?;
resp.error_for_status()?;
Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::Builder;

    /// Minimal serializable payload used as a log entry.
    #[derive(Clone, serde::Serialize)]
    struct Data {
        pub foo: String,
    }

    /// Queues a single entry and uploads it.
    ///
    /// The final `unwrap` panics unless something accepts HTTP POSTs on
    /// 127.0.0.1:48283 and answers with a success status.
    #[tokio::test]
    async fn logpoke() {
        let builder = Builder::make()
            .collection(String::from("rebterlai.logtail-poster.test"))
            .base_url(String::from("http://127.0.0.1:48283"));
        let (mut ingress, mut egress) = builder.build().unwrap();

        let payload = Data {
            foo: String::from("bar"),
        };
        let entry = serde_json::to_value(payload).unwrap();
        ingress.send(entry).unwrap();

        egress.post().await.unwrap();
    }
}