implement full end to end handler execution

This commit is contained in:
Cadey Ratio 2020-10-30 23:59:59 -04:00
parent 006a39ba8f
commit 2d4ca67e55
12 changed files with 223 additions and 23 deletions

26
Cargo.lock generated
View File

@ -654,10 +654,6 @@ dependencies = [
"version_check 0.9.2",
]
[[package]]
name = "executor"
version = "0.1.0"
[[package]]
name = "eyre"
version = "0.6.1"
@ -3172,6 +3168,28 @@ dependencies = [
"uuid",
]
[[package]]
name = "wasmcloud_executor"
version = "0.1.0"
dependencies = [
"color-eyre",
"diesel",
"jwt",
"lazy_static",
"rocket",
"rocket_contrib",
"serde",
"serde_json",
"thiserror",
"tracing",
"tracing-log",
"tracing-subscriber",
"ureq",
"url 2.1.1",
"uuid",
"wasmcloud",
]
[[package]]
name = "web-sys"
version = "0.3.45"

View File

@ -1,5 +1,5 @@
[package]
name = "executor"
name = "wasmcloud_executor"
version = "0.1.0"
authors = ["Christine Dodrill <me@christine.website>"]
edition = "2018"
@ -7,3 +7,27 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
color-eyre = "0.5"
rocket = "0.4"
jwt = "0.11"
lazy_static = "1.4"
serde_json = "^1"
serde = { version = "^1", features = ["derive"] }
thiserror = "1"
tracing = "0.1"
tracing-log = "0.1"
tracing-subscriber = "0.2"
ureq = { version = "1", features = ["json", "charset"] }
url = "2"
uuid = { version = "0.7", features = ["serde", "v4"] }
wasmcloud = { path = ".." }
[dependencies.diesel]
version = "1"
features = ["postgres", "r2d2", "uuidv07", "chrono"]
[dependencies.rocket_contrib]
version = "0.4"
default-features = false
features = ["diesel_postgres_pool", "uuid"]

1
executor/src/execute.rs Normal file
View File

@ -0,0 +1 @@

View File

@ -1,3 +1,143 @@
fn main() {
println!("Hello, world!");
#![feature(proc_macro_hygiene, decl_macro)]
#[macro_use]
extern crate diesel;
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate rocket;
#[macro_use]
extern crate tracing;
use diesel::prelude::*;
use std::{
env, fs, io,
path::PathBuf,
process::{self, Output},
time,
};
use uuid::Uuid;
use wasmcloud::api::Error::InternalServerError;
use wasmcloud::{
api::{
Error::{Database, Impossible, Subcommand},
Result,
},
models, schema, MainDatabase,
};
// Name your user agent after your app?
pub static APP_USER_AGENT: &str = concat!(
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION"),
" +https://tulpa.dev/wasmcloud/wasmcloud",
);
pub static TEMP_FOLDER: &str = concat!(
"/tmp/",
env!("CARGO_PKG_NAME"),
"-",
env!("CARGO_PKG_VERSION"),
"/"
);
#[instrument(skip(config), err)]
fn execute(
handler_id: Uuid,
config: Vec<models::HandlerConfig>,
handler_path: PathBuf,
) -> Result<(Output, time::Duration)> {
let mut child = process::Command::new("/usr/bin/env");
let child = child.arg("pahi");
let child = child.arg(handler_path);
let mut child = child.env("HANDLER_ID", handler_id.to_string());
for kv in config.into_iter() {
child = child.env(kv.key_name, kv.value_contents);
}
debug!("running");
let start = time::Instant::now();
let output = child.output().map_err(Subcommand)?;
let duration = start.elapsed();
Ok((output, duration))
}
#[instrument(skip(conn), err)]
#[get("/run/<handler_name>")]
fn schedule(handler_name: String, conn: MainDatabase) -> Result {
fs::create_dir_all(TEMP_FOLDER)?;
let hdl = {
use schema::handlers::dsl::{handlers, human_name};
handlers
.filter(human_name.eq(handler_name))
.first::<models::Handler>(&*conn)
.map_err(Database)
}?;
let cfg = {
use schema::handler_config::dsl::{handler_config, handler_id};
handler_config
.filter(handler_id.eq(hdl.id.clone()))
.load::<models::HandlerConfig>(&*conn)
.map_err(Database)
}?;
let u = url::Url::parse(&hdl.current_version.ok_or(Impossible)?).map_err(|_| Impossible)?;
debug!("{:?}", u.host_str().ok_or(Impossible)?);
// https://cdn.christine.website/file/christine-static/stickers/mara/hacker.png
let hdl_url = format!(
"https://cdn.christine.website/file/wasmcloud-modules/{}",
u.host_str().ok_or(Impossible)?
);
let fname = format!("{}{}", TEMP_FOLDER, u.host_str().unwrap());
debug!(url = &hdl_url[..], fname = &fname[..], "downloading module");
let resp = ureq::get(&hdl_url).set("User-Agent", APP_USER_AGENT).call();
if resp.ok() {
let mut fout = fs::File::create(&fname).map_err(|why| {
error!("can't make file: {}", why);
Subcommand(why)
})?;
io::copy(&mut resp.into_reader(), &mut fout).map_err(|why| Subcommand(why))?;
} else {
error!("while fetching url: {}", resp.status_line());
return Err(Impossible);
}
let (output, duration) = execute(hdl.id, cfg, fname.into()).map_err(|why| {
error!("error running module: {}", why);
InternalServerError(why.into())
})?;
info!(
duration = duration.as_millis() as i64,
module = u.path(),
"execution finished"
);
diesel::insert_into(schema::executions::table)
.values(&models::NewExecution {
handler_id: hdl.id,
finished: true,
stderr: Some(String::from_utf8(output.stderr).map_err(|_| Impossible)?), // XXX(Cadey): this is not impossible
execution_time: duration.as_millis() as i32,
})
.execute(&*conn)
.map_err(Database)?;
Ok(())
}
fn main() -> color_eyre::eyre::Result<()> {
color_eyre::install()?;
tracing_subscriber::fmt::init();
std::env::set_var("ROCKET_PORT", "8001"); // XXX(Cadey): so I can test both on my machine at once
rocket::ignite()
.attach(MainDatabase::fairing())
.mount("/", routes![schedule])
.launch();
Ok(())
}

View File

@ -1 +1 @@
-- This file should undo anything in `up.sql`
DROP TABLE handlers;

View File

@ -1 +1 @@
-- This file should undo anything in `up.sql`
DROP TABLE handler_config;

View File

@ -1 +1 @@
-- This file should undo anything in `up.sql`
DROP TABLE executions;

View File

@ -0,0 +1 @@
DROP INDEX handlers_human_name_idx;

View File

@ -0,0 +1 @@
CREATE UNIQUE INDEX handlers_human_name_idx ON handlers(human_name);

View File

@ -34,5 +34,17 @@
"type": "tarball",
"url": "https://github.com/mozilla/nixpkgs-mozilla/archive/57c8084c7ef41366993909c20491e359bbb90f54.tar.gz",
"url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz"
},
"pahi": {
"branch": "main",
"description": "The next-generation Olin runtime, made with love",
"homepage": null,
"owner": "Xe",
"repo": "pahi",
"rev": "c6baae2d67a2ebef7446eb99e932fbf1d076e34e",
"sha256": "1yvw9w429cjkqj46g1x8d5y1p02avargnz1025dnbkkclfqb9spx",
"type": "tarball",
"url": "https://github.com/Xe/pahi/archive/c6baae2d67a2ebef7446eb99e932fbf1d076e34e.tar.gz",
"url_template": "https://github.com/<owner>/<repo>/archive/<rev>.tar.gz"
}
}

