package main import ( "bytes" "context" "encoding/json" "flag" "fmt" "io" "io/ioutil" "log" "os" "os/signal" "sync" "syscall" "time" shell "github.com/ipfs/go-ipfs-api" "github.com/perlin-network/life/compiler" "github.com/perlin-network/life/exec" "github.com/rogpeppe/go-internal/txtar" "tulpa.dev/within/wasmcloud/cmd/internal" "within.website/olin/abi/cwa" ) var ( ipfsURL = flag.String("ipfs-host", "localhost:5001", "IPFS host (must have pubsub experiment enabled)") workerCount = flag.Int("worker-count", 1, "number of wasm executor workers") ) func main() { log.SetFlags(log.LstdFlags | log.Lshortfile) sh := shell.NewShell(*ipfsURL) subsc, err := sh.PubSubSubscribe(internal.TopicName) if err != nil { log.Fatal(err) } ctx, cancel := context.WithCancel(context.Background()) defer cancel() c := make(chan os.Signal, 2) signal.Notify(c, os.Interrupt, syscall.SIGTERM) go func() { <-c cancel() }() var wg sync.WaitGroup wg.Add(*workerCount) for range make([]struct{}, *workerCount) { go waitForNewWASM(ctx, &wg, sh, subsc) } log.Printf("waiting for work on %s", internal.TopicName) wg.Wait() } func waitForNewWASM(ctx context.Context, wg *sync.WaitGroup, sh *shell.Shell, subsc *shell.PubSubSubscription) { defer wg.Done() for { select { case <-ctx.Done(): return default: } msg, err := subsc.Next() if err != nil { log.Printf("error getting message: %v", err) return } data := msg.Data var er internal.ExecRequest err = json.Unmarshal(data, &er) if err != nil { log.Printf("invalid message %s: %v", string(data), err) continue } stdin := bytes.NewBuffer(er.Data) stdout := bytes.NewBuffer(nil) logBuf := bytes.NewBuffer(nil) bin, err := sh.Cat(er.WASMCID) if err != nil { log.Printf("can't get wasm: %v", err) continue } wasmBin, err := ioutil.ReadAll(bin) bin.Close() if err != nil { log.Printf("can't get wasm binary %s: %v", er.WASMCID, err) continue } p := cwa.NewProcess(er.WASMCID, nil, er.Env) p.Stdin = stdin p.Stdout = stdout p.Stderr = stdout p.Logger = log.New(io.MultiWriter(logBuf, os.Stdout), er.WASMCID+" ", log.LstdFlags) gp := &compiler.SimpleGasPolicy{GasPerInstruction: 1} vm, err := exec.NewVirtualMachine(wasmBin, exec.VMConfig{}, p, gp) if err != nil { p.Logger.Printf("[SYSTEM] can't create VM: %v", err) continue } main, ok := vm.GetFunctionExport("cwa_main") if !ok { p.Logger.Printf("[SYSTEM] can't get main function") continue } begin := time.Now() ret, err := vm.Run(main) if err != nil { p.Logger.Printf("error running main: %v", err) continue } dur := time.Since(begin) p.Logger.Printf("[SYSTEM] return status: %v", ret) arc := txtar.Archive{ Comment: []byte(fmt.Sprintf("%s: execution of %s at %s", er.UUID, er.WASMCID, begin.Format(time.RFC3339))), Files: []txtar.File{ { Name: "vmstats.txt", Data: []byte(fmt.Sprintf("execution time: %s\ngas used: %v\nsyscall count: %v", dur, vm.Gas, p.SyscallCount())), }, { Name: "stdout.txt", Data: stdout.Bytes(), }, { Name: "logs.txt", Data: logBuf.Bytes(), }, }, } interm := bytes.NewBuffer(txtar.Format(&arc)) cid, err := sh.Add(interm) if err != nil { log.Printf("can't save execution results: %v", err) continue } log.Printf("wasm module %s execution finished, logs at %s", er.WASMCID, cid) resp := internal.ExecResponse{ WASMCID: er.WASMCID, LogBundleCID: cid, Logs: arc, UUID: er.UUID, } repMsg, _ := json.Marshal(resp) sh.PubSubPublish(er.UUID, string(repMsg)) } }