From dfb501f8b92a139eb86a5bd2044303f20ad72add Mon Sep 17 00:00:00 2001 From: Jonas Plum Date: Sat, 14 May 2022 01:08:37 +0200 Subject: [PATCH] Remove emitter (#184) * Remove emitter --- bus/bus.go | 105 ++++++++++++------------ bus/databaseupdate.go | 43 ---------- bus/job.go | 43 ---------- bus/request.go | 37 --------- bus/result.go | 34 -------- busservice/busservice.go | 18 ++-- busservice/job.go | 4 +- cmd/cmd.go | 6 +- database/busdb/busdb.go | 27 +++--- database/busdb/log.go | 11 +-- database/job.go | 14 +++- database/ticket.go | 10 +-- dev/docker-compose.yml | 7 -- dev/nginx.conf | 10 --- file.go | 11 +-- go.cap | 2 - go.mod | 3 - go.sum | 7 -- server.go | 12 +-- service/job.go | 13 ++- service/service.go | 11 ++- service/ticket.go | 13 ++- start_dev.sh | 2 - test/test.go | 10 +-- ui/src/components/icons/ArangoIcon.vue | 34 -------- ui/src/components/icons/EmitterIcon.vue | 31 ------- ui/src/components/icons/MinioIcon.vue | 22 ----- ui/src/components/icons/NodeRedIcon.vue | 43 ---------- ui/src/plugins/vuetify.ts | 16 ---- ui/src/views/embed/Emitter.vue | 21 ----- websocket.go | 6 +- 31 files changed, 126 insertions(+), 500 deletions(-) delete mode 100644 bus/databaseupdate.go delete mode 100644 bus/job.go delete mode 100644 bus/request.go delete mode 100644 bus/result.go delete mode 100644 ui/src/components/icons/ArangoIcon.vue delete mode 100644 ui/src/components/icons/EmitterIcon.vue delete mode 100644 ui/src/components/icons/MinioIcon.vue delete mode 100644 ui/src/components/icons/NodeRedIcon.vue delete mode 100644 ui/src/views/embed/Emitter.vue diff --git a/bus/bus.go b/bus/bus.go index 259e9e7..ff707be 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -1,70 +1,69 @@ package bus import ( - "encoding/json" - "log" + "github.com/arangodb/go-driver" - emitter "github.com/emitter-io/go/v2" + "github.com/SecurityBrewery/catalyst/generated/model" ) +type ResultMsg struct { + Automation string `json:"automation"` + Data map[string]any `json:"data,omitempty"` + Target *model.Origin `json:"target"` +} + +type RequestMsg struct { + IDs []driver.DocumentID `json:"ids"` + Function string `json:"function"` + User string `json:"user"` +} + +type JobMsg struct { + ID string `json:"id"` + Automation string `json:"automation"` + Origin *model.Origin `json:"origin"` + Message *model.Message `json:"message"` +} + +type DatabaseUpdateType string + +const ( + DatabaseEntryRead DatabaseUpdateType = "read" + DatabaseEntryCreated DatabaseUpdateType = "created" + DatabaseEntryUpdated DatabaseUpdateType = "updated" +) + +type DatabaseUpdateMsg struct { + IDs []driver.DocumentID `json:"ids"` + Type DatabaseUpdateType `json:"type"` +} + type Bus struct { - config *Config - client *emitter.Client + ResultChannel *Channel[*ResultMsg] + RequestChannel *Channel[*RequestMsg] + JobChannel *Channel[*JobMsg] + DatabaseChannel *Channel[*DatabaseUpdateMsg] } -type Config struct { - Host string - Key string - databaseUpdateBusKey string - jobBusKey string - resultBusKey string - requestKey string - APIUrl string +func New() *Bus { + return &Bus{ + ResultChannel: &Channel[*ResultMsg]{}, + RequestChannel: &Channel[*RequestMsg]{}, + JobChannel: &Channel[*JobMsg]{}, + DatabaseChannel: &Channel[*DatabaseUpdateMsg]{}, + } } -func New(c *Config) (*Bus, error) { - client, err := emitter.Connect(c.Host, func(_ *emitter.Client, msg emitter.Message) { - log.Printf("received: '%s' topic: '%s'\n", msg.Payload(), msg.Topic()) - }) - if err != nil { - return nil, err - } - - c.databaseUpdateBusKey, err = client.GenerateKey(c.Key, channelDatabaseUpdate+"/", "rwls", 0) - if err != nil { - return nil, err - } - c.jobBusKey, err = client.GenerateKey(c.Key, channelJob+"/", "rwls", 0) - if err != nil { - return nil, err - } - c.resultBusKey, err = client.GenerateKey(c.Key, channelResult+"/", "rwls", 0) - if err != nil { - return nil, err - } - c.requestKey, err = client.GenerateKey(c.Key, ChannelRequest+"/", "rwls", 0) - if err != nil { - return nil, err - } - - return &Bus{config: c, client: client}, err +type Channel[T any] struct { + Subscriber []func(T) } -func (b *Bus) jsonPublish(msg any, channel, key string) error { - payload, err := json.Marshal(msg) - if err != nil { - return err +func (c *Channel[T]) Publish(msg T) { + for _, s := range c.Subscriber { + go s(msg) } - - return b.client.Publish(key, channel, payload) } -func (b *Bus) safeSubscribe(key, channel string, handler func(c *emitter.Client, m emitter.Message)) error { - defer func() { - if r := recover(); r != nil { - log.Printf("Recovered %s in channel %s\n", r, channel) - } - }() - - return b.client.Subscribe(key, channel, handler) +func (c *Channel[T]) Subscribe(handler func(T)) { + c.Subscriber = append(c.Subscriber, handler) } diff --git a/bus/databaseupdate.go b/bus/databaseupdate.go deleted file mode 100644 index b37cdbf..0000000 --- a/bus/databaseupdate.go +++ /dev/null @@ -1,43 +0,0 @@ -package bus - -import ( - "encoding/json" - "log" - - "github.com/arangodb/go-driver" - emitter "github.com/emitter-io/go/v2" -) - -const channelDatabaseUpdate = "databaseupdate" - -type DatabaseUpdateType string - -const ( - DatabaseEntryRead DatabaseUpdateType = "read" - DatabaseEntryCreated DatabaseUpdateType = "created" - DatabaseEntryUpdated DatabaseUpdateType = "updated" -) - -type DatabaseUpdateMsg struct { - IDs []driver.DocumentID `json:"ids"` - Type DatabaseUpdateType `json:"type"` -} - -func (b *Bus) PublishDatabaseUpdate(ids []driver.DocumentID, databaseUpdateType DatabaseUpdateType) error { - return b.jsonPublish(&DatabaseUpdateMsg{ - IDs: ids, - Type: databaseUpdateType, - }, channelDatabaseUpdate, b.config.databaseUpdateBusKey) -} - -func (b *Bus) SubscribeDatabaseUpdate(f func(msg *DatabaseUpdateMsg)) error { - return b.safeSubscribe(b.config.databaseUpdateBusKey, channelDatabaseUpdate, func(c *emitter.Client, m emitter.Message) { - var msg DatabaseUpdateMsg - if err := json.Unmarshal(m.Payload(), &msg); err != nil { - log.Println(err) - - return - } - go f(&msg) - }) -} diff --git a/bus/job.go b/bus/job.go deleted file mode 100644 index 8de5e35..0000000 --- a/bus/job.go +++ /dev/null @@ -1,43 +0,0 @@ -package bus - -import ( - "encoding/json" - "log" - - emitter "github.com/emitter-io/go/v2" - - "github.com/SecurityBrewery/catalyst/generated/model" -) - -const channelJob = "job" - -type JobMsg struct { - ID string `json:"id"` - Automation string `json:"automation"` - Origin *model.Origin `json:"origin"` - Message *model.Message `json:"message"` -} - -func (b *Bus) PublishJob(id, automation string, payload any, context *model.Context, origin *model.Origin) error { - return b.jsonPublish(&JobMsg{ - ID: id, - Automation: automation, - Origin: origin, - Message: &model.Message{ - Context: context, - Payload: payload, - }, - }, channelJob, b.config.jobBusKey) -} - -func (b *Bus) SubscribeJob(f func(msg *JobMsg)) error { - return b.safeSubscribe(b.config.jobBusKey, channelJob, func(c *emitter.Client, m emitter.Message) { - var msg JobMsg - if err := json.Unmarshal(m.Payload(), &msg); err != nil { - log.Println(err) - - return - } - go f(&msg) - }) -} diff --git a/bus/request.go b/bus/request.go deleted file mode 100644 index 04496fd..0000000 --- a/bus/request.go +++ /dev/null @@ -1,37 +0,0 @@ -package bus - -import ( - "encoding/json" - "log" - - "github.com/arangodb/go-driver" - emitter "github.com/emitter-io/go/v2" -) - -const ChannelRequest = "request" - -type RequestMsg struct { - IDs []driver.DocumentID `json:"ids"` - Function string `json:"function"` - User string `json:"user"` -} - -func (b *Bus) PublishRequest(user, f string, ids []driver.DocumentID) error { - return b.jsonPublish(&RequestMsg{ - User: user, - Function: f, - IDs: ids, - }, ChannelRequest, b.config.requestKey) -} - -func (b *Bus) SubscribeRequest(f func(msg *RequestMsg)) error { - return b.safeSubscribe(b.config.requestKey, ChannelRequest, func(c *emitter.Client, m emitter.Message) { - msg := &RequestMsg{} - if err := json.Unmarshal(m.Payload(), msg); err != nil { - log.Println(err) - - return - } - go f(msg) - }) -} diff --git a/bus/result.go b/bus/result.go deleted file mode 100644 index 6a0e23e..0000000 --- a/bus/result.go +++ /dev/null @@ -1,34 +0,0 @@ -package bus - -import ( - "encoding/json" - "log" - - emitter "github.com/emitter-io/go/v2" - - "github.com/SecurityBrewery/catalyst/generated/model" -) - -const channelResult = "result" - -type ResultMsg struct { - Automation string `json:"automation"` - Data map[string]any `json:"data,omitempty"` - Target *model.Origin `json:"target"` -} - -func (b *Bus) PublishResult(automation string, data map[string]any, target *model.Origin) error { - return b.jsonPublish(&ResultMsg{Automation: automation, Data: data, Target: target}, channelResult, b.config.resultBusKey) -} - -func (b *Bus) SubscribeResult(f func(msg *ResultMsg)) error { - return b.safeSubscribe(b.config.resultBusKey, channelResult, func(c *emitter.Client, m emitter.Message) { - msg := &ResultMsg{} - if err := json.Unmarshal(m.Payload(), msg); err != nil { - log.Println(err) - - return - } - go f(msg) - }) -} diff --git a/busservice/busservice.go b/busservice/busservice.go index b421c56..4fc0af0 100644 --- a/busservice/busservice.go +++ b/busservice/busservice.go @@ -20,20 +20,12 @@ type busService struct { network string } -func New(apiURL, apikey, network string, catalystBus *bus.Bus, db *database.Database) error { +func New(apiURL, apikey, network string, catalystBus *bus.Bus, db *database.Database) { h := &busService{db: db, apiURL: apiURL, apiKey: apikey, network: network, catalystBus: catalystBus} - if err := catalystBus.SubscribeRequest(h.logRequest); err != nil { - return err - } - if err := catalystBus.SubscribeResult(h.handleResult); err != nil { - return err - } - if err := catalystBus.SubscribeJob(h.handleJob); err != nil { - return err - } - - return nil + catalystBus.RequestChannel.Subscribe(h.logRequest) + catalystBus.ResultChannel.Subscribe(h.handleResult) + catalystBus.JobChannel.Subscribe(h.handleJob) } func busContext() context.Context { @@ -47,7 +39,7 @@ func (h *busService) logRequest(msg *bus.RequestMsg) { var logEntries []*model.LogEntry for _, i := range msg.IDs { logEntries = append(logEntries, &model.LogEntry{ - Type: bus.ChannelRequest, + Type: "request", Reference: i.String(), Creator: msg.User, Message: msg.Function, diff --git a/busservice/job.go b/busservice/job.go index f15a835..b7661d5 100644 --- a/busservice/job.go +++ b/busservice/job.go @@ -82,9 +82,7 @@ func (h *busService) handleJob(automationMsg *bus.JobMsg) { } } - if err := h.catalystBus.PublishResult(automationMsg.Automation, result, automationMsg.Origin); err != nil { - log.Println(err) - } + h.catalystBus.ResultChannel.Publish(&bus.ResultMsg{Automation: automationMsg.Automation, Data: result, Target: automationMsg.Origin}) if err := h.db.JobComplete(ctx, automationMsg.ID, result); err != nil { log.Println(err) diff --git a/cmd/cmd.go b/cmd/cmd.go index 7082108..eed7532 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -8,7 +8,6 @@ import ( "golang.org/x/oauth2" "github.com/SecurityBrewery/catalyst" - "github.com/SecurityBrewery/catalyst/bus" "github.com/SecurityBrewery/catalyst/database" "github.com/SecurityBrewery/catalyst/role" "github.com/SecurityBrewery/catalyst/storage" @@ -41,9 +40,6 @@ type CLI struct { S3User string `env:"S3_USER" default:"minio" name:"s3-user"` S3Password string `env:"S3_PASSWORD" required:"" name:"s3-password"` - EmitterIOHost string `env:"EMITTER_IO_HOST" default:"tcp://emitter:8080"` - EmitterIORKey string `env:"EMITTER_IO_KEY" required:""` - InitialAPIKey string `env:"INITIAL_API_KEY"` } @@ -71,6 +67,7 @@ func MapConfig(cli CLI) (*catalyst.Config, error) { Storage: &storage.Config{Host: cli.S3Host, User: cli.S3User, Password: cli.S3Password}, Secret: []byte(cli.Secret), ExternalAddress: cli.ExternalAddress, + InternalAddress: cli.CatalystAddress, Auth: &catalyst.AuthConfig{ OIDCIssuer: cli.OIDCIssuer, OAuth2: &oauth2.Config{ClientID: cli.OIDCClientID, ClientSecret: cli.OIDCClientSecret, RedirectURL: cli.ExternalAddress + "/callback", Scopes: scopes}, @@ -81,7 +78,6 @@ func MapConfig(cli CLI) (*catalyst.Config, error) { AuthDefaultRoles: roles, AuthAdminUsers: cli.AuthAdminUsers, }, - Bus: &bus.Config{Host: cli.EmitterIOHost, Key: cli.EmitterIORKey, APIUrl: cli.CatalystAddress + "/api"}, InitialAPIKey: cli.InitialAPIKey, } diff --git a/database/busdb/busdb.go b/database/busdb/busdb.go index 3e8c988..df36e59 100644 --- a/database/busdb/busdb.go +++ b/database/busdb/busdb.go @@ -55,9 +55,7 @@ func (db *BusDatabase) Query(ctx context.Context, query string, vars map[string] switch { case operation.Type == bus.DatabaseEntryCreated, operation.Type == bus.DatabaseEntryUpdated: - if err := db.bus.PublishDatabaseUpdate(operation.Ids, operation.Type); err != nil { - return nil, nil, err - } + db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{IDs: operation.Ids, Type: operation.Type}) } return cur, logs, err @@ -92,10 +90,7 @@ func (c *Collection[T]) CreateDocument(ctx, newctx context.Context, key string, return meta, err } - err = c.db.bus.PublishDatabaseUpdate([]driver.DocumentID{meta.ID}, bus.DatabaseEntryCreated) - if err != nil { - return meta, err - } + c.db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{IDs: []driver.DocumentID{meta.ID}, Type: bus.DatabaseEntryCreated}) return meta, nil } @@ -108,10 +103,7 @@ func (c *Collection[T]) CreateEdge(ctx, newctx context.Context, edge *driver.Edg return meta, err } - err = c.db.bus.PublishDatabaseUpdate([]driver.DocumentID{meta.ID}, bus.DatabaseEntryCreated) - if err != nil { - return meta, err - } + c.db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{IDs: []driver.DocumentID{meta.ID}, Type: bus.DatabaseEntryCreated}) return meta, nil } @@ -132,10 +124,7 @@ func (c *Collection[T]) CreateEdges(ctx context.Context, edges []*driver.EdgeDoc ids = append(ids, meta.ID) } - err = c.db.bus.PublishDatabaseUpdate(ids, bus.DatabaseEntryCreated) - if err != nil { - return metas, err - } + c.db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{IDs: ids, Type: bus.DatabaseEntryCreated}) return metas, nil } @@ -162,7 +151,9 @@ func (c *Collection[T]) UpdateDocument(ctx context.Context, key string, update a return meta, err } - return meta, c.db.bus.PublishDatabaseUpdate([]driver.DocumentID{meta.ID}, bus.DatabaseEntryUpdated) + c.db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{IDs: []driver.DocumentID{meta.ID}, Type: bus.DatabaseEntryUpdated}) + + return meta, nil } func (c *Collection[T]) ReplaceDocument(ctx context.Context, key string, document *T) (meta driver.DocumentMeta, err error) { @@ -173,7 +164,9 @@ func (c *Collection[T]) ReplaceDocument(ctx context.Context, key string, documen return meta, err } - return meta, c.db.bus.PublishDatabaseUpdate([]driver.DocumentID{meta.ID}, bus.DatabaseEntryUpdated) + c.db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{IDs: []driver.DocumentID{meta.ID}, Type: bus.DatabaseEntryUpdated}) + + return meta, nil } func (c *Collection[T]) RemoveDocument(ctx context.Context, formatInt string) (meta driver.DocumentMeta, err error) { diff --git a/database/busdb/log.go b/database/busdb/log.go index f4f1327..0af9645 100644 --- a/database/busdb/log.go +++ b/database/busdb/log.go @@ -3,7 +3,6 @@ package busdb import ( "context" "errors" - "log" "strings" "github.com/arangodb/go-driver" @@ -46,12 +45,10 @@ func (db *BusDatabase) LogBatchCreate(ctx context.Context, logentries []*model.L } } if ids != nil { - go func() { - err := db.bus.PublishDatabaseUpdate(ids, bus.DatabaseEntryCreated) - if err != nil { - log.Println(err) - } - }() + go db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{ + IDs: ids, + Type: bus.DatabaseEntryCreated, + }) } _, errs, err := db.logCollection.CreateDocuments(ctx, logentries) diff --git a/database/job.go b/database/job.go index f509105..7a1a880 100644 --- a/database/job.go +++ b/database/job.go @@ -186,11 +186,17 @@ func publishJobMapping(id, automation string, contextStructs *model.Context, ori return fmt.Errorf("message generation failed: %w", err) } - return publishJob(id, automation, contextStructs, origin, msg, db) -} + db.bus.JobChannel.Publish(&bus.JobMsg{ + ID: id, + Automation: automation, + Origin: origin, + Message: &model.Message{ + Context: contextStructs, + Payload: msg, + }, + }) -func publishJob(id, automation string, contextStructs *model.Context, origin *model.Origin, payload map[string]any, db *Database) error { - return db.bus.PublishJob(id, automation, payload, contextStructs, origin) + return nil } func generatePayload(msgMapping map[string]string, contextStructs *model.Context) (map[string]any, error) { diff --git a/database/ticket.go b/database/ticket.go index e0e75d6..45d2154 100644 --- a/database/ticket.go +++ b/database/ticket.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "log" "sort" "strconv" "strings" @@ -252,11 +251,10 @@ func (db *Database) TicketBatchCreate(ctx context.Context, ticketForms []*model. ids = append(ids, driver.NewDocumentID(TicketCollectionName, fmt.Sprint(apiTicket.ID))) } - go func() { - if err := db.bus.PublishDatabaseUpdate(ids, bus.DatabaseEntryUpdated); err != nil { - log.Println(err) - } - }() + db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{ + IDs: ids, + Type: bus.DatabaseEntryCreated, + }) ticketResponses, err := toTicketResponses(apiTickets) if err != nil { diff --git a/dev/docker-compose.yml b/dev/docker-compose.yml index 12bfb42..f836720 100644 --- a/dev/docker-compose.yml +++ b/dev/docker-compose.yml @@ -13,13 +13,6 @@ services: ARANGO_ROOT_PASSWORD: foobar networks: [ catalyst ] - emitter: - image: emitter/server - environment: - - EMITTER_LICENSE=PfA8ID8izeSlDUlNZgNXo77DQV9QzlNtxTk64WreCXKfDZsREAVXUXwh20UKOZdkALbLTmOytO_iC6mc_twKAQ:3 - # A9RysEsPJni8RaHeg_K0FKXQNfBrUyw- - networks: [ catalyst ] - minio: image: minio/minio:RELEASE.2021-12-10T23-03-39Z environment: diff --git a/dev/nginx.conf b/dev/nginx.conf index eb648cf..51d76c9 100644 --- a/dev/nginx.conf +++ b/dev/nginx.conf @@ -110,13 +110,3 @@ http { } } } - -stream { - server { - listen 9001; - - resolver 127.0.0.11 valid=30s; - set $upstream_emitter emitter; - proxy_pass $upstream_emitter:8080; - } -} diff --git a/file.go b/file.go index 0b7cd91..1fcacb9 100644 --- a/file.go +++ b/file.go @@ -25,7 +25,7 @@ import ( "github.com/SecurityBrewery/catalyst/storage" ) -func tusdUpload(db *database.Database, bus *bus.Bus, client *s3.S3, external string) http.HandlerFunc { +func tusdUpload(db *database.Database, catalystBus *bus.Bus, client *s3.S3, external string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { ticketID := chi.URLParam(r, "ticketID") if ticketID == "" { @@ -80,10 +80,11 @@ func tusdUpload(db *database.Database, bus *bus.Bus, client *s3.S3, external str return } - err = bus.PublishRequest(userID, "LinkFiles", []driver.DocumentID{driver.DocumentID(fmt.Sprintf("tickets/%d", doc.ID))}) - if err != nil { - log.Println(err) - } + catalystBus.RequestChannel.Publish(&bus.RequestMsg{ + User: userID, + Function: "LinkFiles", + IDs: []driver.DocumentID{driver.DocumentID(fmt.Sprintf("tickets/%d", doc.ID))}, + }) }() switch r.Method { diff --git a/go.cap b/go.cap index a671c85..ae1c73c 100644 --- a/go.cap +++ b/go.cap @@ -75,7 +75,6 @@ github.com/docker/docker/errdefs (network) github.com/docker/go-connections/nat (network) github.com/docker/go-connections/sockets (file, network, syscall) github.com/docker/go-connections/tlsconfig (file) -github.com/eclipse/paho.mqtt.golang (file, network, reflect) github.com/go-chi/chi (network) github.com/go-chi/chi/middleware (file, network) github.com/go-chi/cors (file, network) @@ -87,7 +86,6 @@ github.com/golang/protobuf/ptypes/any (reflect) github.com/golang/protobuf/ptypes/duration (reflect) github.com/golang/protobuf/ptypes/timestamp (reflect) github.com/google/uuid (file, network) -github.com/gorilla/websocket (file, network, unsafe) github.com/imdario/mergo (reflect) github.com/jmespath/go-jmespath (reflect) github.com/sirupsen/logrus (file, reflect) diff --git a/go.mod b/go.mod index 4ae1323..969253d 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,6 @@ require ( github.com/blevesearch/bleve/v2 v2.3.2 github.com/coreos/go-oidc/v3 v3.2.0 github.com/docker/docker v17.12.0-ce-rc1.0.20201201034508-7d75c1d40d88+incompatible - github.com/emitter-io/go/v2 v2.0.9 github.com/go-chi/chi v1.5.4 github.com/go-chi/cors v1.2.1 github.com/gobwas/ws v1.1.0 @@ -56,14 +55,12 @@ require ( github.com/docker/distribution v2.7.1+incompatible // indirect github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-units v0.4.0 // indirect - github.com/eclipse/paho.mqtt.golang v1.3.5 // indirect github.com/gobwas/httphead v0.1.0 // indirect github.com/gobwas/pool v0.2.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/gorilla/mux v1.8.0 // indirect - github.com/gorilla/websocket v1.4.2 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/kr/pretty v0.3.0 // indirect github.com/morikuni/aec v1.0.0 // indirect diff --git a/go.sum b/go.sum index 29c5265..ab876a3 100644 --- a/go.sum +++ b/go.sum @@ -348,14 +348,9 @@ github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZ github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= -github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y= -github.com/eclipse/paho.mqtt.golang v1.3.5/go.mod h1:eTzb4gxwwyWpqBUHGQZ4ABAV7+Jgm1PklsYT/eo8Hcc= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= -github.com/emitter-io/go/v2 v2.0.9 h1:qA+cnG3kS2uLzo5ETFY6zbHBGl+FmNj0cGf3da7foA4= -github.com/emitter-io/go/v2 v2.0.9/go.mod h1:St++epE1u/6ueCVw47xhu4shpkGNxKRVtkWv4Xi33mg= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -515,7 +510,6 @@ github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= -github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= @@ -924,7 +918,6 @@ golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200301022130-244492dfa37a/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= -golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200501053045-e0ff5e5a1de5/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200505041828-1ed23360d12c/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200506145744-7e3656a0809f/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= diff --git a/server.go b/server.go index f7af421..b842812 100644 --- a/server.go +++ b/server.go @@ -28,11 +28,11 @@ type Config struct { IndexPath string DB *database.Config Storage *storage.Config - Bus *bus.Config Secret []byte Auth *AuthConfig ExternalAddress string + InternalAddress string InitialAPIKey string Network string } @@ -65,20 +65,14 @@ func New(hooks *hooks.Hooks, config *Config) (*Server, error) { return nil, err } - catalystBus, err := bus.New(config.Bus) - if err != nil { - return nil, err - } + catalystBus := bus.New() catalystDatabase, err := database.New(ctx, catalystIndex, catalystBus, hooks, config.DB) if err != nil { return nil, err } - err = busservice.New(config.Bus.APIUrl, config.InitialAPIKey, config.Network, catalystBus, catalystDatabase) - if err != nil { - return nil, err - } + busservice.New(config.InternalAddress+"/api", config.InitialAPIKey, config.Network, catalystBus, catalystDatabase) catalystService, err := service.New(catalystBus, catalystDatabase, catalystStorage, GetVersion()) if err != nil { diff --git a/service/job.go b/service/job.go index cafcb72..16fa862 100644 --- a/service/job.go +++ b/service/job.go @@ -7,6 +7,7 @@ import ( "github.com/arangodb/go-driver" "github.com/google/uuid" + "github.com/SecurityBrewery/catalyst/bus" "github.com/SecurityBrewery/catalyst/database" "github.com/SecurityBrewery/catalyst/generated/model" ) @@ -32,7 +33,15 @@ func (s *Service) RunJob(ctx context.Context, form *model.JobForm) (doc *model.J newJobID := uuid.NewString() defer s.publishRequest(ctx, err, "RunJob", jobID(newJobID)) - err = s.bus.PublishJob(newJobID, form.Automation, form.Payload, msgContext, form.Origin) + s.bus.JobChannel.Publish(&bus.JobMsg{ + ID: newJobID, + Automation: form.Automation, + Origin: form.Origin, + Message: &model.Message{ + Context: msgContext, + Payload: form.Payload, + }, + }) return &model.JobResponse{ Automation: form.Automation, @@ -40,7 +49,7 @@ func (s *Service) RunJob(ctx context.Context, form *model.JobForm) (doc *model.J Origin: form.Origin, Payload: form.Payload, Status: "published", - }, err + }, nil } func (s *Service) GetJob(ctx context.Context, id string) (*model.JobResponse, error) { diff --git a/service/service.go b/service/service.go index 327f3d0..bc7e82a 100644 --- a/service/service.go +++ b/service/service.go @@ -2,7 +2,6 @@ package service import ( "context" - "log" "github.com/arangodb/go-driver" @@ -34,10 +33,10 @@ func (s *Service) publishRequest(ctx context.Context, err error, function string userID = user.ID } - go func() { - if err := s.bus.PublishRequest(userID, function, ids); err != nil { - log.Println(err) - } - }() + s.bus.RequestChannel.Publish(&bus.RequestMsg{ + User: userID, + Function: function, + IDs: ids, + }) } } diff --git a/service/ticket.go b/service/ticket.go index 58cdf54..9fa3ed7 100644 --- a/service/ticket.go +++ b/service/ticket.go @@ -9,6 +9,7 @@ import ( "github.com/arangodb/go-driver" "github.com/google/uuid" + "github.com/SecurityBrewery/catalyst/bus" "github.com/SecurityBrewery/catalyst/database" "github.com/SecurityBrewery/catalyst/generated/api" "github.com/SecurityBrewery/catalyst/generated/model" @@ -135,7 +136,17 @@ func (s *Service) RunArtifact(ctx context.Context, id int64, name string, automa jobID := uuid.NewString() origin := &model.Origin{ArtifactOrigin: &model.ArtifactOrigin{TicketId: id, Artifact: name}} - return s.bus.PublishJob(jobID, automation, map[string]string{"default": name}, &model.Context{Artifact: artifact}, origin) + s.bus.JobChannel.Publish(&bus.JobMsg{ + ID: jobID, + Automation: automation, + Origin: origin, + Message: &model.Message{ + Context: &model.Context{Artifact: artifact}, + Payload: map[string]string{"default": name}, + }, + }) + + return nil } func (s *Service) AddComment(ctx context.Context, i int64, form *model.CommentForm) (doc *model.TicketWithTickets, err error) { diff --git a/start_dev.sh b/start_dev.sh index ae152c3..f99149a 100644 --- a/start_dev.sh +++ b/start_dev.sh @@ -7,8 +7,6 @@ export ARANGO_DB_HOST=http://localhost:8529 export ARANGO_DB_PASSWORD=foobar export S3_HOST=http://localhost:9000 export S3_PASSWORD=minio123 -export EMITTER_IO_HOST=tcp://localhost:9001 -export EMITTER_IO_KEY=A9RysEsPJni8RaHeg_K0FKXQNfBrUyw- export AUTH_BLOCK_NEW=false export AUTH_DEFAULT_ROLES=analyst,admin diff --git a/test/test.go b/test/test.go index c6d4cd2..5ebe3bf 100644 --- a/test/test.go +++ b/test/test.go @@ -45,11 +45,6 @@ func Config(ctx context.Context) (*catalyst.Config, error) { User: "minio", Password: "minio123", }, - Bus: &bus.Config{ - Host: "tcp://localhost:9001", - Key: "A9RysEsPJni8RaHeg_K0FKXQNfBrUyw-", - APIUrl: "http://localhost:8002/api", - }, Secret: []byte("4ef5b29539b70233dd40c02a1799d25079595565e05a193b09da2c3e60ada1cd"), Auth: &catalyst.AuthConfig{ OIDCIssuer: "http://localhost:9002/auth/realms/catalyst", @@ -100,10 +95,7 @@ func Bus(t *testing.T) (context.Context, *catalyst.Config, *bus.Bus, error) { t.Fatal(err) } - catalystBus, err := bus.New(config.Bus) - if err != nil { - t.Fatal(err) - } + catalystBus := bus.New() return ctx, config, catalystBus, err } diff --git a/ui/src/components/icons/ArangoIcon.vue b/ui/src/components/icons/ArangoIcon.vue deleted file mode 100644 index e493148..0000000 --- a/ui/src/components/icons/ArangoIcon.vue +++ /dev/null @@ -1,34 +0,0 @@ - - - - - diff --git a/ui/src/components/icons/EmitterIcon.vue b/ui/src/components/icons/EmitterIcon.vue deleted file mode 100644 index b87c639..0000000 --- a/ui/src/components/icons/EmitterIcon.vue +++ /dev/null @@ -1,31 +0,0 @@ - - - - - diff --git a/ui/src/components/icons/MinioIcon.vue b/ui/src/components/icons/MinioIcon.vue deleted file mode 100644 index 197ea11..0000000 --- a/ui/src/components/icons/MinioIcon.vue +++ /dev/null @@ -1,22 +0,0 @@ - - - - - diff --git a/ui/src/components/icons/NodeRedIcon.vue b/ui/src/components/icons/NodeRedIcon.vue deleted file mode 100644 index 3457ea5..0000000 --- a/ui/src/components/icons/NodeRedIcon.vue +++ /dev/null @@ -1,43 +0,0 @@ - - - - - diff --git a/ui/src/plugins/vuetify.ts b/ui/src/plugins/vuetify.ts index e8a8003..8f6458c 100644 --- a/ui/src/plugins/vuetify.ts +++ b/ui/src/plugins/vuetify.ts @@ -1,10 +1,6 @@ import "@mdi/font/css/materialdesignicons.css"; import Vue from "vue"; import Vuetify from "vuetify/lib"; -import MinioIcon from "../components/icons/MinioIcon.vue"; -import NodeRedIcon from "../components/icons/NodeRedIcon.vue"; -import ArangoIcon from "../components/icons/ArangoIcon.vue"; -import EmitterIcon from "../components/icons/EmitterIcon.vue"; Vue.use(Vuetify); @@ -73,18 +69,6 @@ export default new Vuetify({ }, icons: { values: { - minio: { - component: MinioIcon, - }, - nodered: { - component: NodeRedIcon, - }, - arango: { - component: ArangoIcon, - }, - emitter: { - component: EmitterIcon, - }, }, }, }); diff --git a/ui/src/views/embed/Emitter.vue b/ui/src/views/embed/Emitter.vue deleted file mode 100644 index 8414679..0000000 --- a/ui/src/views/embed/Emitter.vue +++ /dev/null @@ -1,21 +0,0 @@ - - - - - diff --git a/websocket.go b/websocket.go index 0c7f83f..044cdc1 100644 --- a/websocket.go +++ b/websocket.go @@ -3,7 +3,6 @@ package catalyst import ( "encoding/json" "errors" - "log" "net/http" "sync" @@ -49,7 +48,7 @@ func handleWebSocket(catalystBus *bus.Bus) http.HandlerFunc { broker := websocketBroker{clients: map[string]chan []byte{}} // send all messages from bus to websocket - err := catalystBus.SubscribeDatabaseUpdate(func(msg *bus.DatabaseUpdateMsg) { + catalystBus.DatabaseChannel.Subscribe(func(msg *bus.DatabaseUpdateMsg) { b, err := json.Marshal(map[string]any{ "action": "update", "ids": msg.IDs, @@ -60,9 +59,6 @@ func handleWebSocket(catalystBus *bus.Bus) http.HandlerFunc { broker.Publish(b) }) - if err != nil { - log.Println(err) - } return func(w http.ResponseWriter, r *http.Request) { conn, _, _, err := ws.UpgradeHTTP(r, w)