View File

@ -2,6 +2,7 @@ let
sources = import ./nix/sources.nix;
rust = import ./nix/rust.nix { inherit sources; };
pkgs = import sources.nixpkgs { };
pahi = import sources.pahi { };
in pkgs.mkShell rec {
buildInputs = with pkgs; with elmPackages; [
# rust
@ -13,6 +14,9 @@ in pkgs.mkShell rec {
pkg-config
openssl
# wasm
pahi
# elm
elm2nix
elm

View File

@ -6,7 +6,7 @@ use rocket::{
response::Responder,
Outcome, Response,
};
use std::io::Cursor;
use std::io::{self, Cursor};
pub mod handler;
pub mod token;
@ -34,28 +34,22 @@ pub enum Error {
#[error("incorrect number of files uploaded (wanted {0})")]
IncorrectFilecount(usize),
#[error("subcommand execution failed: {0}")]
Subcommand(#[from] io::Error),
#[error("this should be impossible")]
Impossible,
}
impl<'a> Responder<'a> for Error {
fn respond_to(self, _: &Request) -> ::std::result::Result<Response<'a>, Status> {
match self {
Error::Database(why) => Response::build()
.header(ContentType::Plain)
.status(Status::InternalServerError)
.sized_body(Cursor::new(format!("{}", why)))
.ok(),
Error::BadOrNoAuth | Error::LackPermissions => Response::build()
.header(ContentType::Plain)
.status(Status::Unauthorized)
.sized_body(Cursor::new(format!("{}", self)))
.ok(),
Error::InternalServerError(why) | Error::ExternalDependencyFailed(why) => {
Response::build()
.header(ContentType::Plain)
.status(Status::InternalServerError)
.sized_body(Cursor::new(format!("{}", why)))
.ok()
}
Error::Backblaze(why) => Response::build()
.header(ContentType::Plain)
.status(Status::InternalServerError)
@ -66,6 +60,11 @@ impl<'a> Responder<'a> for Error {
.status(Status::BadRequest)
.sized_body(Cursor::new(format!("{}", self)))
.ok(),
_ => Response::build()
.header(ContentType::Plain)
.status(Status::InternalServerError)
.sized_body(Cursor::new(format!("{}", self)))
.ok(),
}
}
}