Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/implement different actions #74

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-isodate>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-retry_delay>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-uri>> |<<string,string>>|Yes
| <<plugins-{type}s-{plugin}-action>> |<<string,string>>|Yes
| <<plugins-{type}s-{plugin}-query_key>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-query_value>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-upsert>> |<<boolean,boolean>>|No
|=======================================================================

Also see <<plugins-{type}s-{plugin}-common-options>> for a list of options supported by all
Expand Down Expand Up @@ -129,6 +133,50 @@ The number of seconds to wait after failure before retrying.
A MongoDB URI to connect to.
See http://docs.mongodb.org/manual/reference/connection-string/.

[id="plugins-{type}s-{plugin}-action"]
===== `action`

* Value type is <<string,string>>
* Default value is `insert`.

The method used to write processed events to MongoDB.

Possible values are `insert`, `update` or `replace`.

[id="plugins-{type}s-{plugin}-query_key"]
===== `query_key`

* Value type is <<string,string>>
* Default value is `_id`.

The key of the query to find the document to update or replace in MongoDB.

query_key is used like described https://docs.mongodb.com/ruby-driver/v2.6/tutorials/ruby-driver-bulk-operations[here]
for `update` and `replace` examples:

:filter => {query_key => query_value}

[id="plugins-{type}s-{plugin}-query_value"]
===== `query_value`

* Value type is <<string,string>>
* There is no default value for this setting.

The value of the query to find the document to update or replace in MongoDB. This can be dynamic using the `%{foo}` syntax.

query_value is used like described https://docs.mongodb.com/ruby-driver/v2.6/tutorials/ruby-driver-bulk-operations[here]
for `update` and `replace` examples:

:filter => {query_key => query_value}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usage of this is not really clear from this example, would you mind improving it a bit?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was a week on vacation. Will look into this asap.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem take your time as you saw I went a bit beyond the original intent 😃

Hope you like the result

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It gets clear if you follow the link to mongodb documentation I added to ascii-documentation and take a look at update and replace examples.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So in general you don't want to update a document in mongodb with whole logstash event but instead just increment some fields in mongodb? And do you want to read size to increment fields with from logstash event?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I am building projections from the events and I am doing now something like:

    if [@metadata][event_type] == "stock.updated" and [quantity] > 0 {
        mongodb {
            id => "orders.projection.mongodb.stock-update"
            uri => "${MONGO_DNS}"
            collection => "product-aggregates"
            database => "carts"
            isodate => true
            action => "update"
            filter => {
                "_id" => "[_id]"
                "store_id" => "[store_id]"
            }
            update_expressions => {
                "$inc" => {"stock" => "[stock_delta]"}
            }
        }
    }


[id="plugins-{type}s-{plugin}-upsert"]
===== `upsert`

* Value type is <<boolean,boolean>>
* Default value is `false`.

If true, a new document is created if no document exists in DB with given `document_id`.
Only applies if action is `update` or `replace`.


[id="plugins-{type}s-{plugin}-common-options"]
Expand Down
2 changes: 1 addition & 1 deletion lib/logstash/outputs/bson/big_decimal.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ module BigDecimal
# 1.221311.to_bson
# @return [ String ] The encoded string.
# @see http://bsonspec.org/#/specification
def to_bson(buffer = ByteBuffer.new)
def to_bson(buffer = ByteBuffer.new, validating_keys = Config.validating_keys?)
buffer.put_bytes([ self ].pack(PACK))
end

Expand Down
2 changes: 1 addition & 1 deletion lib/logstash/outputs/bson/logstash_event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ module LogStashEvent
# Event.new("field" => "value").to_bson
# @return [ String ] The encoded string.
# @see http://bsonspec.org/#/specification
def to_bson(buffer = ByteBuffer.new)
def to_bson(buffer = ByteBuffer.new, validating_keys = Config.validating_keys?)
position = buffer.length
buffer.put_int32(0)
to_hash.each do |field, value|
Expand Down
2 changes: 1 addition & 1 deletion lib/logstash/outputs/bson/logstash_timestamp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ module LogStashTimestamp
# A time is type 0x09 in the BSON spec.
BSON_TYPE = 9.chr.force_encoding(BINARY).freeze

def to_bson(buffer = ByteBuffer.new)
def to_bson(buffer = ByteBuffer.new, validating_keys = Config.validating_keys?)
time.to_bson(buffer)
end

Expand Down
71 changes: 64 additions & 7 deletions lib/logstash/outputs/mongodb.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,23 @@ class LogStash::Outputs::Mongodb < LogStash::Outputs::Base
# whatever the bulk interval value (mongodb hard limit is 1000).
config :bulk_size, :validate => :number, :default => 900, :maximum => 999, :min => 2

