do compression better

Signed-off-by: Christine Dodrill <me@christine.website>
This commit is contained in:
Cadey Ratio 2021-09-06 17:59:01 -04:00
parent 487acfe5aa
commit 7707f8dbf5
2 changed files with 70 additions and 18 deletions
crates/logtail-poster

View File

@ -11,12 +11,10 @@ ring-channel = "0.9"
reqwest = { version = "0.11", default-features = false, features = [ "rustls-tls" ] } reqwest = { version = "0.11", default-features = false, features = [ "rustls-tls" ] }
serde = { version = "1", features = [ "derive" ] } serde = { version = "1", features = [ "derive" ] }
serde_json = "1" serde_json = "1"
tokio = { version = "1", features = [ "full" ] }
thiserror = "1" thiserror = "1"
url = "2" url = "2"
zstd = "0.9" zstd = "0.9"
# local deps # local deps
logtail = { path = "../logtail" } logtail = { path = "../logtail" }
[dev-dependencies]
tokio = { version = "1", features = [ "full" ] }

View File

@ -1,16 +1,22 @@
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
/// This facilitates writing logs to a logtail server. This is a port of /**
/// [github.com/tailscale/tailscale/logtail](https://github.com/tailscale/tailscale/blob/main/logtail/logtail.go)'s This facilitates writing logs to a logtail server. This is a port of
/// `logtail.go`. [github.com/tailscale/tailscale/logtail](https://github.com/tailscale/tailscale/blob/main/logtail/logtail.go)'s
`logtail.go`.
*/
use reqwest::Client; use reqwest::Client;
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
/// DefaultHost is the default URL to upload logs to when Builder.base_url isn't provided. /// DefaultHost is the default URL to upload logs to when Builder.base_url isn't provided.
const DEFAULT_HOST: &'static str = "https://log.tailscale.io"; const DEFAULT_HOST: &'static str = "https://log.tailscale.io";
/**
Builds a send/recv pair for the logtail service. Create a new Builder with the [Builder::default]
method. The only mandatory field is the `collection`.
*/
#[derive(Default)] #[derive(Default)]
pub struct Builder { pub struct Builder {
collection: String, collection: Option<String>,
private_id: Option<logtail::PrivateID>, private_id: Option<logtail::PrivateID>,
user_agent: Option<String>, user_agent: Option<String>,
base_url: Option<String>, base_url: Option<String>,
@ -19,41 +25,67 @@ pub struct Builder {
} }
impl Builder { impl Builder {
pub fn make() -> Self { /// The logtail collection to register logs to. This **MUST** be a hostname, even though
Builder::default() /// it is not used as a hostname. This is used to disambiguate multiple different programs
} /// from each other.
pub fn collection(mut self, collection: String) -> Self { pub fn collection(mut self, collection: String) -> Self {
self.collection = collection; self.collection = Some(collection);
self self
} }
/// The private ID for this logtail identity. If one is not set then one will be auto-generated
/// but not saved. Users should store their local [logtail::PrivateID] on disk for later use.
pub fn private_id(mut self, id: logtail::PrivateID) -> Self { pub fn private_id(mut self, id: logtail::PrivateID) -> Self {
self.private_id = Some(id); self.private_id = Some(id);
self self
} }
/// The user agent to attribute logs to. If not set, one will not be sent.
pub fn user_agent(mut self, ua: String) -> Self { pub fn user_agent(mut self, ua: String) -> Self {
self.user_agent = Some(ua); self.user_agent = Some(ua);
self self
} }
/// The base logcatcher URL. If not set, this will default to [DEFAULT_HOST].
pub fn base_url(mut self, base_url: String) -> Self { pub fn base_url(mut self, base_url: String) -> Self {
self.base_url = Some(base_url); self.base_url = Some(base_url);
self self
} }
/// A custom [reqwest::Client] to use for all interactions. If set this makes
/// [Builder::user_agent] calls ineffectual.
pub fn client(mut self, client: Client) -> Self { pub fn client(mut self, client: Client) -> Self {
self.client = Some(client); self.client = Some(client);
self self
} }
/// Sets how many log messages may be held in memory at once. Leaving this
/// unset (or passing `0`) makes [Builder::build] fall back to 256 messages;
/// once the buffer is full, messages get dropped.
pub fn buffer_size(self, buffer_size: usize) -> Self {
    // Rebind the by-value builder mutably, overwrite the one field, hand it back.
    let mut builder = self;
    builder.buffer_size = buffer_size;
    builder
}
/// Preset for memory-constrained environments: caps the in-memory buffer at
/// 64 messages, after which messages are dropped. Equivalent to calling
/// [Builder::buffer_size] with `64`.
pub fn low_mem(self) -> Self {
    // Named so the "low-memory" capacity is not a bare magic number.
    const LOW_MEM_BUFFER_SIZE: usize = 64;
    self.buffer_size(LOW_MEM_BUFFER_SIZE)
}
/// Trades the Builder in for an Ingress/Egress pair. Ingress will be safe to `clone`
/// as many times as you need to. Egress must have [Egress::post] called periodically
/// in order for log messages to get sent to the server.
pub fn build(self) -> Result<(Ingress, Egress), Error> { pub fn build(self) -> Result<(Ingress, Egress), Error> {
let buf_size: usize = if self.buffer_size != 0 { let buf_size: usize = if self.buffer_size != 0 {
self.buffer_size self.buffer_size
} else { } else {
256 256
}; };
if let None = self.collection {
return Err(Error::NoCollection);
}
let (tx, rx) = ring_channel::ring_channel(NonZeroUsize::new(buf_size).unwrap()); let (tx, rx) = ring_channel::ring_channel(NonZeroUsize::new(buf_size).unwrap());
let private_id = self.private_id.unwrap_or(logtail::PrivateID::new()); let private_id = self.private_id.unwrap_or(logtail::PrivateID::new());
let base_url = self.base_url.unwrap_or(DEFAULT_HOST.to_string()); let base_url = self.base_url.unwrap_or(DEFAULT_HOST.to_string());
@ -61,7 +93,7 @@ impl Builder {
u.path_segments_mut() u.path_segments_mut()
.unwrap() .unwrap()
.push("c") .push("c")
.push(&self.collection) .push(&self.collection.unwrap())
.push(&private_id.as_hex()); .push(&private_id.as_hex());
let ing = Ingress { tx }; let ing = Ingress { tx };
@ -85,6 +117,9 @@ impl Builder {
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub enum Error { pub enum Error {
#[error("no collection defined")]
NoCollection,
#[error("can't put to in-memory buffer")] #[error("can't put to in-memory buffer")]
TXFail, TXFail,
@ -105,13 +140,20 @@ pub enum Error {
#[error("must be json object")] #[error("must be json object")]
MustBeJsonObject, MustBeJsonObject,
#[error("can't do compression task: {0}")]
JoinError(#[from] tokio::task::JoinError),
} }
/// The sink you dump log messages to. You can clone this as many times as you
/// need to.
#[derive(Clone)]
pub struct Ingress { pub struct Ingress {
tx: ring_channel::RingSender<serde_json::Value>, tx: ring_channel::RingSender<serde_json::Value>,
} }
impl Ingress { impl Ingress {
/// Queues a JSON value for delivery to the log server. The value MUST be a JSON object; anything else is rejected with [Error::MustBeJsonObject].
pub fn send(&mut self, val: serde_json::Value) -> Result<(), Error> { pub fn send(&mut self, val: serde_json::Value) -> Result<(), Error> {
if !val.is_object() { if !val.is_object() {
return Err(Error::MustBeJsonObject); return Err(Error::MustBeJsonObject);
@ -133,10 +175,11 @@ impl Ingress {
} }
#[derive(Clone, serde::Serialize)] #[derive(Clone, serde::Serialize)]
pub struct LogtailHeader { struct LogtailHeader {
pub client_time: DateTime<Utc>, pub client_time: DateTime<Utc>,
} }
/// The egressor of log messages buffered by its matching [Ingress].
pub struct Egress { pub struct Egress {
url: String, url: String,
client: reqwest::Client, client: reqwest::Client,
@ -145,7 +188,7 @@ pub struct Egress {
impl Egress { impl Egress {
fn pull(&mut self) -> Result<Vec<serde_json::Value>, Error> { fn pull(&mut self) -> Result<Vec<serde_json::Value>, Error> {
let mut values: Vec<serde_json::Value> = vec![]; // self.rx.collect::<Vec<serde_json::Value>>().await; let mut values: Vec<serde_json::Value> = vec![];
loop { loop {
match self.rx.try_recv() { match self.rx.try_recv() {
@ -163,11 +206,21 @@ impl Egress {
Ok(values) Ok(values)
} }
/// Pushes log messages to logtail. This will push everything buffered into the
/// log server. This should be called periodically.
pub async fn post(&mut self) -> Result<(), Error> { pub async fn post(&mut self) -> Result<(), Error> {
let values = self.pull()?; let values = self.pull()?;
self.push(values).await?;
Ok(())
}
async fn push(&self, values: Vec<serde_json::Value>) -> Result<(), Error> {
let bytes = serde_json::to_vec(&values)?; let bytes = serde_json::to_vec(&values)?;
let orig_len = bytes.len(); let orig_len = bytes.len();
let compressed = zstd::block::compress(&bytes, 5).map_err(|_| Error::ZstdError)?; let compressed = tokio::task::spawn_blocking(move || {
zstd::block::compress(&bytes, 5).map_err(|_| Error::ZstdError)
})
.await??;
let resp = self let resp = self
.client .client
@ -195,9 +248,10 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
async fn logpoke() { async fn end_to_end() {
let (mut ing, mut eg) = Builder::make() let (mut ing, mut eg) = Builder::default()
.collection("rebterlai.logtail-poster.test".to_string()) .collection("rebterlai.logtail-poster.test".to_string())
.user_agent("rebterlai/test".to_string())
.base_url("http://127.0.0.1:48283".to_string()) .base_url("http://127.0.0.1:48283".to_string())
.build() .build()
.unwrap(); .unwrap();