package main import ( "bytes" "context" "encoding/json" "flag" "fmt" "io/ioutil" "log" "os" "os/signal" "sync" "syscall" "time" "github.com/facebookgo/flagenv" shell "github.com/ipfs/go-ipfs-api" nats "github.com/nats-io/nats.go" "github.com/perlin-network/life/exec" "github.com/rogpeppe/go-internal/txtar" "tulpa.dev/within/wasmcloud/cmd/internal" "tulpa.dev/within/wasmcloud/executor" "within.website/olin/policy" ) var ( natsURL = flag.String("nats-url", nats.DefaultURL, "nats URL") ipfsURL = flag.String("ipfs-host", "localhost:5001", "IPFS host") workerCount = flag.Int("worker-count", 1, "number of wasm executor workers") gasLimit = flag.Int("gas-limit", 1048576, "number of wasm instructions per execution") ramLimit = flag.Int("ram-limit", 128, "number of wasm pages that can be used") loopTimeout = flag.Duration("loop-timeout", 30*time.Second, "idle time per loop") cwaEntrypoint = flag.String("cwa-entrypoint", "_start", "entrypoint into CWA programs") ) func main() { flagenv.Parse() flag.Parse() log.SetFlags(log.LstdFlags | log.Lshortfile) sh := shell.NewShell(*ipfsURL) nc, err := nats.Connect(*natsURL) if err != nil { log.Fatal(err) } ctx, cancel := context.WithCancel(context.Background()) defer cancel() c := make(chan os.Signal, 2) signal.Notify(c, os.Interrupt, syscall.SIGTERM) go func() { <-c cancel() log.Println("press ^C again to kill all of this") <-c os.Exit(0) }() var wg sync.WaitGroup wg.Add(*workerCount) for range make([]struct{}, *workerCount) { go waitForNewWASM(ctx, &wg, sh, nc) } log.Printf("waiting for work on %s", internal.TopicName) wg.Wait() } func waitForNewWASM(ctx context.Context, wg *sync.WaitGroup, sh *shell.Shell, nc *nats.Conn) { defer wg.Done() subsc, err := nc.QueueSubscribeSync(internal.TopicName, "workers") if err != nil { return } for { select { case <-ctx.Done(): return default: } msg, err := subsc.NextMsg(*loopTimeout) if err != nil { if err == nats.ErrTimeout { continue } log.Printf("error getting message: %v", err) return } data := msg.Data var er internal.ExecRequest err = json.Unmarshal(data, &er) if err != nil { log.Printf("invalid message %s: %v", string(data), err) continue } stdin := bytes.NewBuffer(er.Data) stdout := bytes.NewBuffer(nil) stderr := bytes.NewBuffer(nil) logBuf := bytes.NewBuffer(nil) bin, err := sh.Cat(er.WASMCID) if err != nil { log.Printf("can't get wasm: %v", err) continue } wasmBin, err := ioutil.ReadAll(bin) bin.Close() if err != nil { log.Printf("can't get wasm binary %s: %v", er.WASMCID, err) continue } c := executor.Config{ VMConfig: exec.VMConfig{ GasLimit: uint64(*gasLimit), ReturnOnGasLimitExceeded: true, MaxMemoryPages: *ramLimit, }, Name: er.Name, FuncName: *cwaEntrypoint, Env: er.Env, Binary: wasmBin, Stdin: stdin, Stdout: stdout, Stderr: stderr, LogSink: logBuf, } pol, err := policy.Parse(er.Name+".policy", er.Policy) if err != nil { log.Printf("can't get policy: %v", err) continue } c.Policy = &pol result, err := executor.Run(c) if err != nil { fmt.Fprintln(logBuf, "can't run binary:", err) log.Printf("can't run binary: %v", err) } arc := txtar.Archive{ Comment: []byte(fmt.Sprintf("%s: execution of %s at %s", er.UUID, er.Name, result.StartTime.Format(time.RFC3339))), Files: []txtar.File{ { Name: "logs.txt", Data: logBuf.Bytes(), }, { Name: "stdin.txt", Data: er.Data, }, { Name: "stdout.txt", Data: stdout.Bytes(), }, { Name: "stderr.txt", Data: stderr.Bytes(), }, result.StatsJSONFile(), result.StatsFile(), { Name: "wasm.cid", Data: []byte(er.WASMCID), }, }, } interm := bytes.NewBuffer(txtar.Format(&arc)) cid, err := sh.Add(interm) if err != nil { log.Printf("can't save execution results: %v", err) continue } log.Printf("wasm module %s execution finished, logs at %s", er.WASMCID, cid) resp := internal.ExecResponse{ WASMCID: er.WASMCID, LogBundleCID: cid, Logs: arc, UUID: er.UUID, } repMsg, _ := json.Marshal(resp) msg.Respond(repMsg) } }