# The method used to write processed events to MongoDB.
# Possible values are `insert`, `update` and `replace`.
config :action, :validate => :string, :required => true
# The key of the query to find the document to update or replace.
config :query_key, :validate => :string, :required => false, :default => "_id"
# The value of the query to find the document to update or replace. This can be dynamic using the `%{foo}` syntax.
config :query_value, :validate => :string, :required => false
# If true, a new document is created if no document exists in DB with given `document_id`.
# Only applies if action is `update` or `replace`.
config :upsert, :validate => :boolean, :required => false, :default => false

# Mutex used to synchronize access to 'documents'
@@mutex = Mutex.new

def register
if @bulk_size > 1000
raise LogStash::ConfigurationError, "Bulk size must be lower than '1000', currently '#{@bulk_size}'"
end

validate_config

Mongo::Logger.logger = @logger
conn = Mongo::Client.new(@uri)
Expand All @@ -65,7 +75,7 @@ def register
@@mutex.synchronize do
@documents.each do |collection, values|
if values.length > 0
@db[collection].insert_many(values)
write_to_mongodb(collection, values)
@documents.delete(collection)
end
end
Expand All @@ -74,6 +84,18 @@ def register
end
end

def validate_config
if @bulk_size > 1000
raise LogStash::ConfigurationError, "Bulk size must be lower than '1000', currently '#{@bulk_size}'"
end
if @action != "insert" && @action != "update" && @action != "replace"
raise LogStash::ConfigurationError, "Only insert, update and replace are valid for 'action' setting."
end
if (@action == "update" || @action == "replace") && (@query_value.nil? || @query_value.empty?)
raise LogStash::ConfigurationError, "If action is update or replace, query_value must be set."
end
end

def receive(event)
begin
# Our timestamp object now has a to_bson method, using it here
Expand All @@ -94,21 +116,24 @@ def receive(event)
document["_id"] = BSON::ObjectId.new
end

collection = event.sprintf(@collection)
if @action == "update" or @action == "replace"
document["metadata_mongodb_output_query_value"] = event.sprintf(@query_value)
end
if @bulk
collection = event.sprintf(@collection)
@@mutex.synchronize do
if(!@documents[collection])
@documents[collection] = []
end
@documents[collection].push(document)

if(@documents[collection].length >= @bulk_size)
@db[collection].insert_many(@documents[collection])
write_to_mongodb(collection, @documents[collection])
@documents.delete(collection)
end
end
else
@db[event.sprintf(@collection)].insert_one(document)
write_to_mongodb(collection, [document])
end
rescue => e
if e.message =~ /^E11000/
Expand All @@ -126,6 +151,38 @@ def receive(event)
end
end

def write_to_mongodb(collection, documents)
ops = get_write_ops(documents)
@db[collection].bulk_write(ops)
end

def get_write_ops(documents)
ops = []
documents.each do |doc|
replaced_query_value = doc["metadata_mongodb_output_query_value"]
doc.delete("metadata_mongodb_output_query_value")
if @action == "insert"
ops << {:insert_one => doc}
elsif @action == "update"
ops << {:update_one => {:filter => {@query_key => replaced_query_value}, :update => {'$set' => to_dotted_hash(doc)}, :upsert => @upsert}}
elsif @action == "replace"
ops << {:replace_one => {:filter => {@query_key => replaced_query_value}, :replacement => doc, :upsert => @upsert}}
end
end
ops
end

def to_dotted_hash(hash, recursive_key = "")
hash.each_with_object({}) do |(k, v), ret|
key = recursive_key + k.to_s
if v.is_a? Hash
ret.merge! to_dotted_hash(v, key + ".")
else
ret[key] = v
end
end
end

def close
@closed.make_true
@bulk_thread.wakeup
Expand Down
4 changes: 2 additions & 2 deletions logstash-output-mongodb.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-mongodb'
s.version = '3.1.6'
s.version = '3.2.0'
s.licenses = ['Apache License (2.0)']
s.summary = "Writes events to MongoDB"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -21,7 +21,7 @@ Gem::Specification.new do |s|
# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'logstash-codec-plain'
s.add_runtime_dependency 'mongo', '~> 2.6'
s.add_runtime_dependency 'mongo', '= 2.6'

s.add_development_dependency 'logstash-devutils'
end
Expand Down
3 changes: 2 additions & 1 deletion spec/integration/mongodb_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
let(:database) { 'logstash' }
let(:collection) { 'logs' }
let(:uuid) { SecureRandom.uuid }
let(:action) { 'insert' }

let(:config) do
{ "uri" => uri, "database" => database,
"collection" => collection, "isodate" => true }
"collection" => collection, "isodate" => true, "action" => action }
end

describe "#send" do
Expand Down
64 changes: 64 additions & 0 deletions spec/outputs/mongodb_config_validation_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# encoding: utf-8
require_relative "../spec_helper"
require "logstash/plugin"

