Support more events at the same time
Persisting to ES, checking the DB for notifications, and processing
them took a lot of time, and the previous setup meant that a lot of
events got dropped.

By using another channel to push tasks onto, and starting 4 goroutines
that take work off it to process, it's been able to handle everything
I throw at it. The buffered channel is key here.
bittersweet committed Oct 11, 2015
1 parent 6b6982a commit 767bba6
Showing 5 changed files with 76 additions and 40 deletions.
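
Before the diffs, here is a minimal, self-contained Go sketch of the pattern the commit message describes: a buffered channel feeding a small pool of worker goroutines. The channel capacity and worker count mirror the diff; the persist/notify bodies are stubs standing in for the real Elasticsearch and DB calls.

package main

import (
	"fmt"
	"sync"
)

// Event stands in for the real notifilter Event struct.
type Event struct {
	Identifier string
}

// Stubs for the real work: persisting to ES and sending notifications.
func (e Event) persist() {}
func (e Event) notify()  {}

func main() {
	// Buffered channel: senders only block once 10,000 events are
	// queued, so bursts of incoming events are absorbed, not dropped.
	tasks := make(chan Event, 10000)

	// 4 workers drain the channel concurrently.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for event := range tasks {
				event.persist()
				event.notify()
			}
		}()
	}

	// Producer side; in notifilter this is the UDP listener loop.
	for i := 0; i < 100; i++ {
		tasks <- Event{Identifier: "signup"}
	}

	// Closing the channel ends the workers' range loops; Wait blocks
	// until every queued event has been processed.
	close(tasks)
	wg.Wait()
	fmt.Println("all events processed")
}

Note that in the commit itself, close(tasks) sits after an unconditional for loop and wg.Wait() is never called, so the workers simply run for the lifetime of the process; the sketch above shows the fully drained variant.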
4 changes: 2 additions & 2 deletions elasticsearch/client.go
@@ -26,7 +26,7 @@ func Persist(name string, data map[string]interface{}) error {
 		Data: data,
 	}
 	body, _ := json.Marshal(payload)
-	resp, err := http.Post("http://localhost:9200/notifilter/event/?pretty", "application/json", bytes.NewReader(body))
+	resp, err := http.Post("http://127.0.0.1:9200/notifilter/event/?pretty", "application/json", bytes.NewReader(body))
 	if err != nil {
 		return err
 	}
@@ -48,7 +48,7 @@ func EventCount() (int, error) {
 		} `json:"hits"`
 	}
 
-	resp, err := http.Get("http://localhost:9200/notifilter/event/_search?size=0")
+	resp, err := http.Get("http://127.0.0.1:9200/notifilter/event/_search?size=0")
 	if err != nil {
 		return 0, err
 	}
25 changes: 23 additions & 2 deletions notifilter.go
@@ -7,6 +7,7 @@ import (
 	"net"
 	"net/http"
 	"runtime"
+	"sync"
 
 	"github.com/bittersweet/notifilter/elasticsearch"
 	"github.com/jmoiron/sqlx"
@@ -60,6 +61,26 @@ func (e *Event) notify() {
 func incomingItems() chan<- []byte {
 	incomingChan := make(chan []byte)
 
+	// Open a channel with a capacity of 10,000 events.
+	// This will only block the sender if the buffer fills up.
+	// Without a buffer, any event sent to the channel while the
+	// workers are busy would be dropped.
+	tasks := make(chan Event, 10000)
+
+	// Use 4 workers that will concurrently grab Events off the
+	// channel and persist+notify them.
+	var wg sync.WaitGroup
+	for i := 0; i < 4; i++ {
+		wg.Add(1)
+		go func() {
+			for event := range tasks {
+				event.persist()
+				event.notify()
+			}
+			wg.Done()
+		}()
+	}
+
 	go func() {
 		for {
 			select {
@@ -70,10 +91,10 @@
 					log.Println(err)
 					log.Printf("%+v\n", Event)
 				}
-				Event.persist()
-				Event.notify()
+				tasks <- Event
 			}
 		}
+		close(tasks)
 	}()
 
 	fmt.Println("incomingItems launched")
10 changes: 6 additions & 4 deletions stats.rb → scripts/generate_test_data.rb
@@ -13,9 +13,11 @@ def socket
 
   def track(message)
     @backlog << message
+    # uncomment to buffer messages in batches of 20
     # if @backlog.size >= @batch_size
-    flush
+    # flush
     # end
+    flush
   end
@@ -33,9 +35,9 @@ def flush
 10.times do
   jobs << Thread.new do
     s = Stats.new
-    10.times do |i|
-      message = {mark: 'is cool!', number: rand(10)}
-      s.track({'key' => 'mark', 'value' => message})
+    1000.times do |i|
+      data = { user_id: rand(10), created_at: Time.now }
+      s.track({'identifier' => 'signup', 'data' => data})
     end
   end
 end
45 changes: 45 additions & 0 deletions scripts/test_data_2.rb
@@ -0,0 +1,45 @@
+require 'socket'
+require 'json'
+
+class Stats
+  def initialize
+    @batch_size = 20
+    @backlog = []
+  end
+
+  def socket
+    Thread.current[:statsd_socket] ||= UDPSocket.new
+  end
+
+  def track(message)
+    @backlog << message
+    # uncomment to buffer messages in batches of 20
+    # if @backlog.size >= @batch_size
+    # flush
+    # end
+    flush
+  end
+
+  def flush
+    @backlog.each do |item|
+      message = item.to_json
+      # puts message
+      puts socket.send(message, 0, "127.0.0.1", 8000)
+    end
+    @backlog.clear
+  end
+end
+
+socket = UDPSocket.new
+jobs = []
+25.times do
+  jobs << Thread.new do
+    s = Stats.new
+    100.times do |i|
+      data = { user_id: rand(10), created_at: Time.now }
+      s.track({'identifier' => 'signup', 'data' => data})
+    end
+  end
+end
+
+jobs.map(&:join)
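
These scripts fire JSON payloads of the form {'identifier' => ..., 'data' => ...} at UDP port 8000, both taken from the script itself. For context, a minimal Go sketch of a receiver that decodes such payloads; the Event field names and JSON tags here are assumptions, since the real struct definition is not shown in this diff.

package main

import (
	"encoding/json"
	"fmt"
	"net"
)

// Assumed shape of the Event struct; field names are not
// shown in this commit and are illustrative only.
type Event struct {
	Identifier string                 `json:"identifier"`
	Data       map[string]interface{} `json:"data"`
}

func main() {
	// Listen on the UDP port the test scripts target.
	addr := net.UDPAddr{Port: 8000, IP: net.ParseIP("127.0.0.1")}
	conn, err := net.ListenUDP("udp", &addr)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	buf := make([]byte, 4096)
	for {
		n, _, err := conn.ReadFromUDP(buf)
		if err != nil {
			continue
		}
		// Each UDP packet carries one JSON-encoded event.
		var e Event
		if err := json.Unmarshal(buf[:n], &e); err != nil {
			continue
		}
		fmt.Printf("received %s event: %+v\n", e.Identifier, e.Data)
	}
}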
32 changes: 0 additions & 32 deletions single.rb

This file was deleted.
