Remove emitter (#184)

* Remove emitter
This commit is contained in:
Jonas Plum
2022-05-14 01:08:37 +02:00
committed by GitHub
parent 894e607efb
commit dfb501f8b9
31 changed files with 126 additions and 500 deletions

View File

@@ -55,9 +55,7 @@ func (db *BusDatabase) Query(ctx context.Context, query string, vars map[string]
switch {
case operation.Type == bus.DatabaseEntryCreated, operation.Type == bus.DatabaseEntryUpdated:
if err := db.bus.PublishDatabaseUpdate(operation.Ids, operation.Type); err != nil {
return nil, nil, err
}
db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{IDs: operation.Ids, Type: operation.Type})
}
return cur, logs, err
@@ -92,10 +90,7 @@ func (c *Collection[T]) CreateDocument(ctx, newctx context.Context, key string,
return meta, err
}
err = c.db.bus.PublishDatabaseUpdate([]driver.DocumentID{meta.ID}, bus.DatabaseEntryCreated)
if err != nil {
return meta, err
}
c.db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{IDs: []driver.DocumentID{meta.ID}, Type: bus.DatabaseEntryCreated})
return meta, nil
}
@@ -108,10 +103,7 @@ func (c *Collection[T]) CreateEdge(ctx, newctx context.Context, edge *driver.Edg
return meta, err
}
err = c.db.bus.PublishDatabaseUpdate([]driver.DocumentID{meta.ID}, bus.DatabaseEntryCreated)
if err != nil {
return meta, err
}
c.db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{IDs: []driver.DocumentID{meta.ID}, Type: bus.DatabaseEntryCreated})
return meta, nil
}
@@ -132,10 +124,7 @@ func (c *Collection[T]) CreateEdges(ctx context.Context, edges []*driver.EdgeDoc
ids = append(ids, meta.ID)
}
err = c.db.bus.PublishDatabaseUpdate(ids, bus.DatabaseEntryCreated)
if err != nil {
return metas, err
}
c.db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{IDs: ids, Type: bus.DatabaseEntryCreated})
return metas, nil
}
@@ -162,7 +151,9 @@ func (c *Collection[T]) UpdateDocument(ctx context.Context, key string, update a
return meta, err
}
return meta, c.db.bus.PublishDatabaseUpdate([]driver.DocumentID{meta.ID}, bus.DatabaseEntryUpdated)
c.db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{IDs: []driver.DocumentID{meta.ID}, Type: bus.DatabaseEntryUpdated})
return meta, nil
}
func (c *Collection[T]) ReplaceDocument(ctx context.Context, key string, document *T) (meta driver.DocumentMeta, err error) {
@@ -173,7 +164,9 @@ func (c *Collection[T]) ReplaceDocument(ctx context.Context, key string, documen
return meta, err
}
return meta, c.db.bus.PublishDatabaseUpdate([]driver.DocumentID{meta.ID}, bus.DatabaseEntryUpdated)
c.db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{IDs: []driver.DocumentID{meta.ID}, Type: bus.DatabaseEntryUpdated})
return meta, nil
}
func (c *Collection[T]) RemoveDocument(ctx context.Context, formatInt string) (meta driver.DocumentMeta, err error) {

View File

@@ -3,7 +3,6 @@ package busdb
import (
"context"
"errors"
"log"
"strings"
"github.com/arangodb/go-driver"
@@ -46,12 +45,10 @@ func (db *BusDatabase) LogBatchCreate(ctx context.Context, logentries []*model.L
}
}
if ids != nil {
go func() {
err := db.bus.PublishDatabaseUpdate(ids, bus.DatabaseEntryCreated)
if err != nil {
log.Println(err)
}
}()
go db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{
IDs: ids,
Type: bus.DatabaseEntryCreated,
})
}
_, errs, err := db.logCollection.CreateDocuments(ctx, logentries)

View File

@@ -186,11 +186,17 @@ func publishJobMapping(id, automation string, contextStructs *model.Context, ori
return fmt.Errorf("message generation failed: %w", err)
}
return publishJob(id, automation, contextStructs, origin, msg, db)
}
db.bus.JobChannel.Publish(&bus.JobMsg{
ID: id,
Automation: automation,
Origin: origin,
Message: &model.Message{
Context: contextStructs,
Payload: msg,
},
})
func publishJob(id, automation string, contextStructs *model.Context, origin *model.Origin, payload map[string]any, db *Database) error {
return db.bus.PublishJob(id, automation, payload, contextStructs, origin)
return nil
}
func generatePayload(msgMapping map[string]string, contextStructs *model.Context) (map[string]any, error) {

View File

@@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"sort"
"strconv"
"strings"
@@ -252,11 +251,10 @@ func (db *Database) TicketBatchCreate(ctx context.Context, ticketForms []*model.
ids = append(ids, driver.NewDocumentID(TicketCollectionName, fmt.Sprint(apiTicket.ID)))
}
go func() {
if err := db.bus.PublishDatabaseUpdate(ids, bus.DatabaseEntryUpdated); err != nil {
log.Println(err)
}
}()
db.bus.DatabaseChannel.Publish(&bus.DatabaseUpdateMsg{
IDs: ids,
Type: bus.DatabaseEntryCreated,
})
ticketResponses, err := toTicketResponses(apiTickets)
if err != nil {