Improve bus (#3)

* Improve bus
* Add ticket log
This commit is contained in:
Jonas Plum
2021-12-27 19:08:07 +01:00
committed by GitHub
parent 1fade14ba5
commit b5dd0cfacd
50 changed files with 756 additions and 456 deletions

60
busservice/busservice.go Normal file
View File

@@ -0,0 +1,60 @@
package busservice
import (
"context"
"log"
"github.com/SecurityBrewery/catalyst/bus"
"github.com/SecurityBrewery/catalyst/database"
"github.com/SecurityBrewery/catalyst/database/busdb"
"github.com/SecurityBrewery/catalyst/generated/models"
"github.com/SecurityBrewery/catalyst/role"
"github.com/SecurityBrewery/catalyst/time"
)
type busService struct {
db *database.Database
apiURL string
apiKey string
catalystBus *bus.Bus
}
func New(apiurl, apikey string, catalystBus *bus.Bus, db *database.Database) error {
h := &busService{db: db, apiURL: apiurl, apiKey: apikey, catalystBus: catalystBus}
if err := catalystBus.SubscribeRequest(h.logRequest); err != nil {
return err
}
if err := catalystBus.SubscribeResult(h.handleResult); err != nil {
return err
}
if err := catalystBus.SubscribeJob(h.handleJob); err != nil {
return err
}
return nil
}
func busContext() context.Context {
// TODO: change roles?
bot := &models.UserResponse{ID: "bot", Roles: []string{role.Admin}}
return busdb.UserContext(context.Background(), bot)
}
func (h *busService) logRequest(msg *bus.RequestMsg) {
var logEntries []*models.LogEntry
for _, i := range msg.IDs {
logEntries = append(logEntries, &models.LogEntry{
Type: bus.ChannelRequest,
Reference: i.String(),
Creator: msg.User,
Message: msg.Function,
Created: time.Now().UTC(),
})
}
if err := h.db.LogBatchCreate(busContext(), logEntries); err != nil {
log.Println(err)
}
}

186
busservice/docker.go Normal file
View File

@@ -0,0 +1,186 @@
package busservice
import (
"archive/tar"
"bufio"
"bytes"
"context"
"fmt"
"io"
"log"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/stdcopy"
"github.com/SecurityBrewery/catalyst/database"
)
func createContainer(ctx context.Context, image, script, data string) (string, string, error) {
cli, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
return "", "", err
}
logs, err := pullImage(ctx, cli, image)
if err != nil {
return "", logs, err
}
config := &container.Config{
Image: image, Cmd: []string{"/script", data}, WorkingDir: "/home",
AttachStderr: true, AttachStdout: true,
}
resp, err := cli.ContainerCreate(ctx, config, nil, nil, "")
if err != nil {
return "", logs, err
}
if err := copyFile(ctx, cli, "/script", script, resp.ID); err != nil {
return "", logs, err
}
return resp.ID, logs, nil
}
func pullImage(ctx context.Context, cli *client.Client, image string) (string, error) {
reader, err := cli.ImagePull(ctx, image, types.ImagePullOptions{})
if err != nil {
return "", err
}
defer reader.Close()
buf := &bytes.Buffer{}
_, err = io.Copy(buf, reader)
return buf.String(), err
}
func copyFile(ctx context.Context, cli *client.Client, path string, contentString string, id string) error {
tarBuf := &bytes.Buffer{}
tw := tar.NewWriter(tarBuf)
if err := tw.WriteHeader(&tar.Header{Name: path, Mode: 0755, Size: int64(len(contentString))}); err != nil {
return err
}
if _, err := tw.Write([]byte(contentString)); err != nil {
return err
}
if err := tw.Close(); err != nil {
return err
}
if err := cli.CopyToContainer(ctx, id, "/", tarBuf, types.CopyToContainerOptions{}); err != nil {
return err
}
return nil
}
func runDocker(ctx context.Context, jobID, containerID string, db *database.Database) (stdout []byte, stderr []byte, err error) {
cli, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
return nil, nil, err
}
defer cli.ContainerRemove(ctx, containerID, types.ContainerRemoveOptions{Force: true})
if err := cli.ContainerStart(ctx, containerID, types.ContainerStartOptions{}); err != nil {
return nil, nil, err
}
stderrBuf, err := streamStdErr(ctx, cli, jobID, containerID, db)
if err != nil {
return nil, nil, err
}
if err := waitForContainer(ctx, cli, containerID, stderrBuf); err != nil {
return nil, nil, err
}
output, err := getStdOut(ctx, cli, containerID)
if err != nil {
log.Println(err)
}
return output.Bytes(), stderrBuf.Bytes(), nil
}
func streamStdErr(ctx context.Context, cli *client.Client, jobID, containerID string, db *database.Database) (*bytes.Buffer, error) {
stderrBuf := &bytes.Buffer{}
containerLogs, err := cli.ContainerLogs(ctx, containerID, types.ContainerLogsOptions{ShowStderr: true, Follow: true})
if err != nil {
return nil, err
}
go func() {
err := scanLines(ctx, jobID, containerLogs, stderrBuf, db)
if err != nil {
log.Println(err)
return
}
if err := containerLogs.Close(); err != nil {
log.Println(err)
return
}
}()
return stderrBuf, nil
}
func scanLines(ctx context.Context, jobID string, input io.ReadCloser, output io.Writer, db *database.Database) error {
r, w := io.Pipe()
go func() {
_, err := stdcopy.StdCopy(w, w, input)
if err != nil {
log.Println(err)
return
}
if err := w.Close(); err != nil {
log.Println(err)
return
}
}()
s := bufio.NewScanner(r)
for s.Scan() {
b := s.Bytes()
output.Write(b)
output.Write([]byte("\n"))
if err := db.JobLogAppend(ctx, jobID, string(b)+"\n"); err != nil {
log.Println(err)
continue
}
}
return s.Err()
}
func waitForContainer(ctx context.Context, cli *client.Client, containerID string, stderrBuf *bytes.Buffer) error {
statusCh, errCh := cli.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
select {
case err := <-errCh:
if err != nil {
return err
}
case exitStatus := <-statusCh:
if exitStatus.StatusCode != 0 {
return fmt.Errorf("container returned status code %d: stderr: %s", exitStatus.StatusCode, stderrBuf.String())
}
}
return nil
}
func getStdOut(ctx context.Context, cli *client.Client, containerID string) (*bytes.Buffer, error) {
output := &bytes.Buffer{}
containerLogs, err := cli.ContainerLogs(ctx, containerID, types.ContainerLogsOptions{ShowStdout: true, Follow: true})
if err != nil {
return nil, err
}
defer containerLogs.Close()
_, err = stdcopy.StdCopy(output, output, containerLogs)
if err != nil {
return nil, err
}
return output, nil
}

