mercy/cmd/kronos/main.go

141 lines
2.8 KiB
Go

package main
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"strconv"
"time"
"git.xeserv.us/xena/mercy/internal/common"
"git.xeserv.us/xena/mercy/internal/database"
"github.com/Xe/gorqlite"
"github.com/Xe/uuid"
"github.com/caarlos0/env"
nats "github.com/nats-io/go-nats"
"github.com/robfig/cron"
)
type config struct {
NatsURL string `env:"NATS_URL,required"`
DatabaseURL string `env:"DATABASE_URL,required"`
Debug bool `env:"DEBUG"`
}
func main() {
var cfg config
err := env.Parse(&cfg)
if err != nil {
log.Fatal(err)
}
nc, err := common.NatsConnect(cfg.NatsURL)
if err != nil {
log.Fatal(err)
}
_ = nc
log.Printf("connecting to %s", cfg.DatabaseURL)
db, err := common.RQLiteConnect(cfg.DatabaseURL)
if err != nil {
log.Fatal(err)
}
if cfg.Debug {
gorqlite.TraceOn(os.Stderr)
}
chks := database.NewChecks(db)
err = chks.Migrate()
if err != nil {
log.Fatal(err)
}
resuls := database.NewResults(db)
err = resuls.Migrate()
if err != nil {
log.Fatal(err)
}
nc.QueueSubscribe("results", "kronos", func(m *nats.Msg) {
var cr common.CheckResult
err := json.Unmarshal(m.Data, &cr)
if err != nil {
log.Printf("results: error when decoding json: %v", err)
return
}
err = resuls.InsResult(cr)
if err != nil {
log.Printf("results: error when inserting result record: %v", err)
return
}
cid, err := strconv.ParseInt(cr.Preamble.CheckID, 10, 64)
if err != nil {
log.Printf("results: %s is not a number: %v", cr.Preamble.CheckID, err)
return
}
chk, err := chks.GetCheck(cid)
if err != nil {
log.Printf("results: can't get check: %v", err)
return
}
_, err = http.Post(chk.ReportWebhook, "application/json", bytes.NewBuffer(m.Data))
if err != nil {
log.Printf("results: http.Post(%q): %v", chk.ReportWebhook, err)
return
}
err = chks.UpdateLastResult(cid, cr.Result)
if err != nil {
log.Printf("results: updating last check result for cid %d: %v", cid, err)
return
}
})
c := cron.New()
c.AddFunc("@every 1m", func() {
log.Printf("scheduling checks")
cl, err := chks.GetAllChecks()
if err != nil {
log.Printf("getAllChecks: %v", err)
return
}
for _, chk := range cl {
go func() {
chr := common.HTTPCheckRequest{
Preamble: common.Preamble{
CustomerID: chk.CustomerID,
CheckID: fmt.Sprintf("%d", chk.ID),
RunID: uuid.New(),
},
URL: chk.URI,
DegradedThreshold: 500 * time.Millisecond,
FailThreshold: 5 * time.Second,
}
data, err := json.Marshal(&chr)
if err != nil {
log.Printf("error in json-encoding check request %#v: %v", chr, err)
return
}
err = nc.Publish("tasks:http", data)
if err != nil {
log.Printf("error in sending nats request checkID: %d: %v", chk.ID, err)
}
}()
}
})
c.Start()
for {
select {}
}
}