use anyhow::Result; use axum::{ extract::{Path, TypedHeader}, handler::{get, post}, Router, }; use chrono::prelude::*; use headers::{Header, HeaderName, HeaderValue}; use hyper::http::StatusCode; use logtail::PublicID; use prometheus::{Encoder, IntCounterVec, IntGauge}; use std::{convert::TryInto, net::SocketAddr}; use tokio::sync::Semaphore; use tower_http::trace::TraceLayer; const BYTE_MAX: usize = 1048576000; lazy_static! { static ref LINES_PER_COLLECTION: IntCounterVec = register_int_counter_vec!( opts!("lines_per_collection", "log lines per log collection"), &["collection"] ) .unwrap(); static ref BYTES_IN_USE: IntGauge = register_int_gauge!( "buffer_bytes_in_use", "Number of bytes currently in use by zstd buffers" ) .unwrap(); static ref MAX_BUFFER_SIZE: IntGauge = register_int_gauge!( "max_buffer_bytes", "Maximum number of bytes that can be in use by zstd buffers" ) .unwrap(); static ref BUF_SEM: Semaphore = Semaphore::new(BYTE_MAX); static ref ORIG_CONTENT_LENGTH: HeaderName = axum::http::header::HeaderName::from_static("orig-content-length"); } pub struct Server { // config values http_host: SocketAddr, debug_host: SocketAddr, } impl Server { pub(crate) fn new(cfg: super::Config) -> Result { MAX_BUFFER_SIZE.set(BYTE_MAX as i64); Ok(Self { http_host: cfg.http_host, debug_host: cfg.debug_host, }) } pub(crate) async fn run(&self) -> Result<()> { tokio::spawn(metrics(self.debug_host.clone())); let router = Router::new() .route("/c/:collection/:private_id", post(put_logs)) .layer(TraceLayer::new_for_http()); hyper::Server::bind(&self.http_host) .serve(router.into_make_service()) .await?; Ok(()) } } #[derive(Debug, Clone)] struct OrigContentLength(u32); impl Header for OrigContentLength { fn name() -> &'static HeaderName { &ORIG_CONTENT_LENGTH } fn decode<'i, I>(values: &mut I) -> Result where I: Iterator, { let value = values.next().ok_or_else(headers::Error::invalid)?; match value.to_str().unwrap().parse::() { Ok(val) => Ok(OrigContentLength(val)), Err(_) => Err(headers::Error::invalid()), } } fn encode(&self, values: &mut E) where E: Extend, { values.extend(std::iter::once(self.0.into())); } } #[instrument(skip(body))] async fn put_logs( TypedHeader(size): TypedHeader, TypedHeader(orig_size): TypedHeader, Path((collection, private_id)): Path<(String, logtail::PrivateID)>, body: axum::body::Body, ) -> StatusCode { if let Err(why) = BUF_SEM.try_acquire_many(size.0 as u32) { error!("error acquiring buffer: {}", why); return StatusCode::INSUFFICIENT_STORAGE; } if let Err(why) = BUF_SEM.try_acquire_many(orig_size.0) { error!("error acquiring buffer: {}", why); return StatusCode::INSUFFICIENT_STORAGE; } BYTES_IN_USE.set(BUF_SEM.available_permits() as i64 - BYTE_MAX as i64); let mut decompressed_body: Vec = Vec::new(); decompressed_body.resize(orig_size.0 as usize, 0); let compressed_body = hyper::body::to_bytes(body).await.unwrap(); zstd::block::decompress_to_buffer(&compressed_body, &mut decompressed_body).unwrap(); let vals: Vec = serde_json::from_slice(&decompressed_body).unwrap(); #[derive(serde::Serialize)] pub struct Envelope { log_id: PublicID, collection: String, data: serde_json::Value, server_time: DateTime, } for data in vals { serde_json::to_writer( std::io::stdout().lock(), &Envelope { log_id: private_id.as_public(), collection: collection.clone(), data, server_time: Utc::now(), }, ) .unwrap(); println!(""); LINES_PER_COLLECTION .with_label_values(&[collection.clone().as_str()]) .inc(); } StatusCode::NO_CONTENT } async fn metrics(addr: SocketAddr) -> Result<()> { let endpoint = Router::new().route("/metrics", get(metrics_endpoint)); hyper::Server::bind(&addr) .serve(endpoint.into_make_service()) .await?; Ok(()) } async fn metrics_endpoint() -> Vec { let encoder = prometheus::TextEncoder::new(); let metric_families = prometheus::gather(); let mut buffer = vec![]; encoder.encode(&metric_families, &mut buffer).unwrap(); buffer }