diff --git a/Cargo.lock b/Cargo.lock index e5bb373..2464ef3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -724,6 +724,7 @@ dependencies = [ "structopt", "tokio", "tower", + "tower-http", "tracing", "tracing-subscriber", "zstd", @@ -1612,6 +1613,7 @@ dependencies = [ "pin-project", "tower-layer", "tower-service", + "tracing", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 1d2a93e..3cef85d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,2 +1,6 @@ [workspace] members = [ "cmd/*", "crates/*", "examples/*" ] + +[profile.release] +opt-level = 3 +lto = true diff --git a/cmd/logcatcher/Cargo.toml b/cmd/logcatcher/Cargo.toml index 68cd6a1..c2f8eae 100644 --- a/cmd/logcatcher/Cargo.toml +++ b/cmd/logcatcher/Cargo.toml @@ -43,3 +43,7 @@ features = [ "derive" ] [dependencies.tokio] version = "1" features = [ "full" ] + +[dependencies.tower-http] +version = "0.1" +features = [ "trace" ] diff --git a/cmd/logcatcher/src/main.rs b/cmd/logcatcher/src/main.rs index e127b7a..787007f 100644 --- a/cmd/logcatcher/src/main.rs +++ b/cmd/logcatcher/src/main.rs @@ -6,10 +6,7 @@ extern crate prometheus; extern crate tracing; use anyhow::Result; -use std::{ - net::{IpAddr, SocketAddr}, - path::PathBuf, -}; +use std::net::{IpAddr, SocketAddr}; use structopt::StructOpt; mod server; @@ -18,10 +15,6 @@ use server::*; #[derive(Debug, StructOpt)] #[structopt(name = "logcatcher", about = "The log catcher service")] pub(crate) struct Config { - /// The maximum number of bytes that can be in-ram at once. - #[structopt(short, long, default_value = "1048576000")] - pub(crate) max_bytes: usize, - /// The TCP host to listen on for HTTP traffic. #[structopt(short, long, default_value = "127.0.0.1:3848")] pub(crate) http_host: SocketAddr, @@ -33,10 +26,9 @@ pub(crate) struct Config { /// The IP address/es to expect proxied requests from. #[structopt(short, long)] pub(crate) proxy_ips: Option>, - - /// The place to store logs to disk. - #[structopt(short, long, default_value = "./var/log")] - pub(crate) log_dir: PathBuf, + // /// The place to store logs to disk. + // #[structopt(short, long, default_value = "./var/log")] + // pub(crate) log_dir: PathBuf, } #[tokio::main] diff --git a/cmd/logcatcher/src/server/mod.rs b/cmd/logcatcher/src/server/mod.rs index 6eaef8d..9530745 100644 --- a/cmd/logcatcher/src/server/mod.rs +++ b/cmd/logcatcher/src/server/mod.rs @@ -8,11 +8,19 @@ use chrono::prelude::*; use headers::{Header, HeaderName, HeaderValue}; use hyper::http::StatusCode; use logtail::PublicID; -use prometheus::{Encoder, IntGauge}; -use std::{convert::TryInto, net::SocketAddr, path::PathBuf}; +use prometheus::{Encoder, IntCounterVec, IntGauge}; +use std::{convert::TryInto, net::SocketAddr}; use tokio::sync::Semaphore; +use tower_http::trace::TraceLayer; + +const BYTE_MAX: usize = 1048576000; lazy_static! { + static ref LINES_PER_COLLECTION: IntCounterVec = register_int_counter_vec!( + opts!("lines_per_collection", "log lines per log collection"), + &["collection"] + ) + .unwrap(); static ref BYTES_IN_USE: IntGauge = register_int_gauge!( "buffer_bytes_in_use", "Number of bytes currently in use by zstd buffers" @@ -23,24 +31,22 @@ lazy_static! { "Maximum number of bytes that can be in use by zstd buffers" ) .unwrap(); - static ref BUF_SEM: Semaphore = Semaphore::new(1048576000); + static ref BUF_SEM: Semaphore = Semaphore::new(BYTE_MAX); static ref ORIG_CONTENT_LENGTH: HeaderName = axum::http::header::HeaderName::from_static("orig-content-length"); } pub struct Server { // config values - log_dir: PathBuf, http_host: SocketAddr, debug_host: SocketAddr, } impl Server { pub(crate) fn new(cfg: super::Config) -> Result { - MAX_BUFFER_SIZE.set(cfg.max_bytes.try_into()?); + MAX_BUFFER_SIZE.set(BYTE_MAX as i64); Ok(Self { - log_dir: cfg.log_dir, http_host: cfg.http_host, debug_host: cfg.debug_host, }) @@ -49,7 +55,9 @@ impl Server { pub(crate) async fn run(&self) -> Result<()> { tokio::spawn(metrics(self.debug_host.clone())); - let router = Router::new().route("/c/:collection/:private_id", post(put_logs)); + let router = Router::new() + .route("/c/:collection/:private_id", post(put_logs)) + .layer(TraceLayer::new_for_http()); hyper::Server::bind(&self.http_host) .serve(router.into_make_service()) @@ -102,7 +110,7 @@ async fn put_logs( error!("error acquiring buffer: {}", why); return StatusCode::INSUFFICIENT_STORAGE; } - BYTES_IN_USE.set(BUF_SEM.available_permits().try_into().unwrap()); + BYTES_IN_USE.set(BUF_SEM.available_permits() as i64 - BYTE_MAX as i64); let mut decompressed_body: Vec = Vec::new(); decompressed_body.resize(orig_size.0 as usize, 0); @@ -131,6 +139,10 @@ async fn put_logs( }, ) .unwrap(); + println!(""); + LINES_PER_COLLECTION + .with_label_values(&[collection.clone().as_str()]) + .inc(); } StatusCode::NO_CONTENT diff --git a/crates/logtail-poster/src/lib.rs b/crates/logtail-poster/src/lib.rs index 7ccb946..2d9e11a 100644 --- a/crates/logtail-poster/src/lib.rs +++ b/crates/logtail-poster/src/lib.rs @@ -189,13 +189,13 @@ pub struct Egress { } impl Egress { - fn pull(&mut self) -> Vec { + fn pull(&self) -> Vec { let mut values: Vec = vec![]; loop { match self.rx.try_recv() { Ok(val) => values.push(val), - Err(why) => { + Err(_) => { break; } }; @@ -206,7 +206,7 @@ impl Egress { /// Pushes log messages to logtail. This will push everything buffered into the /// log server. This should be called periodically. - pub async fn post(&mut self) -> Result<(), Error> { + pub async fn post(&self) -> Result<(), Error> { let values = self.pull(); self.push(values).await?; Ok(()) @@ -247,7 +247,7 @@ mod tests { #[tokio::test] async fn end_to_end() { - let (mut ing, mut eg) = Builder::default() + let (ing, mut eg) = Builder::default() .collection("rebterlai.logtail-poster.test".to_string()) .user_agent("rebterlai/test".to_string()) .base_url("http://127.0.0.1:3848".to_string()) diff --git a/examples/example-logtail-poster/src/main.rs b/examples/example-logtail-poster/src/main.rs index e11f951..df74b3d 100644 --- a/examples/example-logtail-poster/src/main.rs +++ b/examples/example-logtail-poster/src/main.rs @@ -9,11 +9,11 @@ struct Data { async fn main() { let collection = "rebterlai.example.logtail-poster".to_string(); let cfg = logtail_poster::Config::load(collection.clone()).unwrap(); - let (mut ing, mut eg) = Builder::default() + let (ing, eg) = Builder::default() .collection(collection) .private_id(cfg.private_id()) .user_agent("rebterlai/test".to_string()) - .base_url("http://127.0.0.1:48283".to_string()) + .base_url("http://127.0.0.1:3848".to_string()) .build() .unwrap();