Class: WSDirector::Client
- Inherits:
-
Object
- Object
- WSDirector::Client
- Includes:
- CGI::Escape
- Defined in:
- lib/wsdirector/client.rb
Overview
WebSocket client
Constant Summary collapse
- WAIT_WHEN_EXPECTING_EVENT =
5
Instance Attribute Summary collapse
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#ws ⇒ Object
readonly
Returns the value of attribute ws.
Instance Method Summary collapse
-
#consumed(id) ⇒ Object
Remove the message at the given index from the mailbox (the implementation calls `@mailbox.delete_at(id)`), presumably once it has matched an expectation.
- #each_message ⇒ Object
- #ignored?(msg) ⇒ Boolean
-
#initialize(url:, ignore: nil, intercept: nil, subprotocol: nil, headers: nil, id: nil, query: nil, cookies: nil) ⇒ Client
constructor
Create new WebSocket client and connect to WSDirector ws URL.
- #intercepted?(msg) ⇒ Boolean
- #receive(timeout = WAIT_WHEN_EXPECTING_EVENT) ⇒ Object
- #send(msg) ⇒ Object
Constructor Details
#initialize(url:, ignore: nil, intercept: nil, subprotocol: nil, headers: nil, id: nil, query: nil, cookies: nil) ⇒ Client
Create new WebSocket client and connect to WSDirector ws URL.
Optionally provide a list of ignore patterns (to ignore incoming messages, for example, pings).
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/wsdirector/client.rb', line 20
#
# Create a new WebSocket client and connect it to the given URL.
#
# @param url [String] WebSocket URL to connect to
# @param ignore [Array, nil] patterns stored for #ignored?; matching incoming
#   messages are dropped (for example, pings)
# @param intercept [Proc, nil] callable stored for #intercepted?; messages for
#   which it returns a truthy value are dropped
# @param subprotocol [String, nil] sent as the "Sec-WebSocket-Protocol" header
# @param headers [Hash, nil] extra handshake headers
# @param id [String, nil] client identifier; a random hex string when omitted
# @param query [Hash, nil] pairs appended to the URL as a query string
#   (values are escaped, keys are used as-is)
# @param cookies [Hash, nil] pairs sent as the "Cookie" header (values escaped)
# @raise [Error] when the connection is refused
def initialize(url:, ignore: nil, intercept: nil, subprotocol: nil, headers: nil, id: nil, query: nil, cookies: nil)
  @ignore = ignore
  @interceptor = intercept
  @mailbox = []

  # Local aliases of the ivars for use inside the handler blocks below.
  # NOTE(review): presumably the handlers are not evaluated with this client
  # as `self` (hence `client = self`) -- confirm against the WebSocket library.
  has_messages = @has_messages = Concurrent::Semaphore.new(0)
  messages = @messages = Queue.new
  open = Concurrent::Promise.new
  client = self

  options = {}

  # Work on a copy so the caller's hash is never mutated by the header
  # assignments below (and a frozen hash does not raise).
  headers = headers.dup if headers

  if subprotocol
    headers ||= {}
    headers["Sec-WebSocket-Protocol"] = subprotocol
  end

  if cookies
    headers ||= {}
    headers["Cookie"] = cookies.map { "#{_1}=#{escape(_2.to_s)}" }.join("; ")
  end

  if query
    url = "#{url}?#{query.map { "#{_1}=#{escape(_2.to_s)}" }.join("&")}"
  end

  options[:headers] = headers if headers

  @id = id || SecureRandom.hex(6)
  @ws = WebSocket::Client::Simple.connect(url, options) do |ws|
    ws.on(:open) do |_event|
      open.set(true)
    end

    ws.on :message do |msg|
      data = msg.data
      next if data.empty?
      next if client.ignored?(data)
      next if client.intercepted?(data)

      messages << data
      has_messages.release
    end

    ws.on :error do |e|
      # NOTE(review): the error is queued without releasing the semaphore, so
      # #receive only sees it after its acquire attempt returns -- confirm
      # this is intended.
      messages << Error.new("WebSocket Error #{e.inspect} #{e.backtrace}")
    end
  end

  # Block until the :open handler fires or the wait times out.
  open.wait!(WAIT_WHEN_EXPECTING_EVENT)
