From 8b2457891017d0796abe861272f1d9e386d47c86 Mon Sep 17 00:00:00 2001 From: Christine Dodrill Date: Thu, 9 Sep 2021 19:21:34 -0400 Subject: [PATCH] cmd/logcatcher: match mincatcher output Signed-off-by: Christine Dodrill --- Cargo.lock | 3 +++ cmd/logcatcher/Cargo.toml | 9 +++++++++ cmd/logcatcher/src/server/mod.rs | 27 +++++++++++++++++++++++++-- 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e41af11..e5bb373 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -712,12 +712,15 @@ version = "0.1.0" dependencies = [ "anyhow", "axum", + "chrono", "futures", "headers", "hyper", "lazy_static", "logtail", "prometheus", + "serde", + "serde_json", "structopt", "tokio", "tower", diff --git a/cmd/logcatcher/Cargo.toml b/cmd/logcatcher/Cargo.toml index 168ea04..68cd6a1 100644 --- a/cmd/logcatcher/Cargo.toml +++ b/cmd/logcatcher/Cargo.toml @@ -10,6 +10,7 @@ anyhow = "1" futures = "0.3" headers = "0.3" lazy_static = "1" +serde_json = "1" structopt = "0.3" tower = "0.4" tracing = "0.1" @@ -23,6 +24,10 @@ logtail = { path = "../../crates/logtail" } version = "0.2" features = [ "headers" ] +[dependencies.chrono] +version = "0.4" +features = [ "serde" ] + [dependencies.hyper] version = "0.14" features = [ "full" ] @@ -31,6 +36,10 @@ features = [ "full" ] version = "0.12" features = [ "process" ] +[dependencies.serde] +version = "1" +features = [ "derive" ] + [dependencies.tokio] version = "1" features = [ "full" ] diff --git a/cmd/logcatcher/src/server/mod.rs b/cmd/logcatcher/src/server/mod.rs index 4a951ab..6eaef8d 100644 --- a/cmd/logcatcher/src/server/mod.rs +++ b/cmd/logcatcher/src/server/mod.rs @@ -4,10 +4,12 @@ use axum::{ handler::{get, post}, Router, }; +use chrono::prelude::*; use headers::{Header, HeaderName, HeaderValue}; use hyper::http::StatusCode; +use logtail::PublicID; use prometheus::{Encoder, IntGauge}; -use std::{convert::TryInto, io::Write, net::SocketAddr, path::PathBuf}; +use std::{convert::TryInto, net::SocketAddr, path::PathBuf}; use tokio::sync::Semaphore; lazy_static! { @@ -108,7 +110,28 @@ async fn put_logs( let compressed_body = hyper::body::to_bytes(body).await.unwrap(); zstd::block::decompress_to_buffer(&compressed_body, &mut decompressed_body).unwrap(); - std::io::stdout().lock().write(&decompressed_body).unwrap(); + let vals: Vec = serde_json::from_slice(&decompressed_body).unwrap(); + + #[derive(serde::Serialize)] + pub struct Envelope { + log_id: PublicID, + collection: String, + data: serde_json::Value, + server_time: DateTime, + } + + for data in vals { + serde_json::to_writer( + std::io::stdout().lock(), + &Envelope { + log_id: private_id.as_public(), + collection: collection.clone(), + data, + server_time: Utc::now(), + }, + ) + .unwrap(); + } StatusCode::NO_CONTENT }