Skip to content

Commit

Permalink
Signal thread safety: Add a mutex to MessageQueue#push,pop
Browse files Browse the repository at this point in the history
The previous attempt at thread safety failed (#7, #73)
but now I see a limited use case:

A worker thread only uses the shared bus connection to send signals
about its progress and completion. No method calls are accepted there.

That is, we only race for the bus connection in MessageQueue#push, not
in the #pop reading part.
  • Loading branch information
mvidner committed Mar 21, 2023
1 parent 942e0a7 commit c33e881
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 23 deletions.
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
Features:
* For EXTERNAL authentication, try also without the user id, to work with
containers ([#126][]).
* Thread safety, as long as the non-main threads only send signals.

[#126]: https://github.com/mvidner/ruby-dbus/issues/126

Expand Down
28 changes: 17 additions & 11 deletions lib/dbus/message_queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def initialize(address)
@address = address
@buffer = ""
@is_tcp = false
@mutex = Mutex.new
connect
end

Expand All @@ -33,23 +34,28 @@ def initialize(address)
# @raise EOFError
# @todo failure modes
def pop(blocking: true)
buffer_from_socket_nonblock
message = message_from_buffer_nonblock
if blocking
# we can block
while message.nil?
r, _d, _d = IO.select([@socket])
if r && r[0] == @socket
buffer_from_socket_nonblock
message = message_from_buffer_nonblock
# FIXME: this is not enough, the R/W test deadlocks on shared connections
@mutex.synchronize do
buffer_from_socket_nonblock
message = message_from_buffer_nonblock
if blocking
# we can block
while message.nil?
r, _d, _d = IO.select([@socket])
if r && r[0] == @socket
buffer_from_socket_nonblock
message = message_from_buffer_nonblock
end
end
end
message
end
message
end

def push(message)
@socket.write(message.marshall)
@mutex.synchronize do
@socket.write(message.marshall)
end
end
alias << push

Expand Down
67 changes: 55 additions & 12 deletions spec/thread_safety_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,71 @@
require_relative "spec_helper"
require "dbus"

describe "ThreadSafetyTest" do
it "tests thread competition" do
print "Thread competition: "
jobs = []
5.times do
jobs << Thread.new do
Thread.current.abort_on_exception = true
class TestSignalRace < DBus::Object
dbus_interface "org.ruby.ServerTest" do
dbus_signal :signal_without_arguments
end
end

# Run *count* threads all doing *body*, wait for their finish
def race_threads(count, &body)
jobs = count.times.map do |j|
Thread.new do
Thread.current.abort_on_exception = true

body.call(j)
end
end
jobs.each(&:join)
end

# Repeat *count* times: { random sleep, *body* }, printing progress
def repeat_with_jitter(count, &body)
count.times do |i|
sleep 0.1 * rand
print "#{i} "
$stdout.flush

body.call
end
end

describe "thread safety" do
context "R/W: when the threads call methods with return values" do
it "it works with separate bus connections" do
race_threads(5) do |_j|
# use separate connections to avoid races
bus = DBus::ASessionBus.new
svc = bus.service("org.ruby.service")
obj = svc.object("/org/ruby/MyInstance")
obj.default_iface = "org.ruby.SampleInterface"

10.times do |i|
print "#{i} "
$stdout.flush
repeat_with_jitter(10) do
expect(obj.the_answer[0]).to eq(42)
sleep 0.1 * rand
end
end
puts
end
end

context "W/O: when the threads only send signals" do
it "it works with a shared separate bus connection" do
race_threads(5) do |j|
# shared connection
bus = DBus::SessionBus.instance
# hackish: we do not actually request the name
svc = DBus::Service.new("org.ruby.server-test#{j}", bus)

obj = TestSignalRace.new "/org/ruby/Foo"
svc.export obj

repeat_with_jitter(10) do
obj.signal_without_arguments
end

svc.unexport(obj)
end
puts
end
jobs.each(&:join)
end
end

0 comments on commit c33e881

Please sign in to comment.