Skip to content

Commit

Permalink
Merge pull request #11 from pankajsoni19/feature/traffic-split
Browse files Browse the repository at this point in the history
Switch balancer to smoothed round robin, previous was inconsistent
  • Loading branch information
pankajsoni19 authored Jan 19, 2025
2 parents be07e91 + 04e5b81 commit 7dcb15b
Show file tree
Hide file tree
Showing 12 changed files with 173 additions and 76 deletions.
4 changes: 2 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func main() {

// Initialize the default SMTP (`email`) messenger.
for _, m := range initSMTPMessengers() {
app.messengers[m.Name()] = m
app.messengers[m.UUID()] = m
if m.IsDefault() {
app.defaultMessenger = m
}
Expand All @@ -244,7 +244,7 @@ func main() {

// Initialize any additional postback messengers.
for _, m := range initPostbackMessengers() {
app.messengers[m.Name()] = m
app.messengers[m.UUID()] = m
}

// Attach all messengers to the campaign manager.
Expand Down
2 changes: 1 addition & 1 deletion cmd/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (app *App) sendNotification(toEmails []string, subject, tplName string, dat
m.To = toEmails
m.Subject = subject
m.Body = body
m.Messenger = app.defaultMessenger.Name()
m.Messenger = app.defaultMessenger.UUID()
if err := app.manager.PushMessage(m); err != nil {
app.log.Printf("error sending admin notification (%s): %v", subject, err)
return err
Expand Down
10 changes: 9 additions & 1 deletion cmd/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,21 @@ func handleSendTxMessage(c echo.Context) error {
app.i18n.Ts("globals.messages.errorFetching", "name"))
}

var muid string
for _, v := range app.messengers {
if m.Messenger == v.Name() || m.Messenger == v.UUID() {
muid = v.UUID()
break
}
}

// Prepare the final message.
msg := models.Message{}
msg.Subscriber = sub
msg.To = []string{sub.Email}
msg.Subject = m.Subject
msg.ContentType = m.ContentType
msg.Messenger = m.Messenger
msg.Messenger = muid
msg.Body = m.Body
for _, a := range m.Attachments {
msg.Attachments = append(msg.Attachments, models.Attachment{
Expand Down
12 changes: 6 additions & 6 deletions frontend/src/views/Campaign.vue
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,10 @@
<div style="display: flex; flex-direction: column;">
<div v-for="(item, index) in messengers" :key="index" style="display: flex; flex-direction: row; margin-top: 24px;">
<div style="min-width: 192px;align-content: center;">
<b-checkbox v-model="selectedStates[item.name]" :disabled="!canEditWindow">{{ item.name }}</b-checkbox>
<b-checkbox v-model="selectedStates[item.uuid]" :disabled="!canEditWindow">{{ item.name }}</b-checkbox>
</div>
<b-field label="Weight" label-position="on-border" :hidden="form.trafficType == 'duplicate'">
<b-input :maxlength="2" :max="10" :min="1" placeholder="Weight..." style="width: 108px;" v-model="itemValues[item.name]" :disabled="!canEditWindow" />
<b-input :maxlength="2" :max="10" :min="1" placeholder="Weight..." style="width: 108px;" v-model="itemValues[item.uuid]" :disabled="!canEditWindow" />
</b-field>
</div>
</div>
Expand Down Expand Up @@ -469,13 +469,13 @@ export default Vue.extend({
this.form.archiveMetaStr = this.$utils.getPref('campaign.archiveMetaStr') || JSON.stringify(JSON.parse(archiveStr), null, 4);
},
selectedMessengers() {
const filtered = this.messengers.filter((item) => this.selectedStates[item.name]);
const filtered = this.messengers.filter((item) => this.selectedStates[item.uuid]);
return filtered.map((item) => {
const mrow = {
uuid: item.uuid,
name: item.name,
weight: parseInt(this.itemValues[item.name], 10) || 1,
weight: parseInt(this.itemValues[item.uuid], 10) || 1,
};
return mrow;
Expand Down Expand Up @@ -549,8 +549,8 @@ export default Vue.extend({
console.log('init', JSON.stringify(this.selectedStates), JSON.stringify(this.itemValues));

Check warning on line 549 in frontend/src/views/Campaign.vue

View workflow job for this annotation

GitHub Actions / build

Unexpected console statement
JSON.parse(data.messenger).forEach((m) => {
this.selectedStates[m.name] = true;
this.itemValues[m.name] = m.weight;
this.selectedStates[m.uuid] = true;
this.itemValues[m.uuid] = m.weight;
});
console.log('updated', JSON.stringify(this.selectedStates), JSON.stringify(this.itemValues));

Check warning on line 556 in frontend/src/views/Campaign.vue

View workflow job for this annotation

GitHub Actions / build

Unexpected console statement
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ require (
github.com/knadh/stuffbin v1.1.0
github.com/labstack/echo/v4 v4.11.4
github.com/lib/pq v1.10.9
github.com/mroth/weightedrand/v2 v2.1.0
github.com/paulbellamy/ratecounter v0.2.0
github.com/rhnvrm/simples3 v0.8.3
github.com/spf13/pflag v1.0.5
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@ github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/mroth/weightedrand/v2 v2.1.0 h1:o1ascnB1CIVzsqlfArQQjeMy1U0NcIbBO5rfd5E/OeU=
github.com/mroth/weightedrand/v2 v2.1.0/go.mod h1:f2faGsfOGOwc1p94wzHKKZyTpcJUW7OJ/9U4yfiNAOU=
github.com/paulbellamy/ratecounter v0.2.0 h1:2L/RhJq+HA8gBQImDXtLPrDXK5qAj6ozWVK/zFXVJGs=
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
Expand Down
104 changes: 104 additions & 0 deletions internal/balancer/balancer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package balancer

// Code is from https://github.com/mr-karan/balance
// Updated for our usecase

import (
"errors"
"sync"
)

// ErrDuplicateID error is thrown when attempt to add an ID
// which is already added to the balancer.
var ErrDuplicateID = errors.New("entry already added")

// Item represents the item in the list.
type item struct {
// id is the id of the item.
id string
// weight is the weight of the item that is given by the user.
weight int
// current is the current weight of the item.
current int
}

// Balance represents a smooth weighted round-robin load balancer.
type Balance struct {
// items is the list of items to balance
items []*item
// next is the index of the next item to use.
next *item

mutex *sync.RWMutex

ids []string
}

// NewBalance creates a new load balancer.
func NewBalance() *Balance {
return &Balance{
items: make([]*item, 0),
mutex: &sync.RWMutex{},
}
}

// Allow chaining
func (b *Balance) Add(id string, weight int) *Balance {
b.mutex.Lock()
defer b.mutex.Unlock()
for _, v := range b.items {
if v.id == id {
return b
}
}

b.items = append(b.items, &item{
id: id,
weight: weight,
current: 0,
})

b.ids = append(b.ids, id)

return b
}

func (b *Balance) All() []string {
return b.ids
}

func (b *Balance) Get() string {
b.mutex.Lock()
defer b.mutex.Unlock()

if len(b.items) == 0 {
return ""
}

if len(b.items) == 1 {
return b.items[0].id
}

// Total weight of all items.
var total int

// Loop through the list of items and add the item's weight to the current weight.
// Also increment the total weight counter.
var max *item
for _, item := range b.items {
item.current += item.weight
total += item.weight

// Select the item with max weight.
if max == nil || item.current > max.current {
max = item
}
}

// Select the item with the max weight.
b.next = max
// Reduce the current weight of the selected item by the total weight.
max.current -= total

return max.id
}
38 changes: 16 additions & 22 deletions internal/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
"time"

"github.com/Masterminds/sprig/v3"
"github.com/knadh/listmonk/internal/balancer"
"github.com/knadh/listmonk/internal/i18n"
"github.com/knadh/listmonk/models"
"github.com/mroth/weightedrand/v2"
"golang.org/x/text/cases"
"golang.org/x/text/language"
)
Expand Down Expand Up @@ -179,7 +179,7 @@ func New(cfg Config, store Store, notifCB models.AdminNotifCallback, i *i18n.I18

// AddMessenger adds a Messenger messaging backend to the manager.
func (m *Manager) AddMessenger(msg Messenger) error {
id := msg.Name()
id := msg.UUID()
if _, ok := m.messengers[id]; ok {
return fmt.Errorf("messenger '%s' is already loaded", id)
}
Expand All @@ -203,34 +203,28 @@ func (m *Manager) PushMessage(msg models.Message) error {
}

func (m *Manager) parseCampMessengers(c *models.Campaign) (
[]string, *weightedrand.Chooser[string, int], error,
*balancer.Balance, error,
) {
choices := make([]weightedrand.Choice[string, int], 0)

weightedMessengers := make([]*models.CampaignMessenger, 0)
messengers := make([]string, 0)

if e := json.Unmarshal([]byte(c.Messenger), &weightedMessengers); e != nil {
m.store.UpdateCampaignStatus(c.ID, models.CampaignStatusCancelled)
return nil, nil, fmt.Errorf("unknown messenger %s on campaign %s", c.Messenger, c.Name)
return nil, fmt.Errorf("unknown messenger %s on campaign %s", c.Messenger, c.Name)
}

balancer := balancer.NewBalance()

for _, wmessenger := range weightedMessengers {
if _, ok := m.messengers[wmessenger.Name]; !ok {
if _, ok := m.messengers[wmessenger.UUID]; !ok {
m.store.UpdateCampaignStatus(c.ID, models.CampaignStatusCancelled)
return nil, nil, fmt.Errorf("unknown messenger %s on campaign %s", wmessenger.Name, c.Name)
return nil, fmt.Errorf("unknown messenger %s on campaign %s", wmessenger.Name, c.Name)
}

choices = append(choices, weightedrand.Choice[string, int]{
Item: wmessenger.Name,
Weight: wmessenger.Weight,
})

messengers = append(messengers, wmessenger.Name)
balancer.Add(wmessenger.UUID, wmessenger.Weight)
}

chooser, _ := weightedrand.NewChooser(choices...)

return messengers, chooser, nil
return balancer, nil
}

// PushCampaignMessage pushes a campaign messages into a queue to be sent out by the workers.
Expand All @@ -241,17 +235,17 @@ func (m *Manager) PushCampaignMessage(msg CampaignMessage) error {
return err
}

messengers, chooser, err := m.parseCampMessengers(msg.Campaign)
balancer, err := m.parseCampMessengers(msg.Campaign)

if err != nil {
return err
}

if msg.Campaign.TrafficType == "split" {
msg.messenger = chooser.Pick()
if msg.Campaign.TrafficType == models.CampaignTrafficTypeSplit {
msg.messenger = balancer.Get()
m.campMsgQ <- msg
} else {
for _, msgnr := range messengers {
for _, msgnr := range balancer.All() {
msg.messenger = msgnr
m.campMsgQ <- msg
}
Expand Down Expand Up @@ -487,7 +481,7 @@ func (m *Manager) scanCampaigns(tick time.Duration) {
func (m *Manager) QueueForSubAndList(subIDs, listIDs []int) {
campaigns, e := m.store.GetCampaignsForListsRunType(listIDs, "event:sub")
if e != nil {
fmt.Println(e)
m.log.Printf("error QueueForSubAndList: event:sub (%v): %v", listIDs, e)
return
}

Expand Down
18 changes: 8 additions & 10 deletions internal/manager/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"sync/atomic"
"time"

"github.com/knadh/listmonk/internal/balancer"
"github.com/knadh/listmonk/models"
"github.com/mroth/weightedrand/v2"
"github.com/paulbellamy/ratecounter"
)

Expand All @@ -25,8 +25,7 @@ type pipe struct {
slidingStart time.Time
SlidingWindowDuration time.Duration
slidingCount int
chooser *weightedrand.Chooser[string, int]
messengers []string
balancer *balancer.Balance
totalMessengers int
}

Expand All @@ -44,7 +43,7 @@ func (m *Manager) newPipe(c *models.Campaign) (*pipe, error) {

dur, _ := time.ParseDuration(c.SlidingWindowDuration)

messengers, chooser, err := m.parseCampMessengers(c)
balancer, err := m.parseCampMessengers(c)

if err != nil {
return nil, err
Expand All @@ -59,9 +58,8 @@ func (m *Manager) newPipe(c *models.Campaign) (*pipe, error) {
slidingStart: time.Now(),
SlidingWindowDuration: dur,
slidingCount: 0,
chooser: chooser,
messengers: messengers,
totalMessengers: len(messengers),
balancer: balancer,
totalMessengers: len(balancer.All()),
}

// Increment the waitgroup so that Wait() blocks immediately. This is necessary
Expand Down Expand Up @@ -128,14 +126,14 @@ func (p *pipe) NextSubscribers(result chan *NextSubResult) {

// Push the message to the queue while blocking and waiting until
// the queue is drained.
if p.camp.TrafficType == "split" {
if p.camp.TrafficType == models.CampaignTrafficTypeSplit {
// decide messanger and queue message
msg.messenger = p.chooser.Pick()
msg.messenger = p.balancer.Get()
msg.hasMore = false
p.m.campMsgQ <- msg
} else {
// duplicate type, send to all messengers
for idx, messenger := range p.messengers {
for idx, messenger := range p.balancer.All() {
msg.messenger = messenger
msg.hasMore = (idx < (p.totalMessengers - 1))
p.m.campMsgQ <- msg
Expand Down
Loading

0 comments on commit 7dcb15b

Please sign in to comment.