From 9065599679ed26836393e47238b23cdf2623abd7 Mon Sep 17 00:00:00 2001 From: Christine Dodrill Date: Thu, 9 Sep 2021 19:12:56 -0400 Subject: [PATCH] logcatcher implementation attempt Signed-off-by: Christine Dodrill --- Cargo.lock | 545 ++++++++++++++++++-- Cargo.toml | 2 +- cmd/logcatcher/Cargo.toml | 36 ++ cmd/logcatcher/src/main.rs | 53 ++ cmd/logcatcher/src/server/mod.rs | 131 +++++ crates/logtail-facade/src/lib.rs | 30 +- crates/logtail-poster/Cargo.toml | 2 +- crates/logtail-poster/src/lib.rs | 25 +- examples/example-logtail-facade/src/main.rs | 3 +- 9 files changed, 763 insertions(+), 64 deletions(-) create mode 100644 cmd/logcatcher/Cargo.toml create mode 100644 cmd/logcatcher/src/main.rs create mode 100644 cmd/logcatcher/src/server/mod.rs diff --git a/Cargo.lock b/Cargo.lock index c21af17..e41af11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,39 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "aho-corasick" +version = "0.7.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e37cfd5e7657ada45f742d6e99ca5788580b5c529dc78faf11ece6dc702656f" +dependencies = [ + "memchr", +] + +[[package]] +name = "ansi_term" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" +dependencies = [ + "winapi", +] + +[[package]] +name = "ansi_term" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" +dependencies = [ + "winapi", +] + +[[package]] +name = "anyhow" +version = "1.0.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28ae2b3dec75a406790005a200b1bd89785afc02517a00ca99ecfe093ee9e6cf" + [[package]] name = "async-compression" version = "0.3.8" @@ -21,12 +54,62 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-trait" +version = "0.1.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi", +] + [[package]] name = "autocfg" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "axum" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e2423522684032529c51d209740c77d57cc56bfef7b9a18630f919a974d2616" +dependencies = [ + "async-trait", + "bitflags", + "bytes", + "futures-util", + "headers", + "http", + "http-body", + "hyper", + "pin-project-lite", + "regex", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-util", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + [[package]] name = "base64" version = "0.13.0" @@ -54,6 +137,12 @@ version = "3.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c59e7af012c713f529e7a3ee57ce9b31ddd858d4b512923602f74608b009631" +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + [[package]] name = "bytes" version = "1.1.0" @@ -89,6 +178,21 @@ dependencies = [ "winapi", ] +[[package]] +name = "clap" +version = "2.33.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002" +dependencies = [ + "ansi_term 0.11.0", + "atty", + "bitflags", + "strsim", + "textwrap", + "unicode-width", + "vec_map", +] + [[package]] name = "cpufeatures" version = "0.1.5" @@ -107,6 +211,54 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ae5588f6b3c3cb05239e90bd110f257254aecd01e4635400391aeae07497845" +dependencies = [ + "cfg-if", + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-epoch", + "crossbeam-queue", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-channel" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6455c0ca19f0d2fbf751b908d5c55c1f5cbc65e03c4225427254b46890bdde1e" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "lazy_static", + "memoffset", + "scopeguard", +] + [[package]] name = "crossbeam-queue" version = "0.3.2" @@ -127,17 +279,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "derivative" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "digest" version = "0.9.0" @@ -350,6 +491,40 @@ version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ab5ef0d4909ef3724cc8cce6ccc8572c5c817592e9285f5464f8e86f8bd3726e" +[[package]] +name = "headers" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0b7591fb62902706ae8e7aaff416b1b0fa2c0fd0878b46dc13baa3712d8a855" +dependencies = [ + "base64", + "bitflags", + "bytes", + "headers-core", + "http", + "mime", + "sha-1", + "time", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http", +] + +[[package]] +name = "heck" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c" +dependencies = [ + "unicode-segmentation", +] + [[package]] name = "hermit-abi" version = "0.1.19" @@ -531,6 +706,26 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "logcatcher" +version = "0.1.0" +dependencies = [ + "anyhow", + "axum", + "futures", + "headers", + "hyper", + "lazy_static", + "logtail", + "prometheus", + "structopt", + "tokio", + "tower", + "tracing", + "tracing-subscriber", + "zstd", +] + [[package]] name = "logtail" version = "0.1.0" @@ -557,9 +752,9 @@ name = "logtail-poster" version = "0.1.0" dependencies = [ "chrono", + "crossbeam", "logtail", "reqwest", - "ring-channel", "serde", "serde_json", "serial_test", @@ -570,6 +765,15 @@ dependencies = [ "zstd", ] +[[package]] +name = "matchers" +version = "0.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" +dependencies = [ + "regex-automata", +] + [[package]] name = "matches" version = "0.1.9" @@ -582,6 +786,15 @@ version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "308cc39be01b73d0d18f82a0e7b2a3df85245f84af96fdddc5d202d27e47b86a" +[[package]] +name = "memoffset" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59accc507f1338036a0477ef61afdae33cde60840f4dfe481319ce3ad116ddf9" +dependencies = [ + "autocfg", +] + [[package]] name = "mime" version = "0.3.16" @@ -701,6 +914,26 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "pin-project" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "pin-project-lite" version = "0.2.7" @@ -719,6 +952,30 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro-hack" version = "0.5.19" @@ -740,6 +997,43 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "procfs" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab8809e0c18450a2db0f236d2a44ec0b4c1412d0eb936233579f0990faa5d5cd" +dependencies = [ + "bitflags", + "byteorder", + "flate2", + "hex", + "lazy_static", + "libc", +] + +[[package]] +name = "prometheus" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5986aa8d62380092d2f50f8b1cdba9cb9b6731ffd4b25b51fd126b6c3e05b99c" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "libc", + "memchr", + "parking_lot", + "procfs", + "protobuf", + "thiserror", +] + +[[package]] +name = "protobuf" +version = "2.25.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23129d50f2c9355ced935fce8a08bd706ee2e7ce2b3b33bf61dace0e379ac63a" + [[package]] name = "quote" version = "1.0.9" @@ -835,6 +1129,32 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d07a8629359eb56f1e2fb1652bb04212c072a87ba68546a04065d525673ac461" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" +dependencies = [ + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b" + [[package]] name = "remove_dir_all" version = "0.5.3" @@ -897,20 +1217,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "ring-channel" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c9cf571aa3c009bb9fec87e775642e26be2eddd65e806269f4cff39086a56e3" -dependencies = [ - "crossbeam-queue", - "crossbeam-utils", - "derivative", - "futures", - "smallvec", - "spinning_top", -] - [[package]] name = "rustls" version = "0.19.1" @@ -1011,6 +1317,19 @@ dependencies = [ "syn", ] +[[package]] +name = "sha-1" +version = "0.9.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a0c8611594e2ab4ebbf06ec7cbbf0a99450b8570e96cbf5188b5d5f6ef18d81" +dependencies = [ + "block-buffer", + "cfg-if", + "cpufeatures", + "digest", + "opaque-debug", +] + [[package]] name = "sha2" version = "0.9.5" @@ -1024,6 +1343,15 @@ dependencies = [ "opaque-debug", ] +[[package]] +name = "sharded-slab" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "740223c51853f3145fe7c90360d2d4232f2b62e3449489c207eccde818979982" +dependencies = [ + "lazy_static", +] + [[package]] name = "signal-hook-registry" version = "1.4.0" @@ -1062,12 +1390,33 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" [[package]] -name = "spinning_top" -version = "0.2.4" +name = "strsim" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75adad84ee84b521fb2cca2d4fd0f1dab1d8d026bda3c5bea4ca63b5f9f9293c" +checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" + +[[package]] +name = "structopt" +version = "0.3.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf9d950ef167e25e0bdb073cf1d68e9ad2795ac826f2f3f59647817cf23c0bfa" dependencies = [ - "lock_api", + "clap", + "lazy_static", + "structopt-derive", +] + +[[package]] +name = "structopt-derive" +version = "0.4.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "134d838a2c9943ac3125cf6df165eda53493451b719f3255b2a26b85f772d0ba" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", ] [[package]] @@ -1081,6 +1430,12 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "sync_wrapper" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" + [[package]] name = "tailscale-api" version = "0.1.0" @@ -1104,6 +1459,15 @@ dependencies = [ "remove_dir_all", ] +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width", +] + [[package]] name = "thiserror" version = "1.0.29" @@ -1124,6 +1488,15 @@ dependencies = [ "syn", ] +[[package]] +name = "thread_local" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8018d24e04c95ac8790716a5987d0fec4f8b27249ffa0f7d33f1369bdfb88cbd" +dependencies = [ + "once_cell", +] + [[package]] name = "time" version = "0.1.44" @@ -1206,6 +1579,44 @@ dependencies = [ "tokio", ] +[[package]] +name = "tower" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60422bc7fefa2f3ec70359b8ff1caff59d785877eb70595904605bcc412470f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b56efe69aa0ad2b5da6b942e57ea9f6fe683b7a314d4ff48662e2c8838de1" +dependencies = [ + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "pin-project", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "343bc9466d3fe6b0f960ef45960509f84480bf4fd96f92901afe7ff3df9d3a62" + [[package]] name = "tower-service" version = "0.3.1" @@ -1219,10 +1630,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d" dependencies = [ "cfg-if", + "log", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c42e6fa53307c8a17e4ccd4dc81cf5ec38db9209f59b222210375b54ee40d1e2" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.19" @@ -1232,6 +1656,49 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "tracing-log" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6923477a48e41c1951f1999ef8bb5a3023eb723ceadafe78ffb65dc366761e3" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-serde" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb65ea441fbb84f9f6748fd496cf7f63ec9af5bca94dd86456978d055e8eb28b" +dependencies = [ + "serde", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9cbe87a2fa7e35900ce5de20220a582a9483a7063811defce79d7cbd59d4cfe" +dependencies = [ + "ansi_term 0.12.1", + "chrono", + "lazy_static", + "matchers", + "regex", + "serde", + "serde_json", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", + "tracing-serde", +] + [[package]] name = "try-lock" version = "0.2.3" @@ -1259,6 +1726,18 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-segmentation" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b" + +[[package]] +name = "unicode-width" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" + [[package]] name = "unicode-xid" version = "0.2.2" @@ -1283,6 +1762,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "vec_map" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" + [[package]] name = "version_check" version = "0.9.3" diff --git a/Cargo.toml b/Cargo.toml index af8ef17..1d2a93e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,2 +1,2 @@ [workspace] -members = [ "crates/*", "examples/*" ] +members = [ "cmd/*", "crates/*", "examples/*" ] diff --git a/cmd/logcatcher/Cargo.toml b/cmd/logcatcher/Cargo.toml new file mode 100644 index 0000000..168ea04 --- /dev/null +++ b/cmd/logcatcher/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "logcatcher" +version = "0.1.0" +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1" +futures = "0.3" +headers = "0.3" +lazy_static = "1" +structopt = "0.3" +tower = "0.4" +tracing = "0.1" +tracing-subscriber = "0.2" +zstd = "0.9" + +# local deps +logtail = { path = "../../crates/logtail" } + +[dependencies.axum] +version = "0.2" +features = [ "headers" ] + +[dependencies.hyper] +version = "0.14" +features = [ "full" ] + +[dependencies.prometheus] +version = "0.12" +features = [ "process" ] + +[dependencies.tokio] +version = "1" +features = [ "full" ] diff --git a/cmd/logcatcher/src/main.rs b/cmd/logcatcher/src/main.rs new file mode 100644 index 0000000..e127b7a --- /dev/null +++ b/cmd/logcatcher/src/main.rs @@ -0,0 +1,53 @@ +#[macro_use] +extern crate lazy_static; +#[macro_use] +extern crate prometheus; +#[macro_use] +extern crate tracing; + +use anyhow::Result; +use std::{ + net::{IpAddr, SocketAddr}, + path::PathBuf, +}; +use structopt::StructOpt; + +mod server; +use server::*; + +#[derive(Debug, StructOpt)] +#[structopt(name = "logcatcher", about = "The log catcher service")] +pub(crate) struct Config { + /// The maximum number of bytes that can be in-ram at once. + #[structopt(short, long, default_value = "1048576000")] + pub(crate) max_bytes: usize, + + /// The TCP host to listen on for HTTP traffic. + #[structopt(short, long, default_value = "127.0.0.1:3848")] + pub(crate) http_host: SocketAddr, + + /// The TCP host to serve debug traffic from. + #[structopt(short, long, default_value = "127.0.0.1:45012")] + pub(crate) debug_host: SocketAddr, + + /// The IP address/es to expect proxied requests from. + #[structopt(short, long)] + pub(crate) proxy_ips: Option>, + + /// The place to store logs to disk. + #[structopt(short, long, default_value = "./var/log")] + pub(crate) log_dir: PathBuf, +} + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt::init(); + + let cfg = Config::from_args(); + println!("{:?}", cfg); + + let server = Server::new(cfg)?; + server.run().await?; + + Ok(()) +} diff --git a/cmd/logcatcher/src/server/mod.rs b/cmd/logcatcher/src/server/mod.rs new file mode 100644 index 0000000..4a951ab --- /dev/null +++ b/cmd/logcatcher/src/server/mod.rs @@ -0,0 +1,131 @@ +use anyhow::Result; +use axum::{ + extract::{Path, TypedHeader}, + handler::{get, post}, + Router, +}; +use headers::{Header, HeaderName, HeaderValue}; +use hyper::http::StatusCode; +use prometheus::{Encoder, IntGauge}; +use std::{convert::TryInto, io::Write, net::SocketAddr, path::PathBuf}; +use tokio::sync::Semaphore; + +lazy_static! { + static ref BYTES_IN_USE: IntGauge = register_int_gauge!( + "buffer_bytes_in_use", + "Number of bytes currently in use by zstd buffers" + ) + .unwrap(); + static ref MAX_BUFFER_SIZE: IntGauge = register_int_gauge!( + "max_buffer_bytes", + "Maximum number of bytes that can be in use by zstd buffers" + ) + .unwrap(); + static ref BUF_SEM: Semaphore = Semaphore::new(1048576000); + static ref ORIG_CONTENT_LENGTH: HeaderName = + axum::http::header::HeaderName::from_static("orig-content-length"); +} + +pub struct Server { + // config values + log_dir: PathBuf, + http_host: SocketAddr, + debug_host: SocketAddr, +} + +impl Server { + pub(crate) fn new(cfg: super::Config) -> Result { + MAX_BUFFER_SIZE.set(cfg.max_bytes.try_into()?); + + Ok(Self { + log_dir: cfg.log_dir, + http_host: cfg.http_host, + debug_host: cfg.debug_host, + }) + } + + pub(crate) async fn run(&self) -> Result<()> { + tokio::spawn(metrics(self.debug_host.clone())); + + let router = Router::new().route("/c/:collection/:private_id", post(put_logs)); + + hyper::Server::bind(&self.http_host) + .serve(router.into_make_service()) + .await?; + + Ok(()) + } +} + +#[derive(Debug, Clone)] +struct OrigContentLength(u32); + +impl Header for OrigContentLength { + fn name() -> &'static HeaderName { + &ORIG_CONTENT_LENGTH + } + + fn decode<'i, I>(values: &mut I) -> Result + where + I: Iterator, + { + let value = values.next().ok_or_else(headers::Error::invalid)?; + + match value.to_str().unwrap().parse::() { + Ok(val) => Ok(OrigContentLength(val)), + Err(_) => Err(headers::Error::invalid()), + } + } + + fn encode(&self, values: &mut E) + where + E: Extend, + { + values.extend(std::iter::once(self.0.into())); + } +} + +#[instrument(skip(body))] +async fn put_logs( + TypedHeader(size): TypedHeader, + TypedHeader(orig_size): TypedHeader, + Path((collection, private_id)): Path<(String, logtail::PrivateID)>, + body: axum::body::Body, +) -> StatusCode { + if let Err(why) = BUF_SEM.try_acquire_many(size.0 as u32) { + error!("error acquiring buffer: {}", why); + return StatusCode::INSUFFICIENT_STORAGE; + } + if let Err(why) = BUF_SEM.try_acquire_many(orig_size.0) { + error!("error acquiring buffer: {}", why); + return StatusCode::INSUFFICIENT_STORAGE; + } + BYTES_IN_USE.set(BUF_SEM.available_permits().try_into().unwrap()); + + let mut decompressed_body: Vec = Vec::new(); + decompressed_body.resize(orig_size.0 as usize, 0); + + let compressed_body = hyper::body::to_bytes(body).await.unwrap(); + zstd::block::decompress_to_buffer(&compressed_body, &mut decompressed_body).unwrap(); + + std::io::stdout().lock().write(&decompressed_body).unwrap(); + + StatusCode::NO_CONTENT +} + +async fn metrics(addr: SocketAddr) -> Result<()> { + let endpoint = Router::new().route("/metrics", get(metrics_endpoint)); + hyper::Server::bind(&addr) + .serve(endpoint.into_make_service()) + .await?; + + Ok(()) +} + +async fn metrics_endpoint() -> Vec { + let encoder = prometheus::TextEncoder::new(); + let metric_families = prometheus::gather(); + let mut buffer = vec![]; + encoder.encode(&metric_families, &mut buffer).unwrap(); + buffer +} diff --git a/crates/logtail-facade/src/lib.rs b/crates/logtail-facade/src/lib.rs index 65fd079..b4b0d8c 100644 --- a/crates/logtail-facade/src/lib.rs +++ b/crates/logtail-facade/src/lib.rs @@ -1,12 +1,9 @@ use log::{Level, Metadata, Record}; use logtail_poster::Egress; -use std::{ - env, - sync::{Arc, Mutex}, -}; +use std::{env, sync::Arc}; pub struct LogtailLogger { - ing: Arc>, + ing: Arc, threshold: Level, } @@ -37,10 +34,11 @@ impl log::Log for LogtailLogger { }; if let Ok(val) = serde_json::to_value(&ld) { - if let Ok(mut ing) = self.ing.lock() { - if let Err(why) = ing.send(val) { - eprintln!("logtail_facade::LogtailLogger::log: can't send json value to buffer: {}", why); - } + if let Err(why) = self.ing.send(val) { + eprintln!( + "logtail_facade::LogtailLogger::log: can't send json value to buffer: {}", + why + ); } } } @@ -55,12 +53,17 @@ pub fn init(collection: String) -> Result> { .unwrap_or(logtail_poster::DEFAULT_HOST.to_string()) .to_string(); - let (mut ing, eg) = logtail_poster::Builder::default() + let (ing, eg) = logtail_poster::Builder::default() .collection(collection) .base_url(target) .buffer_size(256) .private_id(cfg.private_id()) .build()?; + let ing = Arc::new(ing); + + ing.send(serde_json::to_value(&ProgramStarted { + msg: "Program started".to_string(), + })?)?; let threshold = if cfg!(debug_assertions) { Level::Debug @@ -68,13 +71,8 @@ pub fn init(collection: String) -> Result> { Level::Info }; - ing.send(serde_json::to_value(&ProgramStarted { - msg: "Program started".to_string(), - })?)?; - let ing = Arc::new(Mutex::new(ing.clone())); - let logger = LogtailLogger { - ing: ing.clone(), + ing, threshold: threshold.clone(), }; diff --git a/crates/logtail-poster/Cargo.toml b/crates/logtail-poster/Cargo.toml index 6d529dc..ef82a48 100644 --- a/crates/logtail-poster/Cargo.toml +++ b/crates/logtail-poster/Cargo.toml @@ -7,7 +7,7 @@ edition = "2018" [dependencies] chrono = { version = "0.4", features = [ "serde" ] } -ring-channel = "0.9" +crossbeam = "0.8" reqwest = { version = "0.11", default-features = false, features = [ "rustls-tls" ] } serde = { version = "1", features = [ "derive" ] } serde_json = "1" diff --git a/crates/logtail-poster/src/lib.rs b/crates/logtail-poster/src/lib.rs index 4fdf395..7ccb946 100644 --- a/crates/logtail-poster/src/lib.rs +++ b/crates/logtail-poster/src/lib.rs @@ -5,7 +5,6 @@ This facilitates writing logs to a logtail server. This is a port of `logtail.go`. */ use reqwest::Client; -use std::num::NonZeroUsize; mod config; pub use self::config::*; @@ -89,7 +88,7 @@ impl Builder { return Err(Error::NoCollection); } - let (tx, rx) = ring_channel::ring_channel(NonZeroUsize::new(buf_size).unwrap()); + let (tx, rx) = crossbeam::channel::bounded(buf_size); let private_id = self.private_id.unwrap_or(logtail::PrivateID::new()); let base_url = self.base_url.unwrap_or(DEFAULT_HOST.to_string()); let mut u = url::Url::parse(&base_url)?; @@ -127,7 +126,7 @@ pub enum Error { TXFail(String), #[error("can't get from in-memory buffer: {0}")] - RXFail(#[from] ring_channel::TryRecvError), + RXFail(#[from] crossbeam::channel::TryRecvError), #[error("can't parse a URL: {0}")] URLParseError(#[from] url::ParseError), @@ -152,12 +151,12 @@ pub enum Error { /// need to. #[derive(Clone)] pub struct Ingress { - tx: ring_channel::RingSender, + tx: crossbeam::channel::Sender, } impl Ingress { /// Sends a JSON object to the log server. This MUST be a JSON object. - pub fn send(&mut self, val: serde_json::Value) -> Result<(), Error> { + pub fn send(&self, val: serde_json::Value) -> Result<(), Error> { if !val.is_object() { return Err(Error::MustBeJsonObject); } @@ -186,33 +185,29 @@ struct LogtailHeader { pub struct Egress { url: String, client: reqwest::Client, - rx: ring_channel::RingReceiver, + rx: crossbeam::channel::Receiver, } impl Egress { - fn pull(&mut self) -> Result, Error> { + fn pull(&mut self) -> Vec { let mut values: Vec = vec![]; loop { match self.rx.try_recv() { Ok(val) => values.push(val), Err(why) => { - use ring_channel::TryRecvError::*; - match why { - Empty => break, - Disconnected => return Err(Error::RXFail(why)), - }; + break; } }; } - Ok(values) + values } /// Pushes log messages to logtail. This will push everything buffered into the /// log server. This should be called periodically. pub async fn post(&mut self) -> Result<(), Error> { - let values = self.pull()?; + let values = self.pull(); self.push(values).await?; Ok(()) } @@ -255,7 +250,7 @@ mod tests { let (mut ing, mut eg) = Builder::default() .collection("rebterlai.logtail-poster.test".to_string()) .user_agent("rebterlai/test".to_string()) - .base_url("http://127.0.0.1:48283".to_string()) + .base_url("http://127.0.0.1:3848".to_string()) .build() .unwrap(); diff --git a/examples/example-logtail-facade/src/main.rs b/examples/example-logtail-facade/src/main.rs index 0e2574e..3c81675 100644 --- a/examples/example-logtail-facade/src/main.rs +++ b/examples/example-logtail-facade/src/main.rs @@ -3,6 +3,7 @@ use log::{debug, error, info, warn}; #[tokio::main] async fn main() { let mut eg = logtail_facade::init("rebterlai.example-logtail-facade".to_string()).unwrap(); + eg.post().await.unwrap(); error!("error"); warn!("warn"); @@ -10,5 +11,5 @@ async fn main() { debug!("debug"); // TODO(Xe): automate this somehow - eg.post().await.unwrap() + eg.post().await.unwrap(); }