From 7707f8dbf5e00d3543e561fce08eca32296efcc0 Mon Sep 17 00:00:00 2001 From: Christine Dodrill Date: Mon, 6 Sep 2021 17:59:01 -0400 Subject: [PATCH] do compression better Signed-off-by: Christine Dodrill --- crates/logtail-poster/Cargo.toml | 4 +- crates/logtail-poster/src/lib.rs | 84 ++++++++++++++++++++++++++------ 2 files changed, 70 insertions(+), 18 deletions(-) diff --git a/crates/logtail-poster/Cargo.toml b/crates/logtail-poster/Cargo.toml index 956ac71..8debb88 100644 --- a/crates/logtail-poster/Cargo.toml +++ b/crates/logtail-poster/Cargo.toml @@ -11,12 +11,10 @@ ring-channel = "0.9" reqwest = { version = "0.11", default-features = false, features = [ "rustls-tls" ] } serde = { version = "1", features = [ "derive" ] } serde_json = "1" +tokio = { version = "1", features = [ "full" ] } thiserror = "1" url = "2" zstd = "0.9" # local deps logtail = { path = "../logtail" } - -[dev-dependencies] -tokio = { version = "1", features = [ "full" ] } diff --git a/crates/logtail-poster/src/lib.rs b/crates/logtail-poster/src/lib.rs index e69aa07..728d4cc 100644 --- a/crates/logtail-poster/src/lib.rs +++ b/crates/logtail-poster/src/lib.rs @@ -1,16 +1,22 @@ use chrono::{DateTime, Utc}; -/// This facilitates writing logs to a logtail server. This is a port of -/// [github.com/tailscale/tailscale/logtail](https://github.com/tailscale/tailscale/blob/main/logtail/logtail.go)'s -/// `logtail.go`. +/** +This facilitates writing logs to a logtail server. This is a port of +[github.com/tailscale/tailscale/logtail](https://github.com/tailscale/tailscale/blob/main/logtail/logtail.go)'s +`logtail.go`. +*/ use reqwest::Client; use std::num::NonZeroUsize; /// DefaultHost is the default URL to upload logs to when Builder.base_url isn't provided. const DEFAULT_HOST: &'static str = "https://log.tailscale.io"; +/** +Builds a send/recv pair for the logtail service. Create a new Builder with the [Builder::default] +method. The only mandatory field is the `collection`. +*/ #[derive(Default)] pub struct Builder { - collection: String, + collection: Option, private_id: Option, user_agent: Option, base_url: Option, @@ -19,41 +25,67 @@ pub struct Builder { } impl Builder { - pub fn make() -> Self { - Builder::default() - } - + /// The logtail collection to register logs to. This **MUST** be a hostname, even though + /// it is not used as a hostname. This is used to disambiguate multiple different programs + /// from eachother. pub fn collection(mut self, collection: String) -> Self { - self.collection = collection; + self.collection = Some(collection); self } + /// The private ID for this logtail identity. If one is not set then one will be auto-generated + /// but not saved. Users should store their local [logtail::PrivateID] on disk for later use. pub fn private_id(mut self, id: logtail::PrivateID) -> Self { self.private_id = Some(id); self } + /// The user agent to attribute logs to. If not set, one will not be sent. pub fn user_agent(mut self, ua: String) -> Self { self.user_agent = Some(ua); self } + /// The base logcatcher URL. If not set, this will default to [DEFAULT_HOST]. pub fn base_url(mut self, base_url: String) -> Self { self.base_url = Some(base_url); self } + /// A custom [reqwest::Client] to use for all interactions. If set this makes + /// [Builder::user_agent] calls ineffectual. pub fn client(mut self, client: Client) -> Self { self.client = Some(client); self } + /// The number of log messages to buffer in memory. By default this is set to + /// 256 messages buffered until new ones are dropped. + pub fn buffer_size(mut self, buffer_size: usize) -> Self { + self.buffer_size = buffer_size; + self + } + + /// A "low-memory" friendly value for the buffer size. This will only queue up to + /// 64 messages until new ones are dropped. + pub fn low_mem(self) -> Self { + self.buffer_size(64) + } + + /// Trades the Builder in for an Ingress/Egress pair. Ingress will be safe to `clone` + /// as many times as you need to. Egress must have [Egress::post] called periodically + /// in order for log messages to get sent to the server. pub fn build(self) -> Result<(Ingress, Egress), Error> { let buf_size: usize = if self.buffer_size != 0 { self.buffer_size } else { 256 }; + + if let None = self.collection { + return Err(Error::NoCollection); + } + let (tx, rx) = ring_channel::ring_channel(NonZeroUsize::new(buf_size).unwrap()); let private_id = self.private_id.unwrap_or(logtail::PrivateID::new()); let base_url = self.base_url.unwrap_or(DEFAULT_HOST.to_string()); @@ -61,7 +93,7 @@ impl Builder { u.path_segments_mut() .unwrap() .push("c") - .push(&self.collection) + .push(&self.collection.unwrap()) .push(&private_id.as_hex()); let ing = Ingress { tx }; @@ -85,6 +117,9 @@ impl Builder { #[derive(thiserror::Error, Debug)] pub enum Error { + #[error("no collection defined")] + NoCollection, + #[error("can't put to in-memory buffer")] TXFail, @@ -105,13 +140,20 @@ pub enum Error { #[error("must be json object")] MustBeJsonObject, + + #[error("can't do compression task: {0}")] + JoinError(#[from] tokio::task::JoinError), } +/// The sink you dump log messages to. You can clone this as many times as you +/// need to. +#[derive(Clone)] pub struct Ingress { tx: ring_channel::RingSender, } impl Ingress { + /// Sends a JSON object to the log server. This MUST be a JSON object. pub fn send(&mut self, val: serde_json::Value) -> Result<(), Error> { if !val.is_object() { return Err(Error::MustBeJsonObject); @@ -133,10 +175,11 @@ impl Ingress { } #[derive(Clone, serde::Serialize)] -pub struct LogtailHeader { +struct LogtailHeader { pub client_time: DateTime, } +/// The egressor of log messages buffered by its matching [Ingress]. pub struct Egress { url: String, client: reqwest::Client, @@ -145,7 +188,7 @@ pub struct Egress { impl Egress { fn pull(&mut self) -> Result, Error> { - let mut values: Vec = vec![]; // self.rx.collect::>().await; + let mut values: Vec = vec![]; loop { match self.rx.try_recv() { @@ -163,11 +206,21 @@ impl Egress { Ok(values) } + /// Pushes log messages to logtail. This will push everything buffered into the + /// log server. This should be called periodically. pub async fn post(&mut self) -> Result<(), Error> { let values = self.pull()?; + self.push(values).await?; + Ok(()) + } + + async fn push(&self, values: Vec) -> Result<(), Error> { let bytes = serde_json::to_vec(&values)?; let orig_len = bytes.len(); - let compressed = zstd::block::compress(&bytes, 5).map_err(|_| Error::ZstdError)?; + let compressed = tokio::task::spawn_blocking(move || { + zstd::block::compress(&bytes, 5).map_err(|_| Error::ZstdError) + }) + .await??; let resp = self .client @@ -195,9 +248,10 @@ mod tests { } #[tokio::test] - async fn logpoke() { - let (mut ing, mut eg) = Builder::make() + async fn end_to_end() { + let (mut ing, mut eg) = Builder::default() .collection("rebterlai.logtail-poster.test".to_string()) + .user_agent("rebterlai/test".to_string()) .base_url("http://127.0.0.1:48283".to_string()) .build() .unwrap();