diff --git a/README.md b/README.md index 018dff9..240e421 100644 --- a/README.md +++ b/README.md @@ -148,25 +148,27 @@ e = c.sign(e) # Set the open callback c.on :connect do |event| puts 'Publish event...' - c.publish(e) end # Set the response callback -c.on :message do |event| - puts "Response: #{event}" +c.on :ok do |event| + puts "Event id: #{event.id}" + puts "Accepted: #{event.success}" + puts "Message: #{event.message}" end # Connect and send the event c.connect c.publish(e) -c.stop +# Do more things +c.close # - - - - - - - - - - - - - - # Compact sync mode c.connect c.publish_and_wait(e) -c.stop +c.close ``` @@ -322,9 +324,24 @@ filter = Nostr::Filter.new( since: Time.now - (60*60*24), # 24 hours ago limit: 10 ) -c.on :message do |message| - puts ">> #{message}" + +c.on :connect do |message| + puts "Connected!" +end + +c.on :event do |message| + puts ">> #{message.content}" +end + +c.on :eose do |message| + puts "Finished subscription #{message.subscription_id}" + c.close +end + +c.on :close do |message| + puts "Connection closed" end + c.subscribe(filter: filter) c.connect ``` \ No newline at end of file diff --git a/lib/client.rb b/lib/client.rb index 500dedb..98df170 100644 --- a/lib/client.rb +++ b/lib/client.rb @@ -7,8 +7,10 @@ class Client include EventWizard attr_reader :signer + attr_reader :relay attr_reader :subscriptions + def initialize(signer: nil, private_key: nil, relay: nil, context: Context.new(timeout: 5)) initialize_event_emitter @@ -32,10 +34,22 @@ def initialize(signer: nil, private_key: nil, relay: nil, context: Context.new(t @inbound_channel = EventMachine::Channel.new @inbound_channel.subscribe do |msg| - emit :connect, msg[:relay] if msg[:type] == :open - emit :message, msg[:data] if msg[:type] == :message - emit :error, msg[:message] if msg[:type] == :error - emit :close, msg[:code], msg[:reason] if msg[:type] == :close + case msg[:type] + when :open + emit :connect, msg[:relay] + when :message + parsed_data = Nostr::MessageHandler.handle(msg[:data]) + emit :message, parsed_data + emit :event, parsed_data if parsed_data.type == "EVENT" + emit :ok, parsed_data if parsed_data.type == "OK" + emit :eose, parsed_data if parsed_data.type == "EOSE" + emit :closed, parsed_data if parsed_data.type == "CLOSED" + emit :notice, parsed_data if parsed_data.type == "NOTICE" + when :error + emit :error, msg[:message] + when :close + emit :close, msg[:code], msg[:reason] + end end end @@ -68,7 +82,6 @@ def generate_delegation_tag(to:, conditions:) end def connect(context: @context) - @running = true @thread = Thread.new do EM.run do @ws_client = Faye::WebSocket::Client.new(@relay) @@ -76,6 +89,7 @@ def connect(context: @context) @outbound_channel.subscribe { |msg| @ws_client.send(msg) && emit(:send, msg) } @ws_client.on :open do + @running = true @inbound_channel.push(type: :open, relay: @relay) end @@ -88,6 +102,7 @@ def connect(context: @context) end @ws_client.on :close do |event| + context.cancel @inbound_channel.push(type: :close, code: event.code, reason: event.reason) end @@ -128,23 +143,23 @@ def publish_and_wait(event, context: @context, close_on_finish: false) @outbound_channel.push(['EVENT', event.to_json].to_json) response_thread = Thread.new do - @response_mutex.synchronize do - @response_condition.wait(@response_mutex) # Wait for a response + context.wait do + @response_mutex.synchronize do + @response_condition.wait(@response_mutex) # Wait for a response + end end end @inbound_channel.subscribe do |message| - if message[:type] == :message && message[:data] - data = JSON.parse(message[:data]) - if data[1] == event.id - response = data - @response_condition.signal - end + parsed_data = Nostr::MessageHandler.handle(message[:data]) + if parsed_data.type == "OK" && parsed_data.event_id == event.id + response = parsed_data + @response_condition.signal end end response_thread.join - stop if close_on_finish + close if close_on_finish response end @@ -153,6 +168,7 @@ def subscribe(subscription_id: SecureRandom.hex, filter: Filter.new) @subscriptions[subscription_id] = filter @outbound_channel.push(["REQ", subscription_id, filter.to_h].to_json) @subscriptions[subscription_id] + subscription_id end def unsubscribe(subscription_id) diff --git a/lib/event.rb b/lib/event.rb index 0b5680c..4ab1b97 100644 --- a/lib/event.rb +++ b/lib/event.rb @@ -23,7 +23,8 @@ def initialize( sig: nil, pow: nil, delegation: nil, - recipient: nil + recipient: nil, + subscription_id: nil ) @pubkey = pubkey @created_at = created_at ? created_at : Time.now.utc.to_i @@ -55,6 +56,10 @@ def initialize( end end + def type + "EVENT" + end + def content=(content) return if @content == content @content = content @@ -172,6 +177,25 @@ def valid? true end + def self.from_message(message) + subscription_id = message[1] + event_data = message[2] + + event = new( + subscription_id: subscription_id, + kind: event_data["kind"], + pubkey: event_data["pubkey"], + created_at: event_data["created_at"], + tags: event_data["tags"], + content: event_data["content"], + id: event_data["id"], + sig: event_data["sig"], + pow: event_data["nonce"]&.last&.to_i + ) + raise ArgumentError, "Event is not valid" unless event.valid? + return event + end + private def reset! diff --git a/lib/message_handler.rb b/lib/message_handler.rb new file mode 100644 index 0000000..6510591 --- /dev/null +++ b/lib/message_handler.rb @@ -0,0 +1,107 @@ +module Nostr + class MessageHandler + def self.handle(message) + + message = JSON.parse(message) rescue ["?", message] + type = message[0] + strategy_class = case type + when 'EVENT' then EventMessageStrategy + when 'OK' then OkMessageStrategy + when 'EOSE' then EoseMessageStrategy + when 'CLOSED' then ClosedMessageStrategy + when 'NOTICE' then NoticeMessageStrategy + else UnknownMessageStrategy + end + + processed_data = strategy_class.new(message).process + type == "EVENT" ? processed_data : ParsedData.new(processed_data) + + end + end + + class BaseMessageStrategy + def initialize(message) + @message = message + end + + def process + raise NotImplementedError + end + end + + class EventMessageStrategy < BaseMessageStrategy + def process + Event.from_message(@message) + end + end + + class OkMessageStrategy < BaseMessageStrategy + def process + { + type: 'OK', + event_id: @message[1], + success: @message[2], + message: @message[3] + } + end + end + + class EoseMessageStrategy < BaseMessageStrategy + def process + { + type: 'EOSE', + subscription_id: @message[1] + } + end + end + + class ClosedMessageStrategy < BaseMessageStrategy + def process + { + type: 'CLOSED', + subscription_id: @message[1], + reason: @message[2] + } + end + end + + class NoticeMessageStrategy < BaseMessageStrategy + def process + { + type: 'NOTICE', + message: @message[1] + } + end + end + + class UnknownMessageStrategy < BaseMessageStrategy + def process + { + type: 'UNKNOWN', + raw_message: @message + } + end + end +end + +class ParsedData + def initialize(data) + @data = data + end + + def type + @data[:type] + end + + def method_missing(method_name, *args, &block) + if @data.key?(method_name) + @data[method_name] + else + super + end + end + + def respond_to_missing?(method_name, include_private = false) + @data.key?(method_name) || super + end +end \ No newline at end of file diff --git a/lib/nostr_ruby.rb b/lib/nostr_ruby.rb index d2b9518..21421b6 100644 --- a/lib/nostr_ruby.rb +++ b/lib/nostr_ruby.rb @@ -17,6 +17,7 @@ require_relative 'filter' require_relative 'signer' require_relative 'client' +require_relative 'message_handler' module Nostr diff --git a/lib/signer.rb b/lib/signer.rb index 5b2b157..9cb0617 100644 --- a/lib/signer.rb +++ b/lib/signer.rb @@ -28,7 +28,6 @@ def sign(event) raise ArgumentError, "Pubkey doesn't match the private key" unless event.pubkey == @public_key if event.kind == Nostr::Kind::DIRECT_MESSAGE - puts "event.inspect => #{event.inspect}" event.content = CryptoTools.aes_256_cbc_encrypt(@private_key, event.recipient, event.content) end