Skip to content

Commit

Permalink
Improve handling of response messages from relays
Browse files Browse the repository at this point in the history
  • Loading branch information
dtonon committed Dec 10, 2024
1 parent ca8833d commit 880cda5
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 23 deletions.
31 changes: 24 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,25 +148,27 @@ e = c.sign(e)
# Set the open callback
c.on :connect do |event|
puts 'Publish event...'
c.publish(e)
end

# Set the response callback
c.on :message do |event|
puts "Response: #{event}"
c.on :ok do |event|
puts "Event id: #{event.id}"
puts "Accepted: #{event.success}"
puts "Message: #{event.message}"
end

# Connect and send the event
c.connect
c.publish(e)
c.stop
# Do more things
c.close

# - - - - - - - - - - - - - -
# Compact sync mode

c.connect
c.publish_and_wait(e)
c.stop
c.close

```

Expand Down Expand Up @@ -322,9 +324,24 @@ filter = Nostr::Filter.new(
since: Time.now - (60*60*24), # 24 hours ago
limit: 10
)
c.on :message do |message|
puts ">> #{message}"

c.on :connect do |message|
puts "Connected!"
end

c.on :event do |message|
puts ">> #{message.content}"
end

c.on :eose do |message|
puts "Finished subscription #{message.subscription_id}"
c.close
end

c.on :close do |message|
puts "Connection closed"
end

c.subscribe(filter: filter)
c.connect
```
44 changes: 30 additions & 14 deletions lib/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ class Client
include EventWizard

attr_reader :signer
attr_reader :relay
attr_reader :subscriptions


def initialize(signer: nil, private_key: nil, relay: nil, context: Context.new(timeout: 5))
initialize_event_emitter

