This repository has been archived by the owner on Dec 22, 2020. It is now read-only.

Batch inserts #68

Open
wants to merge 1 commit into base: master
16 changes: 8 additions & 8 deletions Gemfile.lock
@@ -1,31 +1,31 @@
PATH
remote: .
specs:
mosql (0.4.0)
mosql (0.5.0)
bson_ext
json
log4r
mongo
mongoriver (= 0.4)
mongoriver (= 0.4.2)
pg
rake
sequel

GEM
remote: https://rubygems.org/
specs:
bson (1.10.2)
bson_ext (1.10.2)
bson (~> 1.10.2)
bson (1.11.1)
bson_ext (1.11.1)
bson (~> 1.11.1)
json (1.8.1)
log4r (1.1.10)
metaclass (0.0.4)
minitest (3.0.0)
mocha (1.0.0)
metaclass (~> 0.0.1)
mongo (1.10.2)
bson (= 1.10.2)
mongoriver (0.4.0)
mongo (1.11.1)
bson (= 1.11.1)
mongoriver (0.4.2)
bson_ext
log4r
mongo (>= 1.7)
63 changes: 56 additions & 7 deletions lib/mosql/streamer.rb
@@ -2,7 +2,9 @@ module MoSQL
class Streamer
include MoSQL::Logging

BATCH = 1000
BATCH_SIZE = 1000
# How long (in seconds) to wait before flushing queued inserts to postgres
INSERT_BATCH_TIMELIMIT = 5.0

attr_reader :options, :tailer

@@ -16,7 +18,9 @@ def initialize(opts)
instance_variable_set(:"@#{parm.to_s}", opts[parm])
end

@done = false
# Hash from namespace -> list of inserts that still need to be made
@batch_insert_lists = Hash.new { |hash, key| hash[key] = [] }
@done = false
end

def stop
@@ -141,13 +145,13 @@ def import_collection(ns, collection, filter)

start = Time.now
sql_time = 0
collection.find(filter, :batch_size => BATCH) do |cursor|
collection.find(filter, :batch_size => BATCH_SIZE) do |cursor|
with_retries do
cursor.each do |obj|
batch << @schema.transform(ns, obj)
count += 1

if batch.length >= BATCH
if batch.length >= BATCH_SIZE
sql_time += track_time do
bulk_upsert(table, ns, batch)
end
@@ -170,14 +174,31 @@ def optail
if tail_from.is_a? Time
tail_from = tailer.most_recent_position(tail_from)
end

last_batch_insert = Time.now
tailer.tail(:from => tail_from)
until @done
tailer.stream(1000) do |op|
Contributor:

I haven't done the digging to confirm this, but I'm pretty sure the contract on Tailer by default is that once your block returns, the op is considered to have been handled, and the timestamp may be persisted to postgres. However, with batched inserts, we haven't actually processed the op until we've flushed the inserts, so this could result in data loss if we save a timestamp before flushing the inserts.

mongoriver does have a batch mode, which allows you to explicitly mark batches and tell mongoriver when you're done with a batch. Unfortunately I've forgotten the details, so you'll probably have to source-dive :(

Contributor Author:

Good point! My original assumption was that if the process is told to stop, it would flush due to the signal handler. In hindsight you're right: if something catastrophic happens, the data would not get flushed, resulting in data loss.

Contributor:

Yeah, we can't assume we'll get to shut down gracefully -- we need to handle the case where the machine dies, the program gets killed via SIGKILL, whatever.

A later commenter:

Since the author has been inactive for a long time, I created a branch based on this one where I added @nelhage's suggestions: #137
I hope someone can see this.
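To make the concern in this thread concrete, here is a small self-contained Ruby sketch. Nothing in it is the real mongoriver API; ToyTailer, stream_eager, stream_batched and ack are invented names for illustration. The first variant models the suspected default contract (the position is persisted as soon as the block returns), so a crash after the loop but before do_batch_inserts loses the queued rows on restart; the second only persists the position once the caller reports that the flush happened, so a crash merely replays the ops.

# Toy model of the checkpoint-vs-flush race discussed above; every name is hypothetical.
class ToyTailer
  attr_reader :persisted_position

  def initialize(ops)
    @ops = ops
  end

  # Suspected default contract: once the block returns, the op counts as
  # handled and its position may be checkpointed immediately.
  def stream_eager
    @ops.each do |op|
      yield op
      @persisted_position = op[:ts]
    end
  end

  # Batch-style contract: the caller acknowledges explicitly, once the
  # corresponding writes are actually durable in postgres.
  def stream_batched
    @ops.each { |op| yield op }
  end

  def ack(position)
    @persisted_position = position
  end
end

ops    = (1..3).map { |ts| { ts: ts, doc: "row#{ts}" } }
queued = []

eager = ToyTailer.new(ops)
eager.stream_eager { |op| queued << op[:doc] }
# A crash here is the bad case: position 3 is already persisted even though
# nothing was flushed, so rows 1..3 never reach postgres after a restart.
puts "eager: persisted=#{eager.persisted_position}, flushed=0, queued=#{queued.size}"

queued.clear
safe = ToyTailer.new(ops)
safe.stream_batched { |op| queued << op[:doc] }
flushed = queued.size        # stand-in for do_batch_inserts / bulk_upsert
safe.ack(ops.last[:ts])      # only now is the position persisted
puts "safe: persisted=#{safe.persisted_position}, flushed=#{flushed}"

Whatever mongoriver's batch mode actually looks like, the property to preserve is the second ordering: flush the queued inserts first, checkpoint the oplog position second.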

handle_op(op)
end
time = Time.now
if time - last_batch_insert >= INSERT_BATCH_TIMELIMIT
last_batch_insert = time
do_batch_inserts
end
end

log.info("Finishing, doing last batch inserts.")
do_batch_inserts
end

# Handle $set, $inc and other operators in updates. Done by querying
# mongo and setting the value to whatever mongo holds at the time.
# Note that this somewhat messes with consistency, as postgres will be
# "ahead" of everything else if the tailer is behind.
#
# If no such object is found, try to delete according to the primary keys,
# which must be present in the selector (and not inside $set, etc.).
def sync_object(ns, selector)
obj = collection_for_ns(ns).find_one(selector)
if obj
Expand All @@ -196,6 +217,33 @@ def sync_object(ns, selector)
end
end

# Queue this op to be batch-inserted into its namespace; the queue is
# flushed the next time a non-insert op arrives or the batch fills up
def queue_to_batch_insert(op, namespace)
@batch_insert_lists[namespace] << @schema.transform(namespace, op['o'])
if @batch_insert_lists[namespace].length >= BATCH_SIZE
do_batch_inserts(namespace)
end
end

# Flush the queued inserts for that namespace, writing the data to postgres.
# If no namespace is given, all namespaces are flushed
def do_batch_inserts(namespace=nil)
if namespace.nil?
@batch_insert_lists.keys.each do |ns|
do_batch_inserts(ns)
end
else
to_batch = @batch_insert_lists[namespace]
@batch_insert_lists[namespace] = []
return if to_batch.empty?

table = @sql.table_for_ns(namespace)
log.debug("Batch inserting #{to_batch.length} items to #{table} from #{namespace}.")
bulk_upsert(table, namespace, to_batch)
end
end

def handle_op(op)
log.debug("processing op: #{op.inspect}")
unless op['ns'] && op['op']
@@ -227,9 +275,7 @@ def handle_op(op)
if collection_name == 'system.indexes'
log.info("Skipping index update: #{op.inspect}")
else
unsafe_handle_exceptions(ns, op['o']) do
@sql.upsert_ns(ns, op['o'])
end
queue_to_batch_insert(op, ns)
end
when 'u'
selector = op['o2']
@@ -238,6 +284,7 @@ def handle_op(op)
log.debug("resync #{ns}: #{selector['_id']} (update was: #{update.inspect})")
sync_object(ns, selector)
else
do_batch_inserts(ns)

# The update operation replaces the existing object, but
# preserves its _id field, so grab the _id off of the
@@ -259,6 +306,8 @@ def handle_op(op)
end
end
when 'd'
do_batch_inserts(ns)

if options[:ignore_delete]
log.debug("Ignoring delete op on #{ns} as instructed.")
else
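Taken together, the streamer changes above implement a small per-namespace write buffer: inserts are queued in memory and flushed to postgres when a batch reaches BATCH_SIZE, when the INSERT_BATCH_TIMELIMIT timer fires in optail, or when a non-insert op touches the same namespace. The sketch below is a stripped-down, self-contained version of that pattern in plain Ruby; BatchBuffer, insert, non_insert and flush! are illustrative names, not MoSQL's, and flush! just prints where the real code calls bulk_upsert.

# Minimal stand-alone model of the per-namespace batching in this diff (illustrative only).
class BatchBuffer
  BATCH_SIZE = 1000
  TIME_LIMIT = 5.0   # seconds, mirroring INSERT_BATCH_TIMELIMIT

  def initialize
    @batches    = Hash.new { |h, k| h[k] = [] }   # namespace -> pending rows
    @last_flush = Time.now
  end

  def insert(ns, row)
    @batches[ns] << row
    flush!(ns) if @batches[ns].length >= BATCH_SIZE
  end

  # Updates and deletes must see any queued inserts for the namespace,
  # so the caller flushes that namespace before applying them.
  def non_insert(ns)
    flush!(ns)
  end

  def maybe_flush_all
    return if Time.now - @last_flush < TIME_LIMIT
    @last_flush = Time.now
    @batches.keys.each { |ns| flush!(ns) }
  end

  def flush!(ns)
    rows = @batches[ns]
    @batches[ns] = []
    return if rows.empty?
    puts "flushing #{rows.length} rows to #{ns}"   # stand-in for bulk_upsert
  end
end

buf = BatchBuffer.new
1_500.times { |i| buf.insert('db.collection', { _id: i }) }   # first 1000 flush on size
buf.non_insert('db.collection')                               # remaining 500 flush here
buf.maybe_flush_all                                           # no-op until TIME_LIMIT has elapsed

Flushing a namespace before any non-insert op on it preserves ordering: the update or delete is applied only after the rows queued ahead of it are in postgres.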
2 changes: 1 addition & 1 deletion lib/mosql/version.rb
@@ -1,3 +1,3 @@
module MoSQL
VERSION = "0.4.0"
VERSION = "0.5.0"
end
2 changes: 1 addition & 1 deletion mosql.gemspec
@@ -18,7 +18,7 @@ Gem::Specification.new do |gem|

%w[sequel pg mongo bson_ext rake log4r json
].each { |dep| gem.add_runtime_dependency(dep) }
gem.add_runtime_dependency "mongoriver", "0.4"
gem.add_runtime_dependency "mongoriver", "0.4.2"

gem.add_development_dependency "minitest"
gem.add_development_dependency "mocha"
1 change: 1 addition & 0 deletions test/functional/streamer.rb
@@ -365,6 +365,7 @@ def build_streamer
"ns" => "db.has_timestamp",
"o" => mongo['db']['has_timestamp'].find_one({_id: id})
})
@streamer.do_batch_inserts
got = @sequel[:has_timestamp].where(:_id => id.to_s).select.first[:ts]
assert_equal(ts.to_i, got.to_i)
assert_equal(ts.tv_usec, got.tv_usec)
1 change: 1 addition & 0 deletions test/functional/transform.rb
@@ -114,6 +114,7 @@ class MoSQL::Test::Functional::TransformTest < MoSQL::Test::Functional
"ns" => "test.test_transform",
"o" => collection.find_one(_id: id)
})
streamer.do_batch_inserts

got = @sequel[:test_transform].where(_id: id).to_a
assert_equal(sql, got.first[:value], "was able to transform a #{typ} field while streaming")