rescue Errno::ECONNREFUSED
  raise Error, "Failed to connect to #{url}"
end
Instance Attribute Details
#id ⇒ Object (readonly)
Returns the value of attribute id.
13 14 15 |
# File 'lib/wsdirector/client.rb', line 13
#
# Reader for the client identifier held in @id.
def id
  @id
end
#ws ⇒ Object (readonly)
Returns the value of attribute ws.
13 14 15 |
# File 'lib/wsdirector/client.rb', line 13
#
# Reader for the underlying WebSocket connection object held in @ws.
def ws
  @ws
end
Instance Method Details
#consumed(id) ⇒ Object
Remove the message at the given index from the mailbox (the implementation calls `@mailbox.delete_at(id)`), presumably once it has matched an expectation.
93 94 95 |
# File 'lib/wsdirector/client.rb', line 93
#
# Drop the mailbox entry stored at index +id+.
#
# @param id [Integer] position of the message inside the mailbox
# @return [Object, nil] the removed message, or nil when the index is out of range
def consumed(id)
  @mailbox.delete_at(id)
end
#each_message ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/wsdirector/client.rb', line 73
#
# Yield every message together with its mailbox index: first the messages
# already sitting in the mailbox, then each message obtained from #receive,
# which is appended to the mailbox before being yielded.
#
# The second phase loops until the block breaks out or #receive raises.
#
# @yieldparam msg [Object] the message
# @yieldparam index [Integer] position of the message in the mailbox
def each_message
  # Iterate over a snapshot so that changes made to the mailbox from inside
  # the block (presumably via #consumed) do not disturb this pass.
  @mailbox.dup.each_with_index do |msg, i|
    yield msg, i
  end

  loop do
    msg = receive
    @mailbox << msg
    yield msg, (@mailbox.size - 1)
  end
end
#ignored?(msg) ⇒ Boolean
101 102 103 104 |
# File 'lib/wsdirector/client.rb', line 101
#
# Tell whether an incoming message matches any of the configured ignore patterns.
#
# @param msg [String] raw message payload
# @return [Boolean] true when at least one pattern matches; false when no
#   patterns are configured or none match
def ignored?(msg)
  # Array() accepts nil (no patterns), a single pattern or a list of patterns.
  # Regexp#match? is used because only a yes/no answer is needed.
  Array(@ignore).any? { |pattern| Regexp.new(pattern).match?(msg) }
end
#intercepted?(msg) ⇒ Boolean
106 107 108 109 110 |
# File 'lib/wsdirector/client.rb', line 106
#
# Run the configured interceptor (if any) against an incoming message.
# The interceptor is evaluated with this object as +self+ and receives the
# message as its only argument.
#
# @param msg [String] raw message payload
# @return [Object] false when no interceptor is set, otherwise whatever the
#   interceptor returns
def intercepted?(msg)
  @interceptor ? instance_exec(msg, &@interceptor) : false
end
#receive(timeout = WAIT_WHEN_EXPECTING_EVENT) ⇒ Object
85 86 87 88 89 90 |
# File 'lib/wsdirector/client.rb', line 85
#
# Wait up to +timeout+ seconds for a message to be signalled, then take the
# next entry from the queue without blocking.
#
# @param timeout [Numeric] how long to wait for the semaphore
# @return [Object] the next queued message
# @raise [Exception] the queued entry itself when it is an exception
# @raise [ThreadError] when the queue is still empty (non-blocking pop)
def receive(timeout = WAIT_WHEN_EXPECTING_EVENT)
  # The acquire result is deliberately not inspected; an empty queue surfaces
  # through the non-blocking pop below.
  @has_messages.try_acquire(1, timeout)

  incoming = @messages.pop(true)
  return incoming unless incoming.is_a?(Exception)

  raise incoming
end
#send(msg) ⇒ Object
97 98 99 |
# File 'lib/wsdirector/client.rb', line 97
#
# Forward a message to the underlying WebSocket connection.
#
# NOTE(review): this definition shadows Object#send on the client, so dynamic
# dispatch on a client instance must go through __send__ / public_send.
#
# @param msg [Object] payload handed to the connection's own #send
# @return [Object] whatever the connection's #send returns
def send(msg)
  connection = @ws
  connection.send(msg)
end