diff --git a/cmd/main.go b/cmd/main.go index ec19d50a7..95098ec0e 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -229,7 +229,7 @@ func main() { // Initialize the default SMTP (`email`) messenger. for _, m := range initSMTPMessengers() { - app.messengers[m.Name()] = m + app.messengers[m.UUID()] = m if m.IsDefault() { app.defaultMessenger = m } @@ -244,7 +244,7 @@ func main() { // Initialize any additional postback messengers. for _, m := range initPostbackMessengers() { - app.messengers[m.Name()] = m + app.messengers[m.UUID()] = m } // Attach all messengers to the campaign manager. diff --git a/cmd/notifications.go b/cmd/notifications.go index 7a0988472..3d3d8ba09 100644 --- a/cmd/notifications.go +++ b/cmd/notifications.go @@ -46,7 +46,7 @@ func (app *App) sendNotification(toEmails []string, subject, tplName string, dat m.To = toEmails m.Subject = subject m.Body = body - m.Messenger = app.defaultMessenger.Name() + m.Messenger = app.defaultMessenger.UUID() if err := app.manager.PushMessage(m); err != nil { app.log.Printf("error sending admin notification (%s): %v", subject, err) return err diff --git a/cmd/tx.go b/cmd/tx.go index 2a2276ee4..6ddb7bfa0 100644 --- a/cmd/tx.go +++ b/cmd/tx.go @@ -120,13 +120,21 @@ func handleSendTxMessage(c echo.Context) error { app.i18n.Ts("globals.messages.errorFetching", "name")) } + var muid string + for _, v := range app.messengers { + if m.Messenger == v.Name() || m.Messenger == v.UUID() { + muid = v.UUID() + break + } + } + // Prepare the final message. msg := models.Message{} msg.Subscriber = sub msg.To = []string{sub.Email} msg.Subject = m.Subject msg.ContentType = m.ContentType - msg.Messenger = m.Messenger + msg.Messenger = muid msg.Body = m.Body for _, a := range m.Attachments { msg.Attachments = append(msg.Attachments, models.Attachment{ diff --git a/frontend/src/views/Campaign.vue b/frontend/src/views/Campaign.vue index d7634c92a..4094cff0d 100644 --- a/frontend/src/views/Campaign.vue +++ b/frontend/src/views/Campaign.vue @@ -106,10 +106,10 @@
- {{ item.name }} + {{ item.name }}
- +
@@ -469,13 +469,13 @@ export default Vue.extend({ this.form.archiveMetaStr = this.$utils.getPref('campaign.archiveMetaStr') || JSON.stringify(JSON.parse(archiveStr), null, 4); }, selectedMessengers() { - const filtered = this.messengers.filter((item) => this.selectedStates[item.name]); + const filtered = this.messengers.filter((item) => this.selectedStates[item.uuid]); return filtered.map((item) => { const mrow = { uuid: item.uuid, name: item.name, - weight: parseInt(this.itemValues[item.name], 10) || 1, + weight: parseInt(this.itemValues[item.uuid], 10) || 1, }; return mrow; @@ -549,8 +549,8 @@ export default Vue.extend({ console.log('init', JSON.stringify(this.selectedStates), JSON.stringify(this.itemValues)); JSON.parse(data.messenger).forEach((m) => { - this.selectedStates[m.name] = true; - this.itemValues[m.name] = m.weight; + this.selectedStates[m.uuid] = true; + this.itemValues[m.uuid] = m.weight; }); console.log('updated', JSON.stringify(this.selectedStates), JSON.stringify(this.itemValues)); diff --git a/go.mod b/go.mod index e88ebaf50..7701e15da 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,6 @@ require ( github.com/knadh/stuffbin v1.1.0 github.com/labstack/echo/v4 v4.11.4 github.com/lib/pq v1.10.9 - github.com/mroth/weightedrand/v2 v2.1.0 github.com/paulbellamy/ratecounter v0.2.0 github.com/rhnvrm/simples3 v0.8.3 github.com/spf13/pflag v1.0.5 diff --git a/go.sum b/go.sum index 5b4296fd6..3a1facc2a 100644 --- a/go.sum +++ b/go.sum @@ -108,8 +108,6 @@ github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ= github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= -github.com/mroth/weightedrand/v2 v2.1.0 h1:o1ascnB1CIVzsqlfArQQjeMy1U0NcIbBO5rfd5E/OeU= -github.com/mroth/weightedrand/v2 v2.1.0/go.mod h1:f2faGsfOGOwc1p94wzHKKZyTpcJUW7OJ/9U4yfiNAOU= github.com/paulbellamy/ratecounter v0.2.0 h1:2L/RhJq+HA8gBQImDXtLPrDXK5qAj6ozWVK/zFXVJGs= github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= diff --git a/internal/balancer/balancer.go b/internal/balancer/balancer.go new file mode 100644 index 000000000..c316d2503 --- /dev/null +++ b/internal/balancer/balancer.go @@ -0,0 +1,104 @@ +package balancer + +// Code is from https://github.com/mr-karan/balance +// Updated for our usecase + +import ( + "errors" + "sync" +) + +// ErrDuplicateID error is thrown when attempt to add an ID +// which is already added to the balancer. +var ErrDuplicateID = errors.New("entry already added") + +// Item represents the item in the list. +type item struct { + // id is the id of the item. + id string + // weight is the weight of the item that is given by the user. + weight int + // current is the current weight of the item. + current int +} + +// Balance represents a smooth weighted round-robin load balancer. +type Balance struct { + // items is the list of items to balance + items []*item + // next is the index of the next item to use. + next *item + + mutex *sync.RWMutex + + ids []string +} + +// NewBalance creates a new load balancer. +func NewBalance() *Balance { + return &Balance{ + items: make([]*item, 0), + mutex: &sync.RWMutex{}, + } +} + +// Allow chaining +func (b *Balance) Add(id string, weight int) *Balance { + b.mutex.Lock() + defer b.mutex.Unlock() + for _, v := range b.items { + if v.id == id { + return b + } + } + + b.items = append(b.items, &item{ + id: id, + weight: weight, + current: 0, + }) + + b.ids = append(b.ids, id) + + return b +} + +func (b *Balance) All() []string { + return b.ids +} + +func (b *Balance) Get() string { + b.mutex.Lock() + defer b.mutex.Unlock() + + if len(b.items) == 0 { + return "" + } + + if len(b.items) == 1 { + return b.items[0].id + } + + // Total weight of all items. + var total int + + // Loop through the list of items and add the item's weight to the current weight. + // Also increment the total weight counter. + var max *item + for _, item := range b.items { + item.current += item.weight + total += item.weight + + // Select the item with max weight. + if max == nil || item.current > max.current { + max = item + } + } + + // Select the item with the max weight. + b.next = max + // Reduce the current weight of the selected item by the total weight. + max.current -= total + + return max.id +} diff --git a/internal/manager/manager.go b/internal/manager/manager.go index c08f2a9d7..eb8fbdacb 100644 --- a/internal/manager/manager.go +++ b/internal/manager/manager.go @@ -12,9 +12,9 @@ import ( "time" "github.com/Masterminds/sprig/v3" + "github.com/knadh/listmonk/internal/balancer" "github.com/knadh/listmonk/internal/i18n" "github.com/knadh/listmonk/models" - "github.com/mroth/weightedrand/v2" "golang.org/x/text/cases" "golang.org/x/text/language" ) @@ -179,7 +179,7 @@ func New(cfg Config, store Store, notifCB models.AdminNotifCallback, i *i18n.I18 // AddMessenger adds a Messenger messaging backend to the manager. func (m *Manager) AddMessenger(msg Messenger) error { - id := msg.Name() + id := msg.UUID() if _, ok := m.messengers[id]; ok { return fmt.Errorf("messenger '%s' is already loaded", id) } @@ -203,34 +203,28 @@ func (m *Manager) PushMessage(msg models.Message) error { } func (m *Manager) parseCampMessengers(c *models.Campaign) ( - []string, *weightedrand.Chooser[string, int], error, + *balancer.Balance, error, ) { - choices := make([]weightedrand.Choice[string, int], 0) + weightedMessengers := make([]*models.CampaignMessenger, 0) - messengers := make([]string, 0) if e := json.Unmarshal([]byte(c.Messenger), &weightedMessengers); e != nil { m.store.UpdateCampaignStatus(c.ID, models.CampaignStatusCancelled) - return nil, nil, fmt.Errorf("unknown messenger %s on campaign %s", c.Messenger, c.Name) + return nil, fmt.Errorf("unknown messenger %s on campaign %s", c.Messenger, c.Name) } + balancer := balancer.NewBalance() + for _, wmessenger := range weightedMessengers { - if _, ok := m.messengers[wmessenger.Name]; !ok { + if _, ok := m.messengers[wmessenger.UUID]; !ok { m.store.UpdateCampaignStatus(c.ID, models.CampaignStatusCancelled) - return nil, nil, fmt.Errorf("unknown messenger %s on campaign %s", wmessenger.Name, c.Name) + return nil, fmt.Errorf("unknown messenger %s on campaign %s", wmessenger.Name, c.Name) } - choices = append(choices, weightedrand.Choice[string, int]{ - Item: wmessenger.Name, - Weight: wmessenger.Weight, - }) - - messengers = append(messengers, wmessenger.Name) + balancer.Add(wmessenger.UUID, wmessenger.Weight) } - chooser, _ := weightedrand.NewChooser(choices...) - - return messengers, chooser, nil + return balancer, nil } // PushCampaignMessage pushes a campaign messages into a queue to be sent out by the workers. @@ -241,17 +235,17 @@ func (m *Manager) PushCampaignMessage(msg CampaignMessage) error { return err } - messengers, chooser, err := m.parseCampMessengers(msg.Campaign) + balancer, err := m.parseCampMessengers(msg.Campaign) if err != nil { return err } - if msg.Campaign.TrafficType == "split" { - msg.messenger = chooser.Pick() + if msg.Campaign.TrafficType == models.CampaignTrafficTypeSplit { + msg.messenger = balancer.Get() m.campMsgQ <- msg } else { - for _, msgnr := range messengers { + for _, msgnr := range balancer.All() { msg.messenger = msgnr m.campMsgQ <- msg } @@ -487,7 +481,7 @@ func (m *Manager) scanCampaigns(tick time.Duration) { func (m *Manager) QueueForSubAndList(subIDs, listIDs []int) { campaigns, e := m.store.GetCampaignsForListsRunType(listIDs, "event:sub") if e != nil { - fmt.Println(e) + m.log.Printf("error QueueForSubAndList: event:sub (%v): %v", listIDs, e) return } diff --git a/internal/manager/pipe.go b/internal/manager/pipe.go index 895cb57be..d81f345b6 100644 --- a/internal/manager/pipe.go +++ b/internal/manager/pipe.go @@ -6,8 +6,8 @@ import ( "sync/atomic" "time" + "github.com/knadh/listmonk/internal/balancer" "github.com/knadh/listmonk/models" - "github.com/mroth/weightedrand/v2" "github.com/paulbellamy/ratecounter" ) @@ -25,8 +25,7 @@ type pipe struct { slidingStart time.Time SlidingWindowDuration time.Duration slidingCount int - chooser *weightedrand.Chooser[string, int] - messengers []string + balancer *balancer.Balance totalMessengers int } @@ -44,7 +43,7 @@ func (m *Manager) newPipe(c *models.Campaign) (*pipe, error) { dur, _ := time.ParseDuration(c.SlidingWindowDuration) - messengers, chooser, err := m.parseCampMessengers(c) + balancer, err := m.parseCampMessengers(c) if err != nil { return nil, err @@ -59,9 +58,8 @@ func (m *Manager) newPipe(c *models.Campaign) (*pipe, error) { slidingStart: time.Now(), SlidingWindowDuration: dur, slidingCount: 0, - chooser: chooser, - messengers: messengers, - totalMessengers: len(messengers), + balancer: balancer, + totalMessengers: len(balancer.All()), } // Increment the waitgroup so that Wait() blocks immediately. This is necessary @@ -128,14 +126,14 @@ func (p *pipe) NextSubscribers(result chan *NextSubResult) { // Push the message to the queue while blocking and waiting until // the queue is drained. - if p.camp.TrafficType == "split" { + if p.camp.TrafficType == models.CampaignTrafficTypeSplit { // decide messanger and queue message - msg.messenger = p.chooser.Pick() + msg.messenger = p.balancer.Get() msg.hasMore = false p.m.campMsgQ <- msg } else { // duplicate type, send to all messengers - for idx, messenger := range p.messengers { + for idx, messenger := range p.balancer.All() { msg.messenger = messenger msg.hasMore = (idx < (p.totalMessengers - 1)) p.m.campMsgQ <- msg diff --git a/internal/messenger/email/email.go b/internal/messenger/email/email.go index 9172b5bc2..7048e375d 100644 --- a/internal/messenger/email/email.go +++ b/internal/messenger/email/email.go @@ -8,9 +8,9 @@ import ( "strconv" "strings" + "github.com/knadh/listmonk/internal/balancer" "github.com/knadh/listmonk/models" "github.com/knadh/smtppool" - "github.com/mroth/weightedrand/v2" ) const ( @@ -19,7 +19,7 @@ const ( hdrCc = "Cc" ) -func (s Server) makeChooser() *weightedrand.Chooser[string, int] { +func (s Server) makeBalancer() *balancer.Balance { parts := strings.Split(s.WFrom, ",") var lastKey string @@ -44,17 +44,13 @@ func (s Server) makeChooser() *weightedrand.Chooser[string, int] { lastKey = spart } - choices := make([]weightedrand.Choice[string, int], 0) + balancer := balancer.NewBalance() + for k, v := range choiceVal { - choices = append(choices, weightedrand.Choice[string, int]{ - Item: k, - Weight: v, - }) + balancer.Add(k, v) } - chooser, _ := weightedrand.NewChooser(choices...) - - return chooser + return balancer } // Server represents an SMTP server's credentials. @@ -79,8 +75,8 @@ type Server struct { // Emailer is the SMTP e-mail messenger. type Emailer struct { - server *Server - chooser *weightedrand.Chooser[string, int] + server *Server + balancer *balancer.Balance } // New returns an SMTP e-mail Messenger backend with the given SMTP servers. @@ -120,8 +116,8 @@ func New(s Server) (*Emailer, error) { s.pool = pool e := &Emailer{ - server: &s, - chooser: s.makeChooser(), + server: &s, + balancer: s.makeBalancer(), } return e, nil @@ -162,7 +158,7 @@ func (e *Emailer) Push(m models.Message) error { } em := smtppool.Email{ - From: e.chooser.Pick(), + From: e.balancer.Get(), To: m.To, Subject: m.Subject, Attachments: files, diff --git a/internal/messenger/postback/postback.go b/internal/messenger/postback/postback.go index cb6b7c531..76246c23b 100644 --- a/internal/messenger/postback/postback.go +++ b/internal/messenger/postback/postback.go @@ -12,8 +12,8 @@ import ( "strings" "time" + "github.com/knadh/listmonk/internal/balancer" "github.com/knadh/listmonk/models" - "github.com/mroth/weightedrand/v2" ) // postback is the payload that's posted as JSON to the HTTP Postback server. @@ -66,13 +66,14 @@ type Options struct { // Postback represents an HTTP Message server. type Postback struct { - authStr string - o Options - c *http.Client - chooser *weightedrand.Chooser[string, int] + authStr string + o Options + c *http.Client + balancer *balancer.Balance } -func (o Options) makeChooser() *weightedrand.Chooser[string, int] { +func (o Options) makeBalancer() *balancer.Balance { + // copied from email parts := strings.Split(o.WFrom, ",") var lastKey string @@ -97,17 +98,13 @@ func (o Options) makeChooser() *weightedrand.Chooser[string, int] { lastKey = spart } - choices := make([]weightedrand.Choice[string, int], 0) + balancer := balancer.NewBalance() + for k, v := range choiceVal { - choices = append(choices, weightedrand.Choice[string, int]{ - Item: k, - Weight: v, - }) + balancer.Add(k, v) } - chooser, _ := weightedrand.NewChooser(choices...) - - return chooser + return balancer } // New returns a new instance of the HTTP Postback messenger. @@ -130,7 +127,7 @@ func New(o Options) (*Postback, error) { IdleConnTimeout: o.Timeout, }, }, - chooser: o.makeChooser(), + balancer: o.makeBalancer(), }, nil } @@ -149,7 +146,7 @@ func (p *Postback) IsDefault() bool { // Push pushes a message to the server. func (p *Postback) Push(m models.Message) error { - from := p.chooser.Pick() + from := p.balancer.Get() pb := postback{ FromEmail: from, diff --git a/models/models.go b/models/models.go index aaea62c53..6d8fdac00 100644 --- a/models/models.go +++ b/models/models.go @@ -49,6 +49,9 @@ const ( CampaignContentTypeMarkdown = "markdown" CampaignContentTypePlain = "plain" + CampaignTrafficTypeSplit = "split" + CampaignTrafficTypeDuplicate = "duplicate" + // List. ListTypePrivate = "private" ListTypePublic = "public"