logcatcher implementation attempt

Signed-off-by: Christine Dodrill <me@christine.website>
This commit is contained in:
Cadey Ratio 2021-09-09 19:12:56 -04:00
parent e7ccdd485e
commit 9065599679
9 changed files with 763 additions and 64 deletions

545
Cargo.lock generated
View File

@ -8,6 +8,39 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aho-corasick"
version = "0.7.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f"
dependencies = [
"memchr",
]
[[package]]
name = "ansi_term"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b"
dependencies = [
"winapi",
]
[[package]]
name = "ansi_term"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2"
dependencies = [
"winapi",
]
[[package]]
name = "anyhow"
version = "1.0.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28ae2b3dec75a406790005a200b1bd89785afc02517a00ca99ecfe093ee9e6cf"
[[package]]
name = "async-compression"
version = "0.3.8"
@ -21,12 +54,62 @@ dependencies = [
"tokio",
]
[[package]]
name = "async-trait"
version = "0.1.51"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [
"hermit-abi",
"libc",
"winapi",
]
[[package]]
name = "autocfg"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "axum"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e2423522684032529c51d209740c77d57cc56bfef7b9a18630f919a974d2616"
dependencies = [
"async-trait",
"bitflags",
"bytes",
"futures-util",
"headers",
"http",
"http-body",
"hyper",
"pin-project-lite",
"regex",
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tokio-util",
"tower",
"tower-http",
"tower-layer",
"tower-service",
]
[[package]]
name = "base64"
version = "0.13.0"
@ -54,6 +137,12 @@ version = "3.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c59e7af012c713f529e7a3ee57ce9b31ddd858d4b512923602f74608b009631"
[[package]]
name = "byteorder"
version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "1.1.0"
@ -89,6 +178,21 @@ dependencies = [
"winapi",
]
[[package]]
name = "clap"
version = "2.33.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002"
dependencies = [
"ansi_term 0.11.0",
"atty",
"bitflags",
"strsim",
"textwrap",
"unicode-width",
"vec_map",
]
[[package]]
name = "cpufeatures"
version = "0.1.5"
@ -107,6 +211,54 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ae5588f6b3c3cb05239e90bd110f257254aecd01e4635400391aeae07497845"
dependencies = [
"cfg-if",
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-epoch",
"crossbeam-queue",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e"
dependencies = [
"cfg-if",
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd"
dependencies = [
"cfg-if",
"crossbeam-utils",
"lazy_static",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-queue"
version = "0.3.2"
@ -127,17 +279,6 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "derivative"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "digest"
version = "0.9.0"
@ -350,6 +491,40 @@ version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e"
[[package]]
name = "headers"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0b7591fb62902706ae8e7aaff416b1b0fa2c0fd0878b46dc13baa3712d8a855"
dependencies = [
"base64",
"bitflags",
"bytes",
"headers-core",
"http",
"mime",
"sha-1",
"time",
]
[[package]]
name = "headers-core"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429"
dependencies = [
"http",
]
[[package]]
name = "heck"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
dependencies = [
"unicode-segmentation",
]
[[package]]
name = "hermit-abi"
version = "0.1.19"
@ -531,6 +706,26 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "logcatcher"
version = "0.1.0"
dependencies = [
"anyhow",
"axum",
"futures",
"headers",
"hyper",
"lazy_static",
"logtail",
"prometheus",
"structopt",
"tokio",
"tower",
"tracing",
"tracing-subscriber",
"zstd",
]
[[package]]
name = "logtail"
version = "0.1.0"
@ -557,9 +752,9 @@ name = "logtail-poster"
version = "0.1.0"
dependencies = [
"chrono",
"crossbeam",
"logtail",
"reqwest",
"ring-channel",
"serde",
"serde_json",
"serial_test",
@ -570,6 +765,15 @@ dependencies = [
"zstd",
]
[[package]]
name = "matchers"
version = "0.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1"
dependencies = [
"regex-automata",
]
[[package]]
name = "matches"
version = "0.1.9"
@ -582,6 +786,15 @@ version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a"
[[package]]
name = "memoffset"
version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9"
dependencies = [
"autocfg",
]
[[package]]
name = "mime"
version = "0.3.16"
@ -701,6 +914,26 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.7"
@ -719,6 +952,30 @@ version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
@ -740,6 +997,43 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "procfs"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab8809e0c18450a2db0f236d2a44ec0b4c1412d0eb936233579f0990faa5d5cd"
dependencies = [
"bitflags",
"byteorder",
"flate2",
"hex",
"lazy_static",
"libc",
]
[[package]]
name = "prometheus"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5986aa8d62380092d2f50f8b1cdba9cb9b6731ffd4b25b51fd126b6c3e05b99c"
dependencies = [
"cfg-if",
"fnv",
"lazy_static",
"libc",
"memchr",
"parking_lot",
"procfs",
"protobuf",
"thiserror",
]
[[package]]
name = "protobuf"
version = "2.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "23129d50f2c9355ced935fce8a08bd706ee2e7ce2b3b33bf61dace0e379ac63a"
[[package]]
name = "quote"
version = "1.0.9"
@ -835,6 +1129,32 @@ dependencies = [
"bitflags",
]
[[package]]
name = "regex"
version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.6.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
[[package]]
name = "remove_dir_all"
version = "0.5.3"
@ -897,20 +1217,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "ring-channel"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c9cf571aa3c009bb9fec87e775642e26be2eddd65e806269f4cff39086a56e3"
dependencies = [
"crossbeam-queue",
"crossbeam-utils",
"derivative",
"futures",
"smallvec",
"spinning_top",
]
[[package]]
name = "rustls"
version = "0.19.1"
@ -1011,6 +1317,19 @@ dependencies = [
"syn",
]
[[package]]
name = "sha-1"
version = "0.9.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a0c8611594e2ab4ebbf06ec7cbbf0a99450b8570e96cbf5188b5d5f6ef18d81"
dependencies = [
"block-buffer",
"cfg-if",
"cpufeatures",
"digest",
"opaque-debug",
]
[[package]]
name = "sha2"
version = "0.9.5"
@ -1024,6 +1343,15 @@ dependencies = [
"opaque-debug",
]
[[package]]
name = "sharded-slab"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "740223c51853f3145fe7c90360d2d4232f2b62e3449489c207eccde818979982"
dependencies = [
"lazy_static",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
@ -1062,12 +1390,33 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spinning_top"
version = "0.2.4"
name = "strsim"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75adad84ee84b521fb2cca2d4fd0f1dab1d8d026bda3c5bea4ca63b5f9f9293c"
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "structopt"
version = "0.3.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf9d950ef167e25e0bdb073cf1d68e9ad2795ac826f2f3f59647817cf23c0bfa"
dependencies = [
"lock_api",
"clap",
"lazy_static",
"structopt-derive",
]
[[package]]
name = "structopt-derive"
version = "0.4.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "134d838a2c9943ac3125cf6df165eda53493451b719f3255b2a26b85f772d0ba"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
@ -1081,6 +1430,12 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "sync_wrapper"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
[[package]]
name = "tailscale-api"
version = "0.1.0"
@ -1104,6 +1459,15 @@ dependencies = [
"remove_dir_all",
]
[[package]]
name = "textwrap"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"
dependencies = [
"unicode-width",
]
[[package]]
name = "thiserror"
version = "1.0.29"
@ -1124,6 +1488,15 @@ dependencies = [
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8018d24e04c95ac8790716a5987d0fec4f8b27249ffa0f7d33f1369bdfb88cbd"
dependencies = [
"once_cell",
]
[[package]]
name = "time"
version = "0.1.44"
@ -1206,6 +1579,44 @@ dependencies = [
"tokio",
]
[[package]]
name = "tower"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f60422bc7fefa2f3ec70359b8ff1caff59d785877eb70595904605bcc412470f"
dependencies = [
"futures-core",
"futures-util",
"pin-project",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-http"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b7b56efe69aa0ad2b5da6b942e57ea9f6fe683b7a314d4ff48662e2c8838de1"
dependencies = [
"bytes",
"futures-core",
"futures-util",
"http",
"http-body",
"pin-project",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-layer"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62"
[[package]]
name = "tower-service"
version = "0.3.1"
@ -1219,10 +1630,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d"
dependencies = [
"cfg-if",
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c42e6fa53307c8a17e4ccd4dc81cf5ec38db9209f59b222210375b54ee40d1e2"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.19"
@ -1232,6 +1656,49 @@ dependencies = [
"lazy_static",
]
[[package]]
name = "tracing-log"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6923477a48e41c1951f1999ef8bb5a3023eb723ceadafe78ffb65dc366761e3"
dependencies = [
"lazy_static",
"log",
"tracing-core",
]
[[package]]
name = "tracing-serde"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb65ea441fbb84f9f6748fd496cf7f63ec9af5bca94dd86456978d055e8eb28b"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.2.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9cbe87a2fa7e35900ce5de20220a582a9483a7063811defce79d7cbd59d4cfe"
dependencies = [
"ansi_term 0.12.1",
"chrono",
"lazy_static",
"matchers",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
"tracing-serde",
]
[[package]]
name = "try-lock"
version = "0.2.3"
@ -1259,6 +1726,18 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-segmentation"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b"
[[package]]
name = "unicode-width"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3"
[[package]]
name = "unicode-xid"
version = "0.2.2"
@ -1283,6 +1762,12 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "vec_map"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "version_check"
version = "0.9.3"

View File

@ -1,2 +1,2 @@
[workspace]
members = [ "crates/*", "examples/*" ]
members = [ "cmd/*", "crates/*", "examples/*" ]

36
cmd/logcatcher/Cargo.toml Normal file
View File

@ -0,0 +1,36 @@
[package]
name = "logcatcher"
version = "0.1.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1"
futures = "0.3"
headers = "0.3"
lazy_static = "1"
structopt = "0.3"
tower = "0.4"
tracing = "0.1"
tracing-subscriber = "0.2"
zstd = "0.9"
# local deps
logtail = { path = "../../crates/logtail" }
[dependencies.axum]
version = "0.2"
features = [ "headers" ]
[dependencies.hyper]
version = "0.14"
features = [ "full" ]
[dependencies.prometheus]
version = "0.12"
features = [ "process" ]
[dependencies.tokio]
version = "1"
features = [ "full" ]

View File

@ -0,0 +1,53 @@
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate prometheus;
#[macro_use]
extern crate tracing;
use anyhow::Result;
use std::{
net::{IpAddr, SocketAddr},
path::PathBuf,
};
use structopt::StructOpt;
mod server;
use server::*;
#[derive(Debug, StructOpt)]
#[structopt(name = "logcatcher", about = "The log catcher service")]
pub(crate) struct Config {
/// The maximum number of bytes that can be in-ram at once.
#[structopt(short, long, default_value = "1048576000")]
pub(crate) max_bytes: usize,
/// The TCP host to listen on for HTTP traffic.
#[structopt(short, long, default_value = "127.0.0.1:3848")]
pub(crate) http_host: SocketAddr,
/// The TCP host to serve debug traffic from.
#[structopt(short, long, default_value = "127.0.0.1:45012")]
pub(crate) debug_host: SocketAddr,
/// The IP address/es to expect proxied requests from.
#[structopt(short, long)]
pub(crate) proxy_ips: Option<Vec<IpAddr>>,
/// The place to store logs to disk.
#[structopt(short, long, default_value = "./var/log")]
pub(crate) log_dir: PathBuf,
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let cfg = Config::from_args();
println!("{:?}", cfg);
let server = Server::new(cfg)?;
server.run().await?;
Ok(())
}

View File

@ -0,0 +1,131 @@
use anyhow::Result;
use axum::{
extract::{Path, TypedHeader},
handler::{get, post},
Router,
};
use headers::{Header, HeaderName, HeaderValue};
use hyper::http::StatusCode;
use prometheus::{Encoder, IntGauge};
use std::{convert::TryInto, io::Write, net::SocketAddr, path::PathBuf};
use tokio::sync::Semaphore;
lazy_static! {
static ref BYTES_IN_USE: IntGauge = register_int_gauge!(
"buffer_bytes_in_use",
"Number of bytes currently in use by zstd buffers"
)
.unwrap();
static ref MAX_BUFFER_SIZE: IntGauge = register_int_gauge!(
"max_buffer_bytes",
"Maximum number of bytes that can be in use by zstd buffers"
)
.unwrap();
static ref BUF_SEM: Semaphore = Semaphore::new(1048576000);
static ref ORIG_CONTENT_LENGTH: HeaderName =
axum::http::header::HeaderName::from_static("orig-content-length");
}
pub struct Server {
// config values
log_dir: PathBuf,
http_host: SocketAddr,
debug_host: SocketAddr,
}
impl Server {
pub(crate) fn new(cfg: super::Config) -> Result<Self> {
MAX_BUFFER_SIZE.set(cfg.max_bytes.try_into()?);
Ok(Self {
log_dir: cfg.log_dir,
http_host: cfg.http_host,
debug_host: cfg.debug_host,
})
}
pub(crate) async fn run(&self) -> Result<()> {
tokio::spawn(metrics(self.debug_host.clone()));
let router = Router::new().route("/c/:collection/:private_id", post(put_logs));
hyper::Server::bind(&self.http_host)
.serve(router.into_make_service())
.await?;
Ok(())
}
}
#[derive(Debug, Clone)]
struct OrigContentLength(u32);
impl Header for OrigContentLength {
fn name() -> &'static HeaderName {
&ORIG_CONTENT_LENGTH
}
fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
where
I: Iterator<Item = &'i HeaderValue>,
{
let value = values.next().ok_or_else(headers::Error::invalid)?;
match value.to_str().unwrap().parse::<u32>() {
Ok(val) => Ok(OrigContentLength(val)),
Err(_) => Err(headers::Error::invalid()),
}
}
fn encode<E>(&self, values: &mut E)
where
E: Extend<HeaderValue>,
{
values.extend(std::iter::once(self.0.into()));
}
}
#[instrument(skip(body))]
async fn put_logs(
TypedHeader(size): TypedHeader<headers::ContentLength>,
TypedHeader(orig_size): TypedHeader<OrigContentLength>,
Path((collection, private_id)): Path<(String, logtail::PrivateID)>,
body: axum::body::Body,
) -> StatusCode {
if let Err(why) = BUF_SEM.try_acquire_many(size.0 as u32) {
error!("error acquiring buffer: {}", why);
return StatusCode::INSUFFICIENT_STORAGE;
}
if let Err(why) = BUF_SEM.try_acquire_many(orig_size.0) {
error!("error acquiring buffer: {}", why);
return StatusCode::INSUFFICIENT_STORAGE;
}
BYTES_IN_USE.set(BUF_SEM.available_permits().try_into().unwrap());
let mut decompressed_body: Vec<u8> = Vec::new();
decompressed_body.resize(orig_size.0 as usize, 0);
let compressed_body = hyper::body::to_bytes(body).await.unwrap();
zstd::block::decompress_to_buffer(&compressed_body, &mut decompressed_body).unwrap();
std::io::stdout().lock().write(&decompressed_body).unwrap();
StatusCode::NO_CONTENT
}
async fn metrics(addr: SocketAddr) -> Result<()> {
let endpoint = Router::new().route("/metrics", get(metrics_endpoint));
hyper::Server::bind(&addr)
.serve(endpoint.into_make_service())
.await?;
Ok(())
}
async fn metrics_endpoint() -> Vec<u8> {
let encoder = prometheus::TextEncoder::new();
let metric_families = prometheus::gather();
let mut buffer = vec![];
encoder.encode(&metric_families, &mut buffer).unwrap();
buffer
}

View File

@ -1,12 +1,9 @@
use log::{Level, Metadata, Record};
use logtail_poster::Egress;
use std::{
env,
sync::{Arc, Mutex},
};
use std::{env, sync::Arc};
pub struct LogtailLogger {
ing: Arc<Mutex<logtail_poster::Ingress>>,
ing: Arc<logtail_poster::Ingress>,
threshold: Level,
}
@ -37,10 +34,11 @@ impl log::Log for LogtailLogger {
};
if let Ok(val) = serde_json::to_value(&ld) {
if let Ok(mut ing) = self.ing.lock() {
if let Err(why) = ing.send(val) {
eprintln!("logtail_facade::LogtailLogger::log: can't send json value to buffer: {}", why);
}
if let Err(why) = self.ing.send(val) {
eprintln!(
"logtail_facade::LogtailLogger::log: can't send json value to buffer: {}",
why
);
}
}
}
@ -55,12 +53,17 @@ pub fn init(collection: String) -> Result<Egress, Box<dyn std::error::Error>> {
.unwrap_or(logtail_poster::DEFAULT_HOST.to_string())
.to_string();
let (mut ing, eg) = logtail_poster::Builder::default()
let (ing, eg) = logtail_poster::Builder::default()
.collection(collection)
.base_url(target)
.buffer_size(256)
.private_id(cfg.private_id())
.build()?;
let ing = Arc::new(ing);
ing.send(serde_json::to_value(&ProgramStarted {
msg: "Program started".to_string(),
})?)?;
let threshold = if cfg!(debug_assertions) {
Level::Debug
@ -68,13 +71,8 @@ pub fn init(collection: String) -> Result<Egress, Box<dyn std::error::Error>> {
Level::Info
};
ing.send(serde_json::to_value(&ProgramStarted {
msg: "Program started".to_string(),
})?)?;
let ing = Arc::new(Mutex::new(ing.clone()));
let logger = LogtailLogger {
ing: ing.clone(),
ing,
threshold: threshold.clone(),
};

View File

@ -7,7 +7,7 @@ edition = "2018"
[dependencies]
chrono = { version = "0.4", features = [ "serde" ] }
ring-channel = "0.9"
crossbeam = "0.8"
reqwest = { version = "0.11", default-features = false, features = [ "rustls-tls" ] }
serde = { version = "1", features = [ "derive" ] }
serde_json = "1"

View File

@ -5,7 +5,6 @@ This facilitates writing logs to a logtail server. This is a port of
`logtail.go`.
*/
use reqwest::Client;
use std::num::NonZeroUsize;
mod config;
pub use self::config::*;
@ -89,7 +88,7 @@ impl Builder {
return Err(Error::NoCollection);
}
let (tx, rx) = ring_channel::ring_channel(NonZeroUsize::new(buf_size).unwrap());
let (tx, rx) = crossbeam::channel::bounded(buf_size);
let private_id = self.private_id.unwrap_or(logtail::PrivateID::new());
let base_url = self.base_url.unwrap_or(DEFAULT_HOST.to_string());
let mut u = url::Url::parse(&base_url)?;
@ -127,7 +126,7 @@ pub enum Error {
TXFail(String),
#[error("can't get from in-memory buffer: {0}")]
RXFail(#[from] ring_channel::TryRecvError),
RXFail(#[from] crossbeam::channel::TryRecvError),
#[error("can't parse a URL: {0}")]
URLParseError(#[from] url::ParseError),
@ -152,12 +151,12 @@ pub enum Error {
/// need to.
#[derive(Clone)]
pub struct Ingress {
tx: ring_channel::RingSender<serde_json::Value>,
tx: crossbeam::channel::Sender<serde_json::Value>,
}
impl Ingress {
/// Sends a JSON object to the log server. This MUST be a JSON object.
pub fn send(&mut self, val: serde_json::Value) -> Result<(), Error> {
pub fn send(&self, val: serde_json::Value) -> Result<(), Error> {
if !val.is_object() {
return Err(Error::MustBeJsonObject);
}
@ -186,33 +185,29 @@ struct LogtailHeader {
pub struct Egress {
url: String,
client: reqwest::Client,
rx: ring_channel::RingReceiver<serde_json::Value>,
rx: crossbeam::channel::Receiver<serde_json::Value>,
}
impl Egress {
fn pull(&mut self) -> Result<Vec<serde_json::Value>, Error> {
fn pull(&mut self) -> Vec<serde_json::Value> {
let mut values: Vec<serde_json::Value> = vec![];
loop {
match self.rx.try_recv() {
Ok(val) => values.push(val),
Err(why) => {
use ring_channel::TryRecvError::*;
match why {
Empty => break,
Disconnected => return Err(Error::RXFail(why)),
};
break;
}
};
}
Ok(values)
values
}
/// Pushes log messages to logtail. This will push everything buffered into the
/// log server. This should be called periodically.
pub async fn post(&mut self) -> Result<(), Error> {
let values = self.pull()?;
let values = self.pull();
self.push(values).await?;
Ok(())
}
@ -255,7 +250,7 @@ mod tests {
let (mut ing, mut eg) = Builder::default()
.collection("rebterlai.logtail-poster.test".to_string())
.user_agent("rebterlai/test".to_string())
.base_url("http://127.0.0.1:48283".to_string())
.base_url("http://127.0.0.1:3848".to_string())
.build()
.unwrap();

View File

@ -3,6 +3,7 @@ use log::{debug, error, info, warn};
#[tokio::main]
async fn main() {
let mut eg = logtail_facade::init("rebterlai.example-logtail-facade".to_string()).unwrap();
eg.post().await.unwrap();
error!("error");
warn!("warn");
@ -10,5 +11,5 @@ async fn main() {
debug!("debug");
// TODO(Xe): automate this somehow
eg.post().await.unwrap()
eg.post().await.unwrap();
}