diff --git a/bus/bus.go b/bus/bus.go
index 259e9e7..ff707be 100644
--- a/bus/bus.go
+++ b/bus/bus.go
@@ -1,70 +1,69 @@
package bus
import (
- "encoding/json"
- "log"
+ "github.com/arangodb/go-driver"
- emitter "github.com/emitter-io/go/v2"
+ "github.com/SecurityBrewery/catalyst/generated/model"
)
+type ResultMsg struct {
+ Automation string `json:"automation"`
+ Data map[string]any `json:"data,omitempty"`
+ Target *model.Origin `json:"target"`
+}
+
+type RequestMsg struct {
+ IDs []driver.DocumentID `json:"ids"`
+ Function string `json:"function"`
+ User string `json:"user"`
+}
+
+type JobMsg struct {
+ ID string `json:"id"`
+ Automation string `json:"automation"`
+ Origin *model.Origin `json:"origin"`
+ Message *model.Message `json:"message"`
+}
+
+type DatabaseUpdateType string
+
+const (
+ DatabaseEntryRead DatabaseUpdateType = "read"
+ DatabaseEntryCreated DatabaseUpdateType = "created"
+ DatabaseEntryUpdated DatabaseUpdateType = "updated"
+)
+
+type DatabaseUpdateMsg struct {
+ IDs []driver.DocumentID `json:"ids"`
+ Type DatabaseUpdateType `json:"type"`
+}
+
type Bus struct {
- config *Config
- client *emitter.Client
+ ResultChannel *Channel[*ResultMsg]
+ RequestChannel *Channel[*RequestMsg]
+ JobChannel *Channel[*JobMsg]
+ DatabaseChannel *Channel[*DatabaseUpdateMsg]
}
-type Config struct {
- Host string
- Key string
- databaseUpdateBusKey string
- jobBusKey string
- resultBusKey string
- requestKey string
- APIUrl string
+func New() *Bus {
+ return &Bus{
+ ResultChannel: &Channel[*ResultMsg]{},
+ RequestChannel: &Channel[*RequestMsg]{},
+ JobChannel: &Channel[*JobMsg]{},
+ DatabaseChannel: &Channel[*DatabaseUpdateMsg]{},
+ }
}
-func New(c *Config) (*Bus, error) {
- client, err := emitter.Connect(c.Host, func(_ *emitter.Client, msg emitter.Message) {
- log.Printf("received: '%s' topic: '%s'\n", msg.Payload(), msg.Topic())
- })
- if err != nil {
- return nil, err
- }
-
- c.databaseUpdateBusKey, err = client.GenerateKey(c.Key, channelDatabaseUpdate+"/", "rwls", 0)
- if err != nil {
- return nil, err
- }
- c.jobBusKey, err = client.GenerateKey(c.Key, channelJob+"/", "rwls", 0)
- if err != nil {
- return nil, err
- }
- c.resultBusKey, err = client.GenerateKey(c.Key, channelResult+"/", "rwls", 0)
- if err != nil {
- return nil, err
- }
- c.requestKey, err = client.GenerateKey(c.Key, ChannelRequest+"/", "rwls", 0)
- if err != nil {
- return nil, err
- }
-
- return &Bus{config: c, client: client}, err
+type Channel[T any] struct {
+ Subscriber []func(T)
}
-func (b *Bus) jsonPublish(msg any, channel, key string) error {
- payload, err := json.Marshal(msg)
- if err != nil {
- return err
+func (c *Channel[T]) Publish(msg T) {
+ for _, s := range c.Subscriber {
+ go s(msg)
}
-
- return b.client.Publish(key, channel, payload)
}
-func (b *Bus) safeSubscribe(key, channel string, handler func(c *emitter.Client, m emitter.Message)) error {
- defer func() {
- if r := recover(); r != nil {
- log.Printf("Recovered %s in channel %s\n", r, channel)
- }
- }()
-
- return b.client.Subscribe(key, channel, handler)
+func (c *Channel[T]) Subscribe(handler func(T)) {
+ c.Subscriber = append(c.Subscriber, handler)
}
diff --git a/bus/databaseupdate.go b/bus/databaseupdate.go
deleted file mode 100644
index b37cdbf..0000000
--- a/bus/databaseupdate.go
+++ /dev/null
@@ -1,43 +0,0 @@
-package bus
-
-import (
- "encoding/json"
- "log"
-
- "github.com/arangodb/go-driver"
- emitter "github.com/emitter-io/go/v2"
-)
-
-const channelDatabaseUpdate = "databaseupdate"
-
-type DatabaseUpdateType string
-
-const (
- DatabaseEntryRead DatabaseUpdateType = "read"
- DatabaseEntryCreated DatabaseUpdateType = "created"
- DatabaseEntryUpdated DatabaseUpdateType = "updated"
-)
-
-type DatabaseUpdateMsg struct {
- IDs []driver.DocumentID `json:"ids"`
- Type DatabaseUpdateType `json:"type"`
-}
-
-func (b *Bus) PublishDatabaseUpdate(ids []driver.DocumentID, databaseUpdateType DatabaseUpdateType) error {
- return b.jsonPublish(&DatabaseUpdateMsg{
- IDs: ids,
- Type: databaseUpdateType,
- }, channelDatabaseUpdate, b.config.databaseUpdateBusKey)
-}
-
-func (b *Bus) SubscribeDatabaseUpdate(f func(msg *DatabaseUpdateMsg)) error {
- return b.safeSubscribe(b.config.databaseUpdateBusKey, channelDatabaseUpdate, func(c *emitter.Client, m emitter.Message) {
- var msg DatabaseUpdateMsg
- if err := json.Unmarshal(m.Payload(), &msg); err != nil {
- log.Println(err)
-
- return
- }
- go f(&msg)
- })
-}
diff --git a/bus/job.go b/bus/job.go
deleted file mode 100644
index 8de5e35..0000000
--- a/bus/job.go
+++ /dev/null
@@ -1,43 +0,0 @@
-package bus
-
-import (
- "encoding/json"
- "log"
-
- emitter "github.com/emitter-io/go/v2"
-
- "github.com/SecurityBrewery/catalyst/generated/model"
-)
-
-const channelJob = "job"
-
-type JobMsg struct {
- ID string `json:"id"`
- Automation string `json:"automation"`
- Origin *model.Origin `json:"origin"`
- Message *model.Message `json:"message"`
-}
-
-func (b *Bus) PublishJob(id, automation string, payload any, context *model.Context, origin *model.Origin) error {
- return b.jsonPublish(&JobMsg{
- ID: id,
- Automation: automation,
- Origin: origin,
- Message: &model.Message{
- Context: context,
- Payload: payload,
- },
- }, channelJob, b.config.jobBusKey)
-}
-
-func (b *Bus) SubscribeJob(f func(msg *JobMsg)) error {
- return b.safeSubscribe(b.config.jobBusKey, channelJob, func(c *emitter.Client, m emitter.Message) {
- var msg JobMsg
- if err := json.Unmarshal(m.Payload(), &msg); err != nil {
- log.Println(err)
-
- return
- }
- go f(&msg)
- })
-}
diff --git a/bus/request.go b/bus/request.go
deleted file mode 100644
index 04496fd..0000000
--- a/bus/request.go
+++ /dev/null
@@ -1,37 +0,0 @@
-package bus
-
-import (
- "encoding/json"
- "log"
-
- "github.com/arangodb/go-driver"
- emitter "github.com/emitter-io/go/v2"
-)
-
-const ChannelRequest = "request"
-
-type RequestMsg struct {
- IDs []driver.DocumentID `json:"ids"`
- Function string `json:"function"`
- User string `json:"user"`
-}
-
-func (b *Bus) PublishRequest(user, f string, ids []driver.DocumentID) error {
- return b.jsonPublish(&RequestMsg{
- User: user,
- Function: f,
- IDs: ids,
- }, ChannelRequest, b.config.requestKey)
-}
-
-func (b *Bus) SubscribeRequest(f func(msg *RequestMsg)) error {
- return b.safeSubscribe(b.config.requestKey, ChannelRequest, func(c *emitter.Client, m emitter.Message) {
- msg := &RequestMsg{}
- if err := json.Unmarshal(m.Payload(), msg); err != nil {
- log.Println(err)
-
- return
- }
- go f(msg)
- })
-}
diff --git a/bus/result.go b/bus/result.go
deleted file mode 100644
index 6a0e23e..0000000
--- a/bus/result.go
+++ /dev/null
@@ -1,34 +0,0 @@
-package bus
-
-import (
- "encoding/json"
- "log"
-
- emitter "github.com/emitter-io/go/v2"
-
- "github.com/SecurityBrewery/catalyst/generated/model"
-)
-
-const channelResult = "result"
-
-type ResultMsg struct {
- Automation string `json:"automation"`
- Data map[string]any `json:"data,omitempty"`
- Target *model.Origin `json:"target"`
-}
-
-func (b *Bus) PublishResult(automation string, data map[string]any, target *model.Origin) error {
- return b.jsonPublish(&ResultMsg{Automation: automation, Data: data, Target: target}, channelResult, b.config.resultBusKey)
-}
-
-func (b *Bus) SubscribeResult(f func(msg *ResultMsg)) error {
- return b.safeSubscribe(b.config.resultBusKey, channelResult, func(c *emitter.Client, m emitter.Message) {
- msg := &ResultMsg{}
- if err := json.Unmarshal(m.Payload(), msg); err != nil {
- log.Println(err)
-
- return
- }
- go f(msg)
- })
-}
diff --git a/busservice/busservice.go b/busservice/busservice.go
index b421c56..4fc0af0 100644
--- a/busservice/busservice.go
+++ b/busservice/busservice.go
@@ -20,20 +20,12 @@ type busService struct {
network string
}
-func New(apiURL, apikey, network string, catalystBus *bus.Bus, db *database.Database) error {
+func New(apiURL, apikey, network string, catalystBus *bus.Bus, db *database.Database) {
h := &busService{db: db, apiURL: apiURL, apiKey: apikey, network: network, catalystBus: catalystBus}
- if err := catalystBus.SubscribeRequest(h.logRequest); err != nil {
- return err
- }
- if err := catalystBus.SubscribeResult(h.handleResult); err != nil {
- return err
- }
- if err := catalystBus.SubscribeJob(h.handleJob); err != nil {
- return err
- }
-
- return nil
+ catalystBus.RequestChannel.Subscribe(h.logRequest)
+ catalystBus.ResultChannel.Subscribe(h.handleResult)
+ catalystBus.JobChannel.Subscribe(h.handleJob)
}
func busContext() context.Context {
@@ -47,7 +39,7 @@ func (h *busService) logRequest(msg *bus.RequestMsg) {
var logEntries []*model.LogEntry
for _, i := range msg.IDs {
logEntries = append(logEntries, &model.LogEntry{
- Type: bus.ChannelRequest,
+ Type: "request",
Reference: i.String(),
Creator: msg.User,
Message: msg.Function,
diff --git a/busservice/job.go b/busservice/job.go
index f15a835..b7661d5 100644
--- a/busservice/job.go
+++ b/busservice/job.go
@@ -82,9 +82,7 @@ func (h *busService) handleJob(automationMsg *bus.JobMsg) {
}
}
- if err := h.catalystBus.PublishResult(automationMsg.Automation, result, automationMsg.Origin); err != nil {
- log.Println(err)
- }
+ h.catalystBus.ResultChannel.Publish(&bus.ResultMsg{Automation: automationMsg.Automation, Data: result, Target: automationMsg.Origin})
if err := h.db.JobComplete(ctx, automationMsg.ID, result); err != nil {
log.Println(err)
diff --git a/cmd/cmd.go b/cmd/cmd.go
index 7082108..eed7532 100644
--- a/cmd/cmd.go
+++ b/cmd/cmd.go
@@ -8,7 +8,6 @@ import (
"golang.org/x/oauth2"
"github.com/SecurityBrewery/catalyst"
- "github.com/SecurityBrewery/catalyst/bus"
"github.com/SecurityBrewery/catalyst/database"
"github.com/SecurityBrewery/catalyst/role"
"github.com/SecurityBrewery/catalyst/storage"
@@ -41,9 +40,6 @@ type CLI struct {
S3User string `env:"S3_USER" default:"minio" name:"s3-user"`
S3Password string `env:"S3_PASSWORD" required:"" name:"s3-password"`
- EmitterIOHost string `env:"EMITTER_IO_HOST" default:"tcp://emitter:8080"`
- EmitterIORKey string `env:"EMITTER_IO_KEY" required:""`
-
InitialAPIKey string `env:"INITIAL_API_KEY"`
}
@@ -71,6 +67,7 @@ func MapConfig(cli CLI) (*catalyst.Config, error) {
Storage: &storage.Config{Host: cli.S3Host, User: cli.S3User, Password: cli.S3Password},
Secret: []byte(cli.Secret),
ExternalAddress: cli.ExternalAddress,
+ InternalAddress: cli.CatalystAddress,
Auth: &catalyst.AuthConfig{
OIDCIssuer: cli.OIDCIssuer,
OAuth2: &oauth2.Config{ClientID: cli.OIDCClientID, ClientSecret: cli.OIDCClientSecret, RedirectURL: cli.ExternalAddress + "/callback", Scopes: scopes},
@@ -81,7 +78,6 @@ func MapConfig(cli CLI) (*catalyst.Config, error) {
AuthDefaultRoles: roles,
AuthAdminUsers: cli.AuthAdminUsers,
},
- Bus: &bus.Config{Host: cli.EmitterIOHost, Key: cli.EmitterIORKey, APIUrl: cli.CatalystAddress + "/api"},
InitialAPIKey: cli.InitialAPIKey,
}
diff --git a/database/busdb/busdb.go b/database/busdb/busdb.go
index 3e8c988..df36e59 100644
--- a/database/busdb/busdb.go
+++ b/database/busdb/busdb.go
@@ -55,9 +55,7 @@ func (db *BusDatabase) Query(ctx context.Context, query string, vars map[string]
switch {
case operation.Type == bus.DatabaseEntryCreated, operation.Type == bus.DatabaseEntryUpdated:
- if err := db.bus.PublishDatabaseUpdate(operation.Ids, operation.Type); err != nil {
- return nil, nil, err
- }
+ db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{IDs: operation.Ids, Type: operation.Type})
}
return cur, logs, err
@@ -92,10 +90,7 @@ func (c *Collection[T]) CreateDocument(ctx, newctx context.Context, key string,
return meta, err
}
- err = c.db.bus.PublishDatabaseUpdate([]driver.DocumentID{meta.ID}, bus.DatabaseEntryCreated)
- if err != nil {
- return meta, err
- }
+ c.db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{IDs: []driver.DocumentID{meta.ID}, Type: bus.DatabaseEntryCreated})
return meta, nil
}
@@ -108,10 +103,7 @@ func (c *Collection[T]) CreateEdge(ctx, newctx context.Context, edge *driver.Edg
return meta, err
}
- err = c.db.bus.PublishDatabaseUpdate([]driver.DocumentID{meta.ID}, bus.DatabaseEntryCreated)
- if err != nil {
- return meta, err
- }
+ c.db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{IDs: []driver.DocumentID{meta.ID}, Type: bus.DatabaseEntryCreated})
return meta, nil
}
@@ -132,10 +124,7 @@ func (c *Collection[T]) CreateEdges(ctx context.Context, edges []*driver.EdgeDoc
ids = append(ids, meta.ID)
}
- err = c.db.bus.PublishDatabaseUpdate(ids, bus.DatabaseEntryCreated)
- if err != nil {
- return metas, err
- }
+ c.db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{IDs: ids, Type: bus.DatabaseEntryCreated})
return metas, nil
}
@@ -162,7 +151,9 @@ func (c *Collection[T]) UpdateDocument(ctx context.Context, key string, update a
return meta, err
}
- return meta, c.db.bus.PublishDatabaseUpdate([]driver.DocumentID{meta.ID}, bus.DatabaseEntryUpdated)
+ c.db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{IDs: []driver.DocumentID{meta.ID}, Type: bus.DatabaseEntryUpdated})
+
+ return meta, nil
}
func (c *Collection[T]) ReplaceDocument(ctx context.Context, key string, document *T) (meta driver.DocumentMeta, err error) {
@@ -173,7 +164,9 @@ func (c *Collection[T]) ReplaceDocument(ctx context.Context, key string, documen
return meta, err
}
- return meta, c.db.bus.PublishDatabaseUpdate([]driver.DocumentID{meta.ID}, bus.DatabaseEntryUpdated)
+ c.db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{IDs: []driver.DocumentID{meta.ID}, Type: bus.DatabaseEntryUpdated})
+
+ return meta, nil
}
func (c *Collection[T]) RemoveDocument(ctx context.Context, formatInt string) (meta driver.DocumentMeta, err error) {
diff --git a/database/busdb/log.go b/database/busdb/log.go
index f4f1327..0af9645 100644
--- a/database/busdb/log.go
+++ b/database/busdb/log.go
@@ -3,7 +3,6 @@ package busdb
import (
"context"
"errors"
- "log"
"strings"
"github.com/arangodb/go-driver"
@@ -46,12 +45,10 @@ func (db *BusDatabase) LogBatchCreate(ctx context.Context, logentries []*model.L
}
}
if ids != nil {
- go func() {
- err := db.bus.PublishDatabaseUpdate(ids, bus.DatabaseEntryCreated)
- if err != nil {
- log.Println(err)
- }
- }()
+ go db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{
+ IDs: ids,
+ Type: bus.DatabaseEntryCreated,
+ })
}
_, errs, err := db.logCollection.CreateDocuments(ctx, logentries)
diff --git a/database/job.go b/database/job.go
index f509105..7a1a880 100644
--- a/database/job.go
+++ b/database/job.go
@@ -186,11 +186,17 @@ func publishJobMapping(id, automation string, contextStructs *model.Context, ori
return fmt.Errorf("message generation failed: %w", err)
}
- return publishJob(id, automation, contextStructs, origin, msg, db)
-}
+ db.bus.JobChannel.Publish(&bus.JobMsg{
+ ID: id,
+ Automation: automation,
+ Origin: origin,
+ Message: &model.Message{
+ Context: contextStructs,
+ Payload: msg,
+ },
+ })
-func publishJob(id, automation string, contextStructs *model.Context, origin *model.Origin, payload map[string]any, db *Database) error {
- return db.bus.PublishJob(id, automation, payload, contextStructs, origin)
+ return nil
}
func generatePayload(msgMapping map[string]string, contextStructs *model.Context) (map[string]any, error) {
diff --git a/database/ticket.go b/database/ticket.go
index e0e75d6..45d2154 100644
--- a/database/ticket.go
+++ b/database/ticket.go
@@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
- "log"
"sort"
"strconv"
"strings"
@@ -252,11 +251,10 @@ func (db *Database) TicketBatchCreate(ctx context.Context, ticketForms []*model.
ids = append(ids, driver.NewDocumentID(TicketCollectionName, fmt.Sprint(apiTicket.ID)))
}
- go func() {
- if err := db.bus.PublishDatabaseUpdate(ids, bus.DatabaseEntryUpdated); err != nil {
- log.Println(err)
- }
- }()
+ db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{
+ IDs: ids,
+ Type: bus.DatabaseEntryCreated,
+ })
ticketResponses, err := toTicketResponses(apiTickets)
if err != nil {
diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml
index 12bfb42..f836720 100644
--- a/dev/docker-compose.yml
+++ b/dev/docker-compose.yml
@@ -13,13 +13,6 @@ services:
ARANGO_ROOT_PASSWORD: foobar
networks: [ catalyst ]
- emitter:
- image: emitter/server
- environment:
- - EMITTER_LICENSE=PfA8ID8izeSlDUlNZgNXo77DQV9QzlNtxTk64WreCXKfDZsREAVXUXwh20UKOZdkALbLTmOytO_iC6mc_twKAQ:3
- # A9RysEsPJni8RaHeg_K0FKXQNfBrUyw-
- networks: [ catalyst ]
-
minio:
image: minio/minio:RELEASE.2021-12-10T23-03-39Z
environment:
diff --git a/dev/nginx.conf b/dev/nginx.conf
index eb648cf..51d76c9 100644
--- a/dev/nginx.conf
+++ b/dev/nginx.conf
@@ -110,13 +110,3 @@ http {
}
}
}
-
-stream {
- server {
- listen 9001;
-
- resolver 127.0.0.11 valid=30s;
- set $upstream_emitter emitter;
- proxy_pass $upstream_emitter:8080;
- }
-}
diff --git a/file.go b/file.go
index 0b7cd91..1fcacb9 100644
--- a/file.go
+++ b/file.go
@@ -25,7 +25,7 @@ import (
"github.com/SecurityBrewery/catalyst/storage"
)
-func tusdUpload(db *database.Database, bus *bus.Bus, client *s3.S3, external string) http.HandlerFunc {
+func tusdUpload(db *database.Database, catalystBus *bus.Bus, client *s3.S3, external string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ticketID := chi.URLParam(r, "ticketID")
if ticketID == "" {
@@ -80,10 +80,11 @@ func tusdUpload(db *database.Database, bus *bus.Bus, client *s3.S3, external str
return
}
- err = bus.PublishRequest(userID, "LinkFiles", []driver.DocumentID{driver.DocumentID(fmt.Sprintf("tickets/%d", doc.ID))})
- if err != nil {
- log.Println(err)
- }
+ catalystBus.RequestChannel.Publish(&bus.RequestMsg{
+ User: userID,
+ Function: "LinkFiles",
+ IDs: []driver.DocumentID{driver.DocumentID(fmt.Sprintf("tickets/%d", doc.ID))},
+ })
}()
switch r.Method {
diff --git a/go.cap b/go.cap
index a671c85..ae1c73c 100644
--- a/go.cap
+++ b/go.cap
@@ -75,7 +75,6 @@ github.com/docker/docker/errdefs (network)
github.com/docker/go-connections/nat (network)
github.com/docker/go-connections/sockets (file, network, syscall)
github.com/docker/go-connections/tlsconfig (file)
-github.com/eclipse/paho.mqtt.golang (file, network, reflect)
github.com/go-chi/chi (network)
github.com/go-chi/chi/middleware (file, network)
github.com/go-chi/cors (file, network)
@@ -87,7 +86,6 @@ github.com/golang/protobuf/ptypes/any (reflect)
github.com/golang/protobuf/ptypes/duration (reflect)
github.com/golang/protobuf/ptypes/timestamp (reflect)
github.com/google/uuid (file, network)
-github.com/gorilla/websocket (file, network, unsafe)
github.com/imdario/mergo (reflect)
github.com/jmespath/go-jmespath (reflect)
github.com/sirupsen/logrus (file, reflect)
diff --git a/go.mod b/go.mod
index 4ae1323..969253d 100644
--- a/go.mod
+++ b/go.mod
@@ -11,7 +11,6 @@ require (
github.com/blevesearch/bleve/v2 v2.3.2
github.com/coreos/go-oidc/v3 v3.2.0
github.com/docker/docker v17.12.0-ce-rc1.0.20201201034508-7d75c1d40d88+incompatible
- github.com/emitter-io/go/v2 v2.0.9
github.com/go-chi/chi v1.5.4
github.com/go-chi/cors v1.2.1
github.com/gobwas/ws v1.1.0
@@ -56,14 +55,12 @@ require (
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.4.0 // indirect
- github.com/eclipse/paho.mqtt.golang v1.3.5 // indirect
github.com/gobwas/httphead v0.1.0 // indirect
github.com/gobwas/pool v0.2.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gorilla/mux v1.8.0 // indirect
- github.com/gorilla/websocket v1.4.2 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/morikuni/aec v1.0.0 // indirect
diff --git a/go.sum b/go.sum
index 29c5265..ab876a3 100644
--- a/go.sum
+++ b/go.sum
@@ -348,14 +348,9 @@ github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZ
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
-github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts=
-github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y=
-github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc=
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
-github.com/emitter-io/go/v2 v2.0.9 h1:qA+cnG3kS2uLzo5ETFY6zbHBGl+FmNj0cGf3da7foA4=
-github.com/emitter-io/go/v2 v2.0.9/go.mod h1:St++epE1u/6ueCVw47xhu4shpkGNxKRVtkWv4Xi33mg=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
@@ -515,7 +510,6 @@ github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
-github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
@@ -924,7 +918,6 @@ golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
-golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200505041828-1ed23360d12c/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
diff --git a/server.go b/server.go
index f7af421..b842812 100644
--- a/server.go
+++ b/server.go
@@ -28,11 +28,11 @@ type Config struct {
IndexPath string
DB *database.Config
Storage *storage.Config
- Bus *bus.Config
Secret []byte
Auth *AuthConfig
ExternalAddress string
+ InternalAddress string
InitialAPIKey string
Network string
}
@@ -65,20 +65,14 @@ func New(hooks *hooks.Hooks, config *Config) (*Server, error) {
return nil, err
}
- catalystBus, err := bus.New(config.Bus)
- if err != nil {
- return nil, err
- }
+ catalystBus := bus.New()
catalystDatabase, err := database.New(ctx, catalystIndex, catalystBus, hooks, config.DB)
if err != nil {
return nil, err
}
- err = busservice.New(config.Bus.APIUrl, config.InitialAPIKey, config.Network, catalystBus, catalystDatabase)
- if err != nil {
- return nil, err
- }
+ busservice.New(config.InternalAddress+"/api", config.InitialAPIKey, config.Network, catalystBus, catalystDatabase)
catalystService, err := service.New(catalystBus, catalystDatabase, catalystStorage, GetVersion())
if err != nil {
diff --git a/service/job.go b/service/job.go
index cafcb72..16fa862 100644
--- a/service/job.go
+++ b/service/job.go
@@ -7,6 +7,7 @@ import (
"github.com/arangodb/go-driver"
"github.com/google/uuid"
+ "github.com/SecurityBrewery/catalyst/bus"
"github.com/SecurityBrewery/catalyst/database"
"github.com/SecurityBrewery/catalyst/generated/model"
)
@@ -32,7 +33,15 @@ func (s *Service) RunJob(ctx context.Context, form *model.JobForm) (doc *model.J
newJobID := uuid.NewString()
defer s.publishRequest(ctx, err, "RunJob", jobID(newJobID))
- err = s.bus.PublishJob(newJobID, form.Automation, form.Payload, msgContext, form.Origin)
+ s.bus.JobChannel.Publish(&bus.JobMsg{
+ ID: newJobID,
+ Automation: form.Automation,
+ Origin: form.Origin,
+ Message: &model.Message{
+ Context: msgContext,
+ Payload: form.Payload,
+ },
+ })
return &model.JobResponse{
Automation: form.Automation,
@@ -40,7 +49,7 @@ func (s *Service) RunJob(ctx context.Context, form *model.JobForm) (doc *model.J
Origin: form.Origin,
Payload: form.Payload,
Status: "published",
- }, err
+ }, nil
}
func (s *Service) GetJob(ctx context.Context, id string) (*model.JobResponse, error) {
diff --git a/service/service.go b/service/service.go
index 327f3d0..bc7e82a 100644
--- a/service/service.go
+++ b/service/service.go
@@ -2,7 +2,6 @@ package service
import (
"context"
- "log"
"github.com/arangodb/go-driver"
@@ -34,10 +33,10 @@ func (s *Service) publishRequest(ctx context.Context, err error, function string
userID = user.ID
}
- go func() {
- if err := s.bus.PublishRequest(userID, function, ids); err != nil {
- log.Println(err)
- }
- }()
+ s.bus.RequestChannel.Publish(&bus.RequestMsg{
+ User: userID,
+ Function: function,
+ IDs: ids,
+ })
}
}
diff --git a/service/ticket.go b/service/ticket.go
index 58cdf54..9fa3ed7 100644
--- a/service/ticket.go
+++ b/service/ticket.go
@@ -9,6 +9,7 @@ import (
"github.com/arangodb/go-driver"
"github.com/google/uuid"
+ "github.com/SecurityBrewery/catalyst/bus"
"github.com/SecurityBrewery/catalyst/database"
"github.com/SecurityBrewery/catalyst/generated/api"
"github.com/SecurityBrewery/catalyst/generated/model"
@@ -135,7 +136,17 @@ func (s *Service) RunArtifact(ctx context.Context, id int64, name string, automa
jobID := uuid.NewString()
origin := &model.Origin{ArtifactOrigin: &model.ArtifactOrigin{TicketId: id, Artifact: name}}
- return s.bus.PublishJob(jobID, automation, map[string]string{"default": name}, &model.Context{Artifact: artifact}, origin)
+ s.bus.JobChannel.Publish(&bus.JobMsg{
+ ID: jobID,
+ Automation: automation,
+ Origin: origin,
+ Message: &model.Message{
+ Context: &model.Context{Artifact: artifact},
+ Payload: map[string]string{"default": name},
+ },
+ })
+
+ return nil
}
func (s *Service) AddComment(ctx context.Context, i int64, form *model.CommentForm) (doc *model.TicketWithTickets, err error) {
diff --git a/start_dev.sh b/start_dev.sh
index ae152c3..f99149a 100644
--- a/start_dev.sh
+++ b/start_dev.sh
@@ -7,8 +7,6 @@ export ARANGO_DB_HOST=http://localhost:8529
export ARANGO_DB_PASSWORD=foobar
export S3_HOST=http://localhost:9000
export S3_PASSWORD=minio123
-export EMITTER_IO_HOST=tcp://localhost:9001
-export EMITTER_IO_KEY=A9RysEsPJni8RaHeg_K0FKXQNfBrUyw-
export AUTH_BLOCK_NEW=false
export AUTH_DEFAULT_ROLES=analyst,admin
diff --git a/test/test.go b/test/test.go
index c6d4cd2..5ebe3bf 100644
--- a/test/test.go
+++ b/test/test.go
@@ -45,11 +45,6 @@ func Config(ctx context.Context) (*catalyst.Config, error) {
User: "minio",
Password: "minio123",
},
- Bus: &bus.Config{
- Host: "tcp://localhost:9001",
- Key: "A9RysEsPJni8RaHeg_K0FKXQNfBrUyw-",
- APIUrl: "http://localhost:8002/api",
- },
Secret: []byte("4ef5b29539b70233dd40c02a1799d25079595565e05a193b09da2c3e60ada1cd"),
Auth: &catalyst.AuthConfig{
OIDCIssuer: "http://localhost:9002/auth/realms/catalyst",
@@ -100,10 +95,7 @@ func Bus(t *testing.T) (context.Context, *catalyst.Config, *bus.Bus, error) {
t.Fatal(err)
}
- catalystBus, err := bus.New(config.Bus)
- if err != nil {
- t.Fatal(err)
- }
+ catalystBus := bus.New()
return ctx, config, catalystBus, err
}
diff --git a/ui/src/components/icons/ArangoIcon.vue b/ui/src/components/icons/ArangoIcon.vue
deleted file mode 100644
index e493148..0000000
--- a/ui/src/components/icons/ArangoIcon.vue
+++ /dev/null
@@ -1,34 +0,0 @@
-
-
-
-
-
-
-
diff --git a/ui/src/components/icons/EmitterIcon.vue b/ui/src/components/icons/EmitterIcon.vue
deleted file mode 100644
index b87c639..0000000
--- a/ui/src/components/icons/EmitterIcon.vue
+++ /dev/null
@@ -1,31 +0,0 @@
-
-
-
-
-
-
-
diff --git a/ui/src/components/icons/MinioIcon.vue b/ui/src/components/icons/MinioIcon.vue
deleted file mode 100644
index 197ea11..0000000
--- a/ui/src/components/icons/MinioIcon.vue
+++ /dev/null
@@ -1,22 +0,0 @@
-
-
-
-
-
-
-
diff --git a/ui/src/components/icons/NodeRedIcon.vue b/ui/src/components/icons/NodeRedIcon.vue
deleted file mode 100644
index 3457ea5..0000000
--- a/ui/src/components/icons/NodeRedIcon.vue
+++ /dev/null
@@ -1,43 +0,0 @@
-
-
-
-
-
-
-
diff --git a/ui/src/plugins/vuetify.ts b/ui/src/plugins/vuetify.ts
index e8a8003..8f6458c 100644
--- a/ui/src/plugins/vuetify.ts
+++ b/ui/src/plugins/vuetify.ts
@@ -1,10 +1,6 @@
import "@mdi/font/css/materialdesignicons.css";
import Vue from "vue";
import Vuetify from "vuetify/lib";
-import MinioIcon from "../components/icons/MinioIcon.vue";
-import NodeRedIcon from "../components/icons/NodeRedIcon.vue";
-import ArangoIcon from "../components/icons/ArangoIcon.vue";
-import EmitterIcon from "../components/icons/EmitterIcon.vue";
Vue.use(Vuetify);
@@ -73,18 +69,6 @@ export default new Vuetify({
},
icons: {
values: {
- minio: {
- component: MinioIcon,
- },
- nodered: {
- component: NodeRedIcon,
- },
- arango: {
- component: ArangoIcon,
- },
- emitter: {
- component: EmitterIcon,
- },
},
},
});
diff --git a/ui/src/views/embed/Emitter.vue b/ui/src/views/embed/Emitter.vue
deleted file mode 100644
index 8414679..0000000
--- a/ui/src/views/embed/Emitter.vue
+++ /dev/null
@@ -1,21 +0,0 @@
-
-
-
-
-
-
-
diff --git a/websocket.go b/websocket.go
index 0c7f83f..044cdc1 100644
--- a/websocket.go
+++ b/websocket.go
@@ -3,7 +3,6 @@ package catalyst
import (
"encoding/json"
"errors"
- "log"
"net/http"
"sync"
@@ -49,7 +48,7 @@ func handleWebSocket(catalystBus *bus.Bus) http.HandlerFunc {
broker := websocketBroker{clients: map[string]chan []byte{}}
// send all messages from bus to websocket
- err := catalystBus.SubscribeDatabaseUpdate(func(msg *bus.DatabaseUpdateMsg) {
+ catalystBus.DatabaseChannel.Subscribe(func(msg *bus.DatabaseUpdateMsg) {
b, err := json.Marshal(map[string]any{
"action": "update",
"ids": msg.IDs,
@@ -60,9 +59,6 @@ func handleWebSocket(catalystBus *bus.Bus) http.HandlerFunc {
broker.Publish(b)
})
- if err != nil {
- log.Println(err)
- }
return func(w http.ResponseWriter, r *http.Request) {
conn, _, _, err := ws.UpgradeHTTP(r, w)