167 lines
4.7 KiB
Rust
167 lines
4.7 KiB
Rust
use anyhow::Result;
|
|
use axum::{
|
|
extract::{Path, TypedHeader},
|
|
handler::{get, post},
|
|
Router,
|
|
};
|
|
use chrono::prelude::*;
|
|
use headers::{Header, HeaderName, HeaderValue};
|
|
use hyper::http::StatusCode;
|
|
use logtail::PublicID;
|
|
use prometheus::{Encoder, IntCounterVec, IntGauge};
|
|
use std::{convert::TryInto, net::SocketAddr};
|
|
use tokio::sync::Semaphore;
|
|
use tower_http::trace::TraceLayer;
|
|
|
|
/// Total byte budget for request buffers (1000 MiB).
///
/// Used as the permit count of `BUF_SEM` and reported through the
/// `max_buffer_bytes` gauge.
const BYTE_MAX: usize = 1_000 * 1024 * 1024;
|
|
|
|
lazy_static! {
    /// Counter of log lines, labelled by the collection they were posted to.
    static ref LINES_PER_COLLECTION: IntCounterVec = register_int_counter_vec!(
        "lines_per_collection",
        "log lines per log collection",
        &["collection"]
    )
    .unwrap();

    /// Gauge of bytes currently reserved for zstd buffers.
    static ref BYTES_IN_USE: IntGauge = register_int_gauge!(opts!(
        "buffer_bytes_in_use",
        "Number of bytes currently in use by zstd buffers"
    ))
    .unwrap();

    /// Gauge of the configured buffer ceiling.
    static ref MAX_BUFFER_SIZE: IntGauge = register_int_gauge!(opts!(
        "max_buffer_bytes",
        "Maximum number of bytes that can be in use by zstd buffers"
    ))
    .unwrap();

    /// Semaphore with one permit per byte of the `BYTE_MAX` budget.
    static ref BUF_SEM: Semaphore = Semaphore::new(BYTE_MAX);

    /// Name of the request header parsed by `OrigContentLength`.
    static ref ORIG_CONTENT_LENGTH: HeaderName = HeaderName::from_static("orig-content-length");
}
|
|
|
|
pub struct Server {
|
|
// config values
|
|
http_host: SocketAddr,
|
|
debug_host: SocketAddr,
|
|
}
|
|
|
|
impl Server {
|
|
pub(crate) fn new(cfg: super::Config) -> Result<Self> {
|
|
MAX_BUFFER_SIZE.set(BYTE_MAX as i64);
|
|
|
|
Ok(Self {
|
|
http_host: cfg.http_host,
|
|
debug_host: cfg.debug_host,
|
|
})
|
|
}
|
|
|
|
pub(crate) async fn run(&self) -> Result<()> {
|
|
tokio::spawn(metrics(self.debug_host.clone()));
|
|
|
|
let router = Router::new()
|
|
.route("/c/:collection/:private_id", post(put_logs))
|
|
.layer(TraceLayer::new_for_http());
|
|
|
|
hyper::Server::bind(&self.http_host)
|
|
.serve(router.into_make_service())
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
#[derive(Debug, Clone)]
|
|
struct OrigContentLength(u32);
|
|
|
|
impl Header for OrigContentLength {
|
|
fn name() -> &'static HeaderName {
|
|
&ORIG_CONTENT_LENGTH
|
|
}
|
|
|
|
fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
|
|
where
|
|
I: Iterator<Item = &'i HeaderValue>,
|
|
{
|
|
let value = values.next().ok_or_else(headers::Error::invalid)?;
|
|
|
|
match value.to_str().unwrap().parse::<u32>() {
|
|
Ok(val) => Ok(OrigContentLength(val)),
|
|
Err(_) => Err(headers::Error::invalid()),
|
|
}
|
|
}
|
|
|
|
fn encode<E>(&self, values: &mut E)
|
|
where
|
|
E: Extend<HeaderValue>,
|
|
{
|
|
values.extend(std::iter::once(self.0.into()));
|
|
}
|
|
}
|
|
|
|
#[instrument(skip(body))]
|
|
async fn put_logs(
|
|
TypedHeader(size): TypedHeader<headers::ContentLength>,
|
|
TypedHeader(orig_size): TypedHeader<OrigContentLength>,
|
|
Path((collection, private_id)): Path<(String, logtail::PrivateID)>,
|
|
body: axum::body::Body,
|
|
) -> StatusCode {
|
|
if let Err(why) = BUF_SEM.try_acquire_many(size.0 as u32) {
|
|
error!("error acquiring buffer: {}", why);
|
|
return StatusCode::INSUFFICIENT_STORAGE;
|
|
}
|
|
if let Err(why) = BUF_SEM.try_acquire_many(orig_size.0) {
|
|
error!("error acquiring buffer: {}", why);
|
|
return StatusCode::INSUFFICIENT_STORAGE;
|
|
}
|
|
BYTES_IN_USE.set(BUF_SEM.available_permits() as i64 - BYTE_MAX as i64);
|
|
|
|
let mut decompressed_body: Vec<u8> = Vec::new();
|
|
decompressed_body.resize(orig_size.0 as usize, 0);
|
|
|
|
let compressed_body = hyper::body::to_bytes(body).await.unwrap();
|
|
zstd::block::decompress_to_buffer(&compressed_body, &mut decompressed_body).unwrap();
|
|
|
|
let vals: Vec<serde_json::Value> = serde_json::from_slice(&decompressed_body).unwrap();
|
|
|
|
#[derive(serde::Serialize)]
|
|
pub struct Envelope {
|
|
log_id: PublicID,
|
|
collection: String,
|
|
data: serde_json::Value,
|
|
server_time: DateTime<Utc>,
|
|
}
|
|
|
|
for data in vals {
|
|
serde_json::to_writer(
|
|
std::io::stdout().lock(),
|
|
&Envelope {
|
|
log_id: private_id.as_public(),
|
|
collection: collection.clone(),
|
|
data,
|
|
server_time: Utc::now(),
|
|
},
|
|
)
|
|
.unwrap();
|
|
println!("");
|
|
LINES_PER_COLLECTION
|
|
.with_label_values(&[collection.clone().as_str()])
|
|
.inc();
|
|
}
|
|
|
|
StatusCode::NO_CONTENT
|
|
}
|
|
|
|
async fn metrics(addr: SocketAddr) -> Result<()> {
|
|
let endpoint = Router::new().route("/metrics", get(metrics_endpoint));
|
|
hyper::Server::bind(&addr)
|
|
.serve(endpoint.into_make_service())
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn metrics_endpoint() -> Vec<u8> {
|
|
let encoder = prometheus::TextEncoder::new();
|
|
let metric_families = prometheus::gather();
|
|
let mut buffer = vec![];
|
|
encoder.encode(&metric_families, &mut buffer).unwrap();
|
|
buffer
|
|
}
|