mirror of
https://github.com/SecurityBrewery/catalyst.git
synced 2026-02-09 23:03:51 +01:00
@@ -21,7 +21,6 @@ type busService struct {
|
||||
}
|
||||
|
||||
func New(apiURL, apikey, network string, catalystBus *bus.Bus, db *database.Database) error {
|
||||
|
||||
h := &busService{db: db, apiURL: apiURL, apiKey: apikey, network: network, catalystBus: catalystBus}
|
||||
|
||||
if err := catalystBus.SubscribeRequest(h.logRequest); err != nil {
|
||||
@@ -40,6 +39,7 @@ func New(apiURL, apikey, network string, catalystBus *bus.Bus, db *database.Data
|
||||
func busContext() context.Context {
|
||||
// TODO: change roles?
|
||||
bot := &model.UserResponse{ID: "bot", Roles: []string{role.Admin}}
|
||||
|
||||
return busdb.UserContext(context.Background(), bot)
|
||||
}
|
||||
|
||||
|
||||
@@ -59,13 +59,15 @@ func pullImage(ctx context.Context, cli *client.Client, image string) (string, e
|
||||
|
||||
buf := &bytes.Buffer{}
|
||||
_, err = io.Copy(buf, reader)
|
||||
|
||||
return buf.String(), err
|
||||
}
|
||||
|
||||
func copyFile(ctx context.Context, cli *client.Client, path string, contentString string, id string) error {
|
||||
tarBuf := &bytes.Buffer{}
|
||||
tw := tar.NewWriter(tarBuf)
|
||||
if err := tw.WriteHeader(&tar.Header{Name: path, Mode: 0755, Size: int64(len(contentString))}); err != nil {
|
||||
header := &tar.Header{Name: path, Mode: 0o755, Size: int64(len(contentString))}
|
||||
if err := tw.WriteHeader(header); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -90,7 +92,12 @@ func runDocker(ctx context.Context, jobID, containerID string, db *database.Data
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
defer cli.ContainerRemove(ctx, containerID, types.ContainerRemoveOptions{Force: true})
|
||||
defer func(cli *client.Client, ctx context.Context, containerID string, options types.ContainerRemoveOptions) {
|
||||
err := cli.ContainerRemove(ctx, containerID, options)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
}
|
||||
}(cli, ctx, containerID, types.ContainerRemoveOptions{Force: true})
|
||||
|
||||
if err := cli.ContainerStart(ctx, containerID, types.ContainerStartOptions{}); err != nil {
|
||||
return nil, nil, err
|
||||
@@ -123,13 +130,16 @@ func streamStdErr(ctx context.Context, cli *client.Client, jobID, containerID st
|
||||
err := scanLines(ctx, jobID, containerLogs, stderrBuf, db)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
|
||||
return
|
||||
}
|
||||
if err := containerLogs.Close(); err != nil {
|
||||
log.Println(err)
|
||||
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
return stderrBuf, nil
|
||||
}
|
||||
|
||||
@@ -139,24 +149,28 @@ func scanLines(ctx context.Context, jobID string, input io.ReadCloser, output io
|
||||
_, err := stdcopy.StdCopy(w, w, input)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
|
||||
return
|
||||
}
|
||||
if err := w.Close(); err != nil {
|
||||
log.Println(err)
|
||||
|
||||
return
|
||||
}
|
||||
}()
|
||||
s := bufio.NewScanner(r)
|
||||
for s.Scan() {
|
||||
b := s.Bytes()
|
||||
output.Write(b)
|
||||
output.Write([]byte("\n"))
|
||||
_, _ = output.Write(b)
|
||||
_, _ = output.Write([]byte("\n"))
|
||||
|
||||
if err := db.JobLogAppend(ctx, jobID, string(b)+"\n"); err != nil {
|
||||
log.Println(err)
|
||||
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return s.Err()
|
||||
}
|
||||
|
||||
@@ -172,6 +186,7 @@ func waitForContainer(ctx context.Context, cli *client.Client, containerID strin
|
||||
return fmt.Errorf("container returned status code %d: stderr: %s", exitStatus.StatusCode, stderrBuf.String())
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -19,17 +19,20 @@ func (h *busService) handleJob(automationMsg *bus.JobMsg) {
|
||||
})
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
automation, err := h.db.AutomationGet(ctx, automationMsg.Automation)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if automation.Script == "" {
|
||||
log.Println("automation is empty")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -39,11 +42,17 @@ func (h *busService) handleJob(automationMsg *bus.JobMsg) {
|
||||
automationMsg.Message.Secrets["catalyst_apikey"] = h.apiKey
|
||||
automationMsg.Message.Secrets["catalyst_apiurl"] = h.apiURL
|
||||
|
||||
scriptMessage, _ := json.Marshal(automationMsg.Message)
|
||||
scriptMessage, err := json.Marshal(automationMsg.Message)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
containerID, logs, err := createContainer(ctx, automation.Image, automation.Script, string(scriptMessage), h.network)
|
||||
if err != nil {
|
||||
log.Println(err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@@ -55,18 +64,19 @@ func (h *busService) handleJob(automationMsg *bus.JobMsg) {
|
||||
Status: job.Status,
|
||||
}); err != nil {
|
||||
log.Println(err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
var result map[string]interface{}
|
||||
var result map[string]any
|
||||
|
||||
stdout, _, err := runDocker(ctx, automationMsg.ID, containerID, h.db)
|
||||
if err != nil {
|
||||
result = map[string]interface{}{"error": fmt.Sprintf("error running script %s %s", err, string(stdout))}
|
||||
result = map[string]any{"error": fmt.Sprintf("error running script %s %s", err, string(stdout))}
|
||||
} else {
|
||||
var data map[string]interface{}
|
||||
var data map[string]any
|
||||
if err := json.Unmarshal(stdout, &data); err != nil {
|
||||
result = map[string]interface{}{"error": string(stdout)}
|
||||
result = map[string]any{"error": string(stdout)}
|
||||
} else {
|
||||
result = data
|
||||
}
|
||||
@@ -78,6 +88,7 @@ func (h *busService) handleJob(automationMsg *bus.JobMsg) {
|
||||
|
||||
if err := h.db.JobComplete(ctx, automationMsg.ID, result); err != nil {
|
||||
log.Println(err)
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user