package main import ( "bytes" "context" "encoding/json" "flag" "fmt" "io/ioutil" "log" "os" "os/signal" "sync" "syscall" "time" shell "github.com/ipfs/go-ipfs-api" "github.com/perlin-network/life/exec" "github.com/rogpeppe/go-internal/txtar" "tulpa.dev/within/wasmcloud/cmd/internal" "tulpa.dev/within/wasmcloud/executor" ) var ( ipfsURL = flag.String("ipfs-host", "localhost:5001", "IPFS host (must have pubsub experiment enabled)") workerCount = flag.Int("worker-count", 1, "number of wasm executor workers") gasLimit = flag.Int("gas-limit", 1048576, "number of wasm instructions per execution") ramLimit = flag.Int("ram-limit", 128, "number of wasm pages that can be used") ) func main() { log.SetFlags(log.LstdFlags | log.Lshortfile) sh := shell.NewShell(*ipfsURL) subsc, err := sh.PubSubSubscribe(internal.TopicName) if err != nil { log.Fatal(err) } ctx, cancel := context.WithCancel(context.Background()) defer cancel() c := make(chan os.Signal, 2) signal.Notify(c, os.Interrupt, syscall.SIGTERM) go func() { <-c cancel() log.Println("press ^C again to kill all of this") <-c os.Exit(0) }() var wg sync.WaitGroup wg.Add(*workerCount) for range make([]struct{}, *workerCount) { go waitForNewWASM(ctx, &wg, sh, subsc) } log.Printf("waiting for work on %s", internal.TopicName) wg.Wait() } func waitForNewWASM(ctx context.Context, wg *sync.WaitGroup, sh *shell.Shell, subsc *shell.PubSubSubscription) { defer wg.Done() for { select { case <-ctx.Done(): return default: } msg, err := subsc.Next() if err != nil { log.Printf("error getting message: %v", err) return } data := msg.Data var er internal.ExecRequest err = json.Unmarshal(data, &er) if err != nil { log.Printf("invalid message %s: %v", string(data), err) continue } stdin := bytes.NewBuffer(er.Data) stdout := bytes.NewBuffer(nil) stderr := bytes.NewBuffer(nil) logBuf := bytes.NewBuffer(nil) bin, err := sh.Cat(er.WASMCID) if err != nil { log.Printf("can't get wasm: %v", err) continue } wasmBin, err := ioutil.ReadAll(bin) bin.Close() if err != nil { log.Printf("can't get wasm binary %s: %v", er.WASMCID, err) continue } c := executor.Config{ VMConfig: exec.VMConfig{ GasLimit: uint64(*gasLimit), ReturnOnGasLimitExceeded: true, MaxMemoryPages: *ramLimit, }, Name: er.Name, FuncName: "cwa_main", Env: er.Env, Binary: wasmBin, Stdin: stdin, Stdout: stdout, Stderr: stderr, LogSink: logBuf, } result, err := executor.Run(c) if err != nil { log.Printf("can't run binary: %v", err) } arc := txtar.Archive{ Comment: []byte(fmt.Sprintf("%s: execution of %s at %s", er.UUID, er.WASMCID, result.StartTime.Format(time.RFC3339))), Files: []txtar.File{ { Name: "logs.txt", Data: logBuf.Bytes(), }, { Name: "stdout.txt", Data: stdout.Bytes(), }, { Name: "stderr.txt", Data: stderr.Bytes(), }, result.ToFile(), { Name: "wasm.cid", Data: []byte(er.WASMCID), }, }, } interm := bytes.NewBuffer(txtar.Format(&arc)) cid, err := sh.Add(interm) if err != nil { log.Printf("can't save execution results: %v", err) continue } log.Printf("wasm module %s execution finished, logs at %s", er.WASMCID, cid) resp := internal.ExecResponse{ WASMCID: er.WASMCID, LogBundleCID: cid, Logs: arc, UUID: er.UUID, } repMsg, _ := json.Marshal(resp) sh.PubSubPublish(er.UUID, string(repMsg)) } }