wasmcloud/cmd/wasmcloudd/handler.go

387 lines
7.8 KiB
Go

package main
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"
"github.com/go-interpreter/wagon/wasm"
"github.com/golang/snappy"
shell "github.com/ipfs/go-ipfs-api"
"github.com/rogpeppe/go-internal/txtar"
"tulpa.dev/within/wasmcloud/cmd/internal"
"within.website/ln"
"within.website/ln/opname"
"within.website/olin/namegen"
"within.website/olin/policy"
)
// deleteHandler removes the handler named by the "name" query parameter.
//
// The lookup is scoped to the requesting user u so that one user cannot delete
// a handler owned by someone else. It replies 404 when the name is missing or
// no matching handler exists, 500 when the delete itself fails, and 204 on
// success.
func deleteHandler(w http.ResponseWriter, r *http.Request, u *User) {
	ctx := r.Context()

	name := r.URL.Query().Get("name")
	if name == "" {
		http.NotFound(w, r)
		return
	}

	var hdlr Handler
	// Match on the owner as well as the name, the same way updateHandler and
	// getLogs look handlers up.
	err := db.Where("name = ? AND user_id = ?", name, u.ID).First(&hdlr).Error
	if err != nil {
		ln.Error(ctx, err)
		http.NotFound(w, r)
		return
	}

	// Only report success once the database has actually accepted the delete.
	if err := db.Delete(&hdlr).Error; err != nil {
		ln.Error(ctx, err)
		http.Error(w, "database error, contact support", http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusNoContent)
}
// updateHandler replaces the WASM module (and optionally the policy) of the
// handler named by the "name" query parameter, provided it belongs to u.
//
// The request body is a JSON-encoded internal.Handler of at most 16 MiB. A
// supplied policy is validated before it is stored; when the body carries no
// policy, the handler's stored policy is kept rather than being overwritten
// with an empty one. On success the updated record is written back as JSON.
func updateHandler(w http.ResponseWriter, r *http.Request, u *User) {
	ctx := r.Context()

	name := r.URL.Query().Get("name")
	if name == "" {
		http.NotFound(w, r)
		return
	}

	var hdlr Handler
	err := db.Where("name = ? AND user_id = ?", name, u.ID).First(&hdlr).Error
	if err != nil {
		ln.Error(ctx, err)
		http.NotFound(w, r)
		return
	}

	data, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, 16*1024*1024))
	if err != nil {
		ln.Error(ctx, err)
		http.Error(w, "invalid data", http.StatusBadRequest)
		return
	}
	r.Body.Close()

	var uHdlr internal.Handler
	if err := json.Unmarshal(data, &uHdlr); err != nil {
		ln.Error(ctx, err)
		http.Error(w, "not json", http.StatusBadRequest)
		return
	}

	// Only replace the stored policy when the caller actually sent one. An
	// empty policy is never validated, so storing it would silently wipe the
	// previously validated policy.
	if len(uHdlr.Policy) != 0 {
		if err := validatePolicy(name, uHdlr.Policy); err != nil {
			ln.Error(ctx, err)
			http.Error(w, "policy validation failure", http.StatusBadRequest)
			return
		}
		hdlr.Policy = uHdlr.Policy
	}

	cid, err := uploadHandler(uHdlr)
	if err != nil {
		ln.Error(ctx, err)
		http.Error(w, "wasm validation failure", http.StatusBadRequest)
		return
	}
	hdlr.Path = cid

	if err := db.Save(&hdlr).Error; err != nil {
		ln.Error(ctx, err)
		http.Error(w, "database error, contact support", http.StatusInternalServerError)
		return
	}

	// The status line is already committed once encoding starts, so a failure
	// here can only be logged.
	if err := json.NewEncoder(w).Encode(hdlr); err != nil {
		ln.Error(ctx, err)
	}
}
// validatePolicy parses data as a policy file called "<name>.policy" and then
// runs internal.ValidatePolicy on the parsed result. A parse error is returned
// unchanged; otherwise the validation result is returned.
func validatePolicy(name string, data []byte) error {
	parsed, parseErr := policy.Parse(name+".policy", data)
	if parseErr == nil {
		return internal.ValidatePolicy(parsed)
	}
	return parseErr
}
// uploadHandler checks an uploaded handler and stores its module in IPFS.
//
// It rejects any ABI other than internal.CWA or internal.Dagger, confirms the
// WASM bytes decode as a module, and then adds those bytes through the IPFS
// shell at *ipfsURL. It returns the identifier reported by the add call.
func uploadHandler(hdlr internal.Handler) (string, error) {
	ipfs := shell.NewShell(*ipfsURL)

	abi := string(hdlr.ABI)
	if abi != string(internal.CWA) && abi != string(internal.Dagger) {
		return "", fmt.Errorf("unsupported ABI %s", hdlr.ABI)
	}

	// Decoding is used purely as a validity check; the module itself is discarded.
	if _, decodeErr := wasm.DecodeModule(bytes.NewReader(hdlr.WASM)); decodeErr != nil {
		return "", fmt.Errorf("can't decode module: %w", decodeErr)
	}

	id, addErr := ipfs.Add(bytes.NewReader(hdlr.WASM))
	if addErr != nil {
		return "", fmt.Errorf("can't upload module to IPFS: %w", addErr)
	}
	return id, nil
}
// createHandler registers a new handler for u from a JSON-encoded
// internal.Handler request body of at most 16 MiB.
//
// Users without CanCreateHandlers get 403. The handler name comes from
// namegen.Next, and defaultPolicy is used when the body carries no policy. The
// policy is validated and the module uploaded before the record is saved; the
// saved record is written back as JSON.
func createHandler(w http.ResponseWriter, r *http.Request, u *User) {
	if !u.CanCreateHandlers {
		http.Error(w, "you can't create handlers, contact support", http.StatusForbidden)
		return
	}

	ctx := r.Context()

	body, readErr := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, 16*1024*1024))
	if readErr != nil {
		ln.Error(ctx, readErr)
		http.Error(w, "invalid data", http.StatusBadRequest)
		return
	}
	r.Body.Close()

	var incoming internal.Handler
	if jsonErr := json.Unmarshal(body, &incoming); jsonErr != nil {
		ln.Error(ctx, jsonErr)
		http.Error(w, "not json", http.StatusBadRequest)
		return
	}

	name := namegen.Next()

	// Fall back to the built-in policy when the caller did not send one.
	if len(incoming.Policy) == 0 {
		incoming.Policy = []byte(defaultPolicy)
	}
	if polErr := validatePolicy(name, incoming.Policy); polErr != nil {
		ln.Error(ctx, polErr)
		http.Error(w, "policy validation failure", http.StatusBadRequest)
		return
	}

	cid, upErr := uploadHandler(incoming)
	if upErr != nil {
		ln.Error(ctx, upErr)
		http.Error(w, "wasm validation error", http.StatusBadRequest)
		return
	}

	record := Handler{
		Name:   name,
		User:   *u,
		UserID: u.ID,
		Path:   cid,
		Policy: incoming.Policy,
	}
	if saveErr := db.Save(&record).Error; saveErr != nil {
		ln.Error(ctx, saveErr)
		http.Error(w, "can't save record", http.StatusInternalServerError)
		return
	}

	json.NewEncoder(w).Encode(record)
}
// listHandlers writes every handler owned by u as a JSON array. When the
// "even-deleted" query parameter is exactly "true" the query is run through
// db.Unscoped() instead of db.
func listHandlers(w http.ResponseWriter, r *http.Request, u *User) {
	ctx := opname.With(r.Context(), "read-handlers")

	scope := db
	if r.URL.Query().Get("even-deleted") == "true" {
		scope = db.Unscoped()
	}

	var owned []Handler
	if findErr := scope.Where("user_id = ?", u.ID).Find(&owned).Error; findErr != nil {
		ln.Error(ctx, findErr)
		http.Error(w, "can't read handlers", http.StatusInternalServerError)
		return
	}

	json.NewEncoder(w).Encode(owned)
}
// getLogs returns the stored execution logs of the handler named by the "name"
// query parameter, provided that handler belongs to u.
//
// Each stored log is snappy-decoded and parsed as a txtar archive, then
// reported as one JSON object holding the run ID, the archive comment and a
// file-name to contents map. A log that fails to decompress is still reported,
// with the error text placed under "logs.txt".
func getLogs(w http.ResponseWriter, r *http.Request, u *User) {
	ctx := r.Context()

	name := r.URL.Query().Get("name")
	if name == "" {
		http.NotFound(w, r)
		return
	}

	var owner Handler
	if lookupErr := db.Where("name = ? AND user_id = ?", name, u.ID).First(&owner).Error; lookupErr != nil {
		ln.Error(ctx, lookupErr)
		http.NotFound(w, r)
		return
	}

	var stored []ExecutionLog
	if findErr := db.Where("handler_id = ?", owner.ID).Find(&stored).Error; findErr != nil {
		ln.Error(ctx, findErr)
		http.Error(w, "handler id not found", http.StatusInternalServerError)
		return
	}

	type resultLine struct {
		ExecID  string            `json:"exec_id"`
		Comment string            `json:"comment"`
		Logs    map[string]string `json:"logs"`
	}

	// Left as a nil slice on purpose: with no logs the response body is JSON null.
	var result []resultLine
	for _, entry := range stored {
		line := resultLine{ExecID: entry.RunID}

		raw, decodeErr := snappy.Decode(nil, entry.Data)
		if decodeErr != nil {
			ln.Error(ctx, decodeErr)
			line.Comment = "ERROR IN DECOMPRESSING LOG BUNDLE"
			line.Logs = map[string]string{"logs.txt": decodeErr.Error()}
		} else {
			bundle := txtar.Parse(raw)
			line.Comment = string(bundle.Comment)
			line.Logs = make(map[string]string, len(bundle.Files))
			for _, f := range bundle.Files {
				line.Logs[f.Name] = string(f.Data)
			}
		}

		result = append(result, line)
	}

	json.NewEncoder(w).Encode(result)
}
// invokeHandlerSync runs the handler named by the "name" query parameter with
// the request body (at most 1 MiB) as input and waits for it to finish.
//
// The run's logs are txtar-formatted, snappy-compressed and saved as an
// ExecutionLog. With "whole-bundle=true" the full txtar bundle is returned;
// otherwise only the contents of "stdout.txt" are written, if that file is
// present. A body that cannot be read yields 400, and a failed run or a failed
// log save yields 500 instead of an empty success response.
func invokeHandlerSync(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	q := r.URL.Query()

	name := q.Get("name")
	if name == "" {
		http.NotFound(w, r)
		return
	}
	wholeBundle := q.Get("whole-bundle") == "true"

	var hdlr Handler
	err := db.Where("name = ?", name).First(&hdlr).Error
	if err != nil {
		ln.Error(ctx, err)
		http.NotFound(w, r)
		return
	}

	data, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, 1*1024*1024))
	if err != nil {
		ln.Error(ctx, err)
		// Stop here: running the handler on a truncated payload after an error
		// reply has been written would produce a corrupt response.
		http.Error(w, "invalid data", http.StatusBadRequest)
		return
	}

	// The 5 minute duration is passed straight to runHandler; presumably it is
	// an execution timeout — confirm against runHandler.
	resp, err := runHandler(ctx, hdlr, 5*time.Minute, data)
	if err != nil {
		ln.Error(ctx, err)
		http.Error(w, "handler execution failed", http.StatusInternalServerError)
		return
	}

	logData := txtar.Format(&resp.Logs)
	entry := ExecutionLog{
		HandlerID: hdlr.ID,
		RunID:     resp.UUID,
		Data:      snappy.Encode(nil, logData),
	}
	if err := db.Save(&entry).Error; err != nil {
		ln.Error(ctx, err)
		http.Error(w, "can't save execution logs", http.StatusInternalServerError)
		return
	}
	ln.Log(ctx, ln.Action("saving-logs"))

	if wholeBundle {
		w.Header().Set("Content-Type", "application/txtar")
		w.Write(logData)
		return
	}

	for _, file := range resp.Logs.Files {
		if file.Name == "stdout.txt" {
			w.Write(file.Data)
			return
		}
	}
}
// invokeHandler starts the handler named by the "name" query parameter in a
// background goroutine, using the request body (at most 1 MiB) as input, and
// returns without waiting for the run to finish.
//
// The goroutine txtar-formats and snappy-compresses the run's logs and saves
// them as an ExecutionLog; failures there are only logged. A body that cannot
// be read yields 400 and no run is started.
func invokeHandler(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	name := r.URL.Query().Get("name")
	if name == "" {
		http.NotFound(w, r)
		return
	}

	var hdlr Handler
	err := db.Where("name = ?", name).First(&hdlr).Error
	if err != nil {
		ln.Error(ctx, err)
		http.NotFound(w, r)
		return
	}

	data, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, 1*1024*1024))
	if err != nil {
		ln.Error(ctx, err)
		// Do not launch a run on a truncated payload.
		http.Error(w, "invalid data", http.StatusBadRequest)
		return
	}

	// NOTE(review): ctx is the request context, which net/http cancels once
	// this function returns. If runHandler honors cancellation the background
	// run may be cut short — confirm, and detach the context if so.
	go func() {
		resp, err := runHandler(ctx, hdlr, 5*time.Minute, data)
		if err != nil {
			ln.Error(ctx, err)
			return
		}

		// Named logData so it does not shadow the request payload in data.
		logData := txtar.Format(&resp.Logs)
		entry := ExecutionLog{
			HandlerID: hdlr.ID,
			RunID:     resp.UUID,
			Data:      snappy.Encode(nil, logData),
		}
		if err := db.Save(&entry).Error; err != nil {
			ln.Error(ctx, err)
			return
		}
		ln.Log(ctx, ln.Action("saving-logs"))
	}()
}