113
busservice/job.go Normal file
View File

@@ -0,0 +1,113 @@
package busservice
import (
"encoding/json"
"fmt"
"log"
"github.com/SecurityBrewery/catalyst/bus"
"github.com/SecurityBrewery/catalyst/generated/models"
)
func (h *busService) handleJob(automationMsg *bus.JobMsg) {
ctx := busContext()
job, err := h.db.JobCreate(ctx, automationMsg.ID, &models.JobForm{
Automation: automationMsg.Automation,
Payload: automationMsg.Message.Payload,
Origin: automationMsg.Origin,
})
if err != nil {
log.Println(err)
return
}
automation, err := h.db.AutomationGet(ctx, automationMsg.Automation)
if err != nil {
log.Println(err)
return
}
if automation.Script == "" {
log.Println("automation is empty")
return
}
if automationMsg.Message.Secrets == nil {
automationMsg.Message.Secrets = map[string]string{}
}
automationMsg.Message.Secrets["catalyst_apikey"] = h.apiKey
automationMsg.Message.Secrets["catalyst_apiurl"] = h.apiURL
scriptMessage, _ := json.Marshal(automationMsg.Message)
containerID, logs, err := createContainer(ctx, automation.Image, automation.Script, string(scriptMessage))
if err != nil {
log.Println(err)
return
}
if _, err := h.db.JobUpdate(ctx, automationMsg.ID, &models.Job{
Automation: job.Automation,
Container: &containerID,
Origin: job.Origin,
Output: job.Output,
Log: &logs,
Payload: job.Payload,
Status: job.Status,
}); err != nil {
log.Println(err)
return
}
var result map[string]interface{}
stdout, _, err := runDocker(ctx, automationMsg.ID, containerID, h.db)
if err != nil {
result = map[string]interface{}{"error": fmt.Sprintf("error running script %s %s", err, string(stdout))}
} else {
var data map[string]interface{}
if err := json.Unmarshal(stdout, &data); err != nil {
result = map[string]interface{}{"error": string(stdout)}
} else {
result = data
}
}
if err := h.catalystBus.PublishResult(automationMsg.Automation, result, automationMsg.Origin); err != nil {
log.Println(err)
}
if err := h.db.JobComplete(ctx, automationMsg.ID, result); err != nil {
log.Println(err)
return
}
}
/*
func getAutomation(automationID string, config *Config) (*models.AutomationResponse, error) {
req, err := http.NewRequest(http.MethodGet, config.CatalystAPIUrl+"/automations/"+automationID, nil)
if err != nil {
return nil, err
}
req.Header.Set("PRIVATE-TOKEN", config.CatalystAPIKey)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
var automation models.AutomationResponse
if err := json.Unmarshal(b, &automation); err != nil {
return nil, err
}
return &automation, nil
}
*/

35
busservice/result.go Normal file
View File

@@ -0,0 +1,35 @@
package busservice
import (
"log"
"github.com/SecurityBrewery/catalyst/bus"
"github.com/SecurityBrewery/catalyst/generated/models"
)
func (h *busService) handleResult(resultMsg *bus.ResultMsg) {
if resultMsg.Target != nil {
ctx := busContext()
switch {
case resultMsg.Target.TaskOrigin != nil:
if _, err := h.db.TaskComplete(
ctx,
resultMsg.Target.TaskOrigin.TicketId,
resultMsg.Target.TaskOrigin.PlaybookId,
resultMsg.Target.TaskOrigin.TaskId,
resultMsg.Data,
); err != nil {
log.Println(err)
}
case resultMsg.Target.ArtifactOrigin != nil:
enrichment := &models.EnrichmentForm{
Data: resultMsg.Data,
Name: resultMsg.Automation,
}
_, err := h.db.EnrichArtifact(ctx, resultMsg.Target.ArtifactOrigin.TicketId, resultMsg.Target.ArtifactOrigin.Artifact, enrichment)
if err != nil {
log.Println(err)
}
}
}
}