diff --git a/NEWS.md b/NEWS.md index 607174e..2a79419 100644 --- a/NEWS.md +++ b/NEWS.md @@ -5,6 +5,7 @@ Features: * For EXTERNAL authentication, try also without the user id, to work with containers ([#126][]). + * Thread safety, as long as the non-main threads only send signals. [#126]: https://github.com/mvidner/ruby-dbus/issues/126 diff --git a/lib/dbus/message_queue.rb b/lib/dbus/message_queue.rb index 494dbed..824b625 100644 --- a/lib/dbus/message_queue.rb +++ b/lib/dbus/message_queue.rb @@ -23,6 +23,7 @@ def initialize(address) @address = address @buffer = "" @is_tcp = false + @mutex = Mutex.new connect end @@ -33,23 +34,28 @@ def initialize(address) # @raise EOFError # @todo failure modes def pop(blocking: true) - buffer_from_socket_nonblock - message = message_from_buffer_nonblock - if blocking - # we can block - while message.nil? - r, _d, _d = IO.select([@socket]) - if r && r[0] == @socket - buffer_from_socket_nonblock - message = message_from_buffer_nonblock + # FIXME: this is not enough, the R/W test deadlocks on shared connections + @mutex.synchronize do + buffer_from_socket_nonblock + message = message_from_buffer_nonblock + if blocking + # we can block + while message.nil? + r, _d, _d = IO.select([@socket]) + if r && r[0] == @socket + buffer_from_socket_nonblock + message = message_from_buffer_nonblock + end end end + message end - message end def push(message) - @socket.write(message.marshall) + @mutex.synchronize do + @socket.write(message.marshall) + end end alias << push diff --git a/spec/thread_safety_spec.rb b/spec/thread_safety_spec.rb index 8238bb9..9fd1b9e 100755 --- a/spec/thread_safety_spec.rb +++ b/spec/thread_safety_spec.rb @@ -5,28 +5,71 @@ require_relative "spec_helper" require "dbus" -describe "ThreadSafetyTest" do - it "tests thread competition" do - print "Thread competition: " - jobs = [] - 5.times do - jobs << Thread.new do - Thread.current.abort_on_exception = true +class TestSignalRace < DBus::Object + dbus_interface "org.ruby.ServerTest" do + dbus_signal :signal_without_arguments + end +end + +# Run *count* threads all doing *body*, wait for their finish +def race_threads(count, &body) + jobs = count.times.map do |j| + Thread.new do + Thread.current.abort_on_exception = true + + body.call(j) + end + end + jobs.each(&:join) +end + +# Repeat *count* times: { random sleep, *body* }, printing progress +def repeat_with_jitter(count, &body) + count.times do |i| + sleep 0.1 * rand + print "#{i} " + $stdout.flush + body.call + end +end + +describe "thread safety" do + context "R/W: when the threads call methods with return values" do + it "it works with separate bus connections" do + race_threads(5) do |_j| # use separate connections to avoid races bus = DBus::ASessionBus.new svc = bus.service("org.ruby.service") obj = svc.object("/org/ruby/MyInstance") obj.default_iface = "org.ruby.SampleInterface" - 10.times do |i| - print "#{i} " - $stdout.flush + repeat_with_jitter(10) do expect(obj.the_answer[0]).to eq(42) - sleep 0.1 * rand end end + puts + end + end + + context "W/O: when the threads only send signals" do + it "it works with a shared separate bus connection" do + race_threads(5) do |j| + # shared connection + bus = DBus::SessionBus.instance + # hackish: we do not actually request the name + svc = DBus::Service.new("org.ruby.server-test#{j}", bus) + + obj = TestSignalRace.new "/org/ruby/Foo" + svc.export obj + + repeat_with_jitter(10) do + obj.signal_without_arguments + end + + svc.unexport(obj) + end + puts end - jobs.each(&:join) end end