Expand All @@ -32,10 +34,22 @@ def initialize(signer: nil, private_key: nil, relay: nil, context: Context.new(t
@inbound_channel = EventMachine::Channel.new

@inbound_channel.subscribe do |msg|
emit :connect, msg[:relay] if msg[:type] == :open
emit :message, msg[:data] if msg[:type] == :message
emit :error, msg[:message] if msg[:type] == :error
emit :close, msg[:code], msg[:reason] if msg[:type] == :close
case msg[:type]
when :open
emit :connect, msg[:relay]
when :message
parsed_data = Nostr::MessageHandler.handle(msg[:data])
emit :message, parsed_data
emit :event, parsed_data if parsed_data.type == "EVENT"
emit :ok, parsed_data if parsed_data.type == "OK"
emit :eose, parsed_data if parsed_data.type == "EOSE"
emit :closed, parsed_data if parsed_data.type == "CLOSED"
emit :notice, parsed_data if parsed_data.type == "NOTICE"
when :error
emit :error, msg[:message]
when :close
emit :close, msg[:code], msg[:reason]
end
end
end

Expand Down Expand Up @@ -68,14 +82,14 @@ def generate_delegation_tag(to:, conditions:)
end

def connect(context: @context)
@running = true
@thread = Thread.new do
EM.run do
@ws_client = Faye::WebSocket::Client.new(@relay)

@outbound_channel.subscribe { |msg| @ws_client.send(msg) && emit(:send, msg) }

@ws_client.on :open do
@running = true
@inbound_channel.push(type: :open, relay: @relay)
end

Expand All @@ -88,6 +102,7 @@ def connect(context: @context)
end

@ws_client.on :close do |event|
context.cancel
@inbound_channel.push(type: :close, code: event.code, reason: event.reason)
end

Expand Down Expand Up @@ -128,23 +143,23 @@ def publish_and_wait(event, context: @context, close_on_finish: false)
@outbound_channel.push(['EVENT', event.to_json].to_json)

response_thread = Thread.new do
@response_mutex.synchronize do
@response_condition.wait(@response_mutex) # Wait for a response
context.wait do
@response_mutex.synchronize do
@response_condition.wait(@response_mutex) # Wait for a response
end
end
end

@inbound_channel.subscribe do |message|
if message[:type] == :message && message[:data]
data = JSON.parse(message[:data])
if data[1] == event.id
response = data
@response_condition.signal
end
parsed_data = Nostr::MessageHandler.handle(message[:data])
if parsed_data.type == "OK" && parsed_data.event_id == event.id
response = parsed_data
@response_condition.signal
end
end

response_thread.join
stop if close_on_finish
close if close_on_finish

response
end
Expand All @@ -153,6 +168,7 @@ def subscribe(subscription_id: SecureRandom.hex, filter: Filter.new)
@subscriptions[subscription_id] = filter
@outbound_channel.push(["REQ", subscription_id, filter.to_h].to_json)
@subscriptions[subscription_id]
subscription_id
end

def unsubscribe(subscription_id)
Expand Down
26 changes: 25 additions & 1 deletion lib/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ def initialize(
sig: nil,
pow: nil,
delegation: nil,
recipient: nil
recipient: nil,
subscription_id: nil
)
@pubkey = pubkey
@created_at = created_at ? created_at : Time.now.utc.to_i
Expand Down Expand Up @@ -55,6 +56,10 @@ def initialize(
end
end

def type
"EVENT"
end

def content=(content)
return if @content == content
@content = content
Expand Down Expand Up @@ -172,6 +177,25 @@ def valid?
true
end

def self.from_message(message)
subscription_id = message[1]
event_data = message[2]

event = new(
subscription_id: subscription_id,
kind: event_data["kind"],
pubkey: event_data["pubkey"],
created_at: event_data["created_at"],
tags: event_data["tags"],
content: event_data["content"],
id: event_data["id"],
sig: event_data["sig"],
pow: event_data["nonce"]&.last&.to_i
)
raise ArgumentError, "Event is not valid" unless event.valid?
return event
end

private

def reset!
Expand Down
107 changes: 107 additions & 0 deletions lib/message_handler.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
module Nostr
class MessageHandler
def self.handle(message)

message = JSON.parse(message) rescue ["?", message]
type = message[0]
strategy_class = case type
when 'EVENT' then EventMessageStrategy
when 'OK' then OkMessageStrategy
when 'EOSE' then EoseMessageStrategy
when 'CLOSED' then ClosedMessageStrategy
when 'NOTICE' then NoticeMessageStrategy
else UnknownMessageStrategy
end

processed_data = strategy_class.new(message).process
type == "EVENT" ? processed_data : ParsedData.new(processed_data)

end
end

class BaseMessageStrategy
def initialize(message)
@message = message
end

def process
raise NotImplementedError
end
end

class EventMessageStrategy < BaseMessageStrategy
def process
Event.from_message(@message)
end
end

class OkMessageStrategy < BaseMessageStrategy
def process
{
type: 'OK',
event_id: @message[1],
success: @message[2],
message: @message[3]
}
end
end

class EoseMessageStrategy < BaseMessageStrategy
def process
{
type: 'EOSE',
subscription_id: @message[1]
}
end
end

class ClosedMessageStrategy < BaseMessageStrategy
def process
{
type: 'CLOSED',
subscription_id: @message[1],
reason: @message[2]
}
end
end

class NoticeMessageStrategy < BaseMessageStrategy
def process
{
type: 'NOTICE',
message: @message[1]
}
end
end

class UnknownMessageStrategy < BaseMessageStrategy
def process
{
type: 'UNKNOWN',
raw_message: @message
}
end
end
end

class ParsedData
def initialize(data)
@data = data
end

def type
@data[:type]
end

def method_missing(method_name, *args, &block)
if @data.key?(method_name)
@data[method_name]
else
super
end
end

def respond_to_missing?(method_name, include_private = false)
@data.key?(method_name) || super
end
end
1 change: 1 addition & 0 deletions lib/nostr_ruby.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
require_relative 'filter'
require_relative 'signer'
require_relative 'client'
require_relative 'message_handler'

module Nostr

Expand Down
1 change: 0 additions & 1 deletion lib/signer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ def sign(event)
raise ArgumentError, "Pubkey doesn't match the private key" unless event.pubkey == @public_key

if event.kind == Nostr::Kind::DIRECT_MESSAGE
puts "event.inspect => #{event.inspect}"
event.content = CryptoTools.aes_256_cbc_encrypt(@private_key, event.recipient, event.content)
end

Expand Down

0 comments on commit 880cda5

Please sign in to comment.