describe LogStash::Outputs::Mongodb do

let(:uri) { 'mongodb://localhost:27017' }
let(:database) { 'logstash' }
let(:collection) { 'logs' }

describe "validate_config method" do

subject! { LogStash::Outputs::Mongodb.new(config) }

[
{:action => "not-supported", :query_key => "qk", :query_value => "qv", :upsert => false,
:expected_reason => "Only insert, update and replace are valid for 'action' setting."},
{:action => "update", :query_key => "qk", :query_value => nil, :upsert => false,
:expected_reason => "If action is update or replace, query_value must be set."},
{:action => "update", :query_key => "qk", :query_value => "", :upsert => false,
:expected_reason => "If action is update or replace, query_value must be set."},
{:action => "replace", :query_key => "qk", :query_value => nil, :upsert => false,
:expected_reason => "If action is update or replace, query_value must be set."},
{:action => "replace", :query_key => "qk", :query_value => "", :upsert => false,
:expected_reason => "If action is update or replace, query_value must be set."},
{:action => "insert", :bulk_size => 1001,
:expected_reason => "Bulk size must be lower than '1000', currently '1001'"},
].each do |test|

describe "when validating config with action '#{test[:action]}' query_key '#{test[:query_key]}', query_value '#{test[:query_value]}' and upsert '#{test[:upsert]}'" do

let(:config) {
configuration = {
"uri" => uri,
"database" => database,
"collection" => collection
}
unless test[:action].nil?
configuration["action"] = test[:action]
end
unless test[:query_key].nil?
configuration["query_key"] = test[:query_key]
end
unless test[:query_value].nil?
configuration["query_value"] = test[:query_value]
end
unless test[:upsert].nil?
configuration["upsert"] = test[:upsert]
end
unless test[:bulk_size].nil?
configuration["bulk_size"] = test[:bulk_size]
end
return configuration
}

it "should raise error: #{test[:expected_reason]}" do
expect { subject.validate_config }.to raise_error(LogStash::ConfigurationError, test[:expected_reason])
end
end

end

end
end
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
let(:uri) { 'mongodb://localhost:27017' }
let(:database) { 'logstash' }
let(:collection) { 'logs' }
let(:action) { 'insert' }

let(:config) {{
"uri" => uri,
"database" => database,
"collection" => collection
"collection" => collection,
"action" => action
}}

it "should register and close" do
Expand All @@ -20,7 +22,7 @@
plugin.close
end

describe "receive" do
describe "receive method while action is 'insert'" do
subject! { LogStash::Outputs::Mongodb.new(config) }

let(:event) { LogStash::Event.new(properties) }
Expand All @@ -32,15 +34,15 @@
allow(Mongo::Client).to receive(:new).and_return(connection)
allow(connection).to receive(:use).and_return(client)
allow(client).to receive(:[]).and_return(collection)
allow(collection).to receive(:insert_one)
allow(collection).to receive(:bulk_write)
subject.register
end

after(:each) do
subject.close
end

describe "#send" do
describe "when processing an event" do
let(:properties) {{
"message" => "This is a message!",
"uuid" => SecureRandom.uuid,
Expand All @@ -49,36 +51,41 @@
}}

it "should send the event to the database" do
expect(collection).to receive(:insert_one)
expect(collection).to receive(:bulk_write)
subject.receive(event)
end
end

describe "no event @timestamp" do
describe "when processing an event without @timestamp set" do
let(:properties) { { "message" => "foo" } }

it "should not contain a @timestamp field in the mongo document" do
it "should send a document without @timestamp field to mongodb" do
expect(event).to receive(:timestamp).and_return(nil)
expect(event).to receive(:to_hash).and_return(properties)
expect(collection).to receive(:insert_one).with(properties)
expect(collection).to receive(:bulk_write).with(
[ {:insert_one => properties} ]
)
subject.receive(event)
end
end

describe "generateId" do
describe "when generateId is set" do
let(:properties) { { "message" => "foo" } }
let(:config) {{
"uri" => uri,
"database" => database,
"collection" => collection,
"generateId" => true
"generateId" => true,
"action" => "insert"
}}

it "should contain a BSON::ObjectId as _id" do
it "should send a document containing a BSON::ObjectId as _id to mongodb" do
expect(BSON::ObjectId).to receive(:new).and_return("BSON::ObjectId")
expect(event).to receive(:timestamp).and_return(nil)
expect(event).to receive(:to_hash).and_return(properties)
expect(collection).to receive(:insert_one).with(properties.merge("_id" => "BSON::ObjectId"))
expect(collection).to receive(:bulk_write).with(
[ {:insert_one => properties.merge("_id" => "BSON::ObjectId")} ]
)
subject.receive(event)
end
end
Expand Down
Loading