cmd/wasmcloudd: synchronous execution
This commit is contained in:
parent
59d7356142
commit
04975b6a5e
|
@ -8,7 +8,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/go-interpreter/wagon/wasm"
|
||||
"github.com/google/uuid"
|
||||
shell "github.com/ipfs/go-ipfs-api"
|
||||
"github.com/rogpeppe/go-internal/txtar"
|
||||
"tulpa.dev/within/wasmcloud/cmd/internal"
|
||||
|
@ -143,7 +142,7 @@ func getLogs(w http.ResponseWriter, r *http.Request, u *User) {
|
|||
json.NewEncoder(w).Encode(result)
|
||||
}
|
||||
|
||||
func invokeHandler(w http.ResponseWriter, r *http.Request, u *User) {
|
||||
func invokeHandlerSync(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
q := r.URL.Query()
|
||||
name := q.Get("name")
|
||||
|
@ -166,48 +165,69 @@ func invokeHandler(w http.ResponseWriter, r *http.Request, u *User) {
|
|||
http.NotFound(w, r)
|
||||
}
|
||||
|
||||
execID := uuid.New().String()
|
||||
|
||||
er := internal.ExecRequest{
|
||||
WASMCID: hdlr.Path,
|
||||
Name: hdlr.Name,
|
||||
Data: data,
|
||||
Env: map[string]string{
|
||||
"RUN_ID": execID,
|
||||
},
|
||||
UUID: execID,
|
||||
resp, err := runHandler(ctx, hdlr, 5*time.Minute, data)
|
||||
if err != nil {
|
||||
ln.Error(ctx, err)
|
||||
return
|
||||
}
|
||||
|
||||
encData, err := json.Marshal(er)
|
||||
logData := txtar.Format(&resp.Logs)
|
||||
entry := ExecutionLog{
|
||||
HandlerID: hdlr.ID,
|
||||
RunID: resp.UUID,
|
||||
Data: logData,
|
||||
}
|
||||
|
||||
err = db.Save(&entry).Error
|
||||
if err != nil {
|
||||
ln.Error(ctx, err)
|
||||
return
|
||||
}
|
||||
|
||||
ln.Log(ctx, ln.Action("saving-logs"))
|
||||
|
||||
for _, file := range resp.Logs.Files {
|
||||
if file.Name == "stdout.txt" {
|
||||
w.Write(file.Data)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func invokeHandler(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
q := r.URL.Query()
|
||||
name := q.Get("name")
|
||||
if name == "" {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
var hdlr Handler
|
||||
err := db.Where("name = ?", name).First(&hdlr).Error
|
||||
if err != nil {
|
||||
ln.Error(ctx, err)
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
data, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, 1*1024*1024))
|
||||
if err != nil {
|
||||
ln.Error(ctx, err)
|
||||
http.NotFound(w, r)
|
||||
}
|
||||
|
||||
go func() {
|
||||
f := ln.F{
|
||||
"name": name,
|
||||
"run_id": execID,
|
||||
}
|
||||
ln.Log(ctx, ln.Action("invoke-handler"), f)
|
||||
msg, err := nc.Request(internal.TopicName, encData, 5*time.Minute)
|
||||
if err != nil {
|
||||
ln.Error(ctx, err)
|
||||
return
|
||||
}
|
||||
|
||||
var resp internal.ExecResponse
|
||||
err = json.Unmarshal(msg.Data, &resp)
|
||||
resp, err := runHandler(ctx, hdlr, 5*time.Minute, data)
|
||||
if err != nil {
|
||||
ln.Error(ctx, err)
|
||||
return
|
||||
}
|
||||
|
||||
data := txtar.Format(&resp.Logs)
|
||||
|
||||
entry := ExecutionLog{
|
||||
HandlerID: hdlr.ID,
|
||||
RunID: execID,
|
||||
RunID: resp.UUID,
|
||||
Data: data,
|
||||
}
|
||||
|
||||
|
@ -217,6 +237,6 @@ func invokeHandler(w http.ResponseWriter, r *http.Request, u *User) {
|
|||
return
|
||||
}
|
||||
|
||||
ln.Log(ctx, ln.Action("saving-logs"), f)
|
||||
ln.Log(ctx, ln.Action("saving-logs"))
|
||||
}()
|
||||
}
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"tulpa.dev/within/wasmcloud/cmd/internal"
|
||||
"within.website/ln"
|
||||
"within.website/ln/opname"
|
||||
)
|
||||
|
||||
func runHandler(ctx context.Context, hdlr Handler, timeout time.Duration, message []byte) (*internal.ExecResponse, error) {
|
||||
ctx = opname.With(ctx, "invoke")
|
||||
execID := uuid.New().String()
|
||||
er := internal.ExecRequest{
|
||||
WASMCID: hdlr.Path,
|
||||
Name: hdlr.Name,
|
||||
Data: message,
|
||||
Env: map[string]string{
|
||||
"RUN_ID": execID,
|
||||
},
|
||||
UUID: execID,
|
||||
}
|
||||
|
||||
encData, err := json.Marshal(er)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't encode job: %w", err)
|
||||
}
|
||||
|
||||
f := ln.F{
|
||||
"name": hdlr.Name,
|
||||
"run_id": execID,
|
||||
}
|
||||
ln.Log(ctx, ln.Info("starting run"), f)
|
||||
msg, err := nc.Request(internal.TopicName, encData, timeout)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't submit job: %w", err)
|
||||
}
|
||||
|
||||
var resp internal.ExecResponse
|
||||
err = json.Unmarshal(msg.Data, &resp)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't unmarshal response: %w", err)
|
||||
}
|
||||
|
||||
return &resp, nil
|
||||
}
|
|
@ -62,7 +62,8 @@ func main() {
|
|||
rtr.HandleFunc("/api/whoami", makeHandler(true, apiWhoami))
|
||||
|
||||
// invocation
|
||||
rtr.HandleFunc("/invoke", makeHandler(false, invokeHandler)).Methods(http.MethodPost)
|
||||
rtr.HandleFunc("/invoke", invokeHandler).Methods(http.MethodPost)
|
||||
rtr.HandleFunc("/invoke/sync", invokeHandlerSync).Methods(http.MethodPost)
|
||||
|
||||
rtr.PathPrefix("/static/").Handler(http.FileServer(http.Dir(".")))
|
||||
|
||||
|
|
Loading…
Reference in New Issue