Class: WSDirector::Client

Inherits:
Object
  • Object
show all
Includes:
CGI::Escape
Defined in:
lib/wsdirector/client.rb

Overview

WebSocket client

Constant Summary collapse

WAIT_WHEN_EXPECTING_EVENT =
5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url:, ignore: nil, intercept: nil, subprotocol: nil, headers: nil, id: nil, query: nil, cookies: nil) ⇒ Client

Create a new WebSocket client and connect to the WSDirector ws URL.

Optionally provide ignore patterns (to ignore matching incoming messages, for example, pings).



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/wsdirector/client.rb', line 20

# Create a new WebSocket client and connect to the given URL.
#
# @param url [String] WebSocket endpoint to connect to
# @param ignore [Array<String, Regexp>, nil] patterns; incoming messages
#   matching any of them are dropped (see #ignored?)
# @param intercept [Proc, nil] block evaluated for every incoming message
#   (see #intercepted?); a truthy result drops the message
# @param subprotocol [String, nil] value for the Sec-WebSocket-Protocol header
# @param headers [Hash, nil] extra handshake headers; the hash is copied,
#   never modified
# @param id [String, nil] client identifier; a random hex string when omitted
# @param query [Hash, nil] parameters appended to +url+; values are escaped
# @param cookies [Hash, nil] cookies sent in the Cookie header; values are escaped
# @raise [Error] when the connection is refused
def initialize(url:, ignore: nil, intercept: nil, subprotocol: nil, headers: nil, id: nil, query: nil, cookies: nil)
  @ignore = ignore
  @interceptor = intercept
  @mailbox = []

  # The handlers below use these locals instead of instance variables/self;
  # presumably the WebSocket library runs handler blocks with a different
  # self -- TODO confirm against websocket-client-simple.
  has_messages = @has_messages = Concurrent::Semaphore.new(0)
  messages = @messages = Queue.new
  open = Concurrent::Promise.new
  client = self

  # Work on a copy so the subprotocol/cookie headers added below never leak
  # into the hash owned by the caller.
  headers = headers.dup if headers

  if subprotocol
    headers ||= {}
    headers["Sec-WebSocket-Protocol"] = subprotocol
  end

  if cookies
    headers ||= {}
    headers["Cookie"] = cookies.map { "#{_1}=#{escape(_2.to_s)}" }.join("; ")
  end

  if query
    params = query.map { "#{_1}=#{escape(_2.to_s)}" }.join("&")
    # Extend an existing query string rather than adding a second "?".
    separator = url.to_s.include?("?") ? "&" : "?"
    url = "#{url}#{separator}#{params}"
  end

  options = {}
  options[:headers] = headers if headers

  @id = id || SecureRandom.hex(6)
  @ws = WebSocket::Client::Simple.connect(url, options) do |ws|
    ws.on(:open) do |_event|
      open.set(true)
    end

    ws.on :message do |msg|
      data = msg.data
      next if data.empty?
      next if client.ignored?(data)
      next if client.intercepted?(data)
      messages << data
      has_messages.release
    end

    ws.on :error do |e|
      # Queued as an exception object; #receive re-raises it when popped.
      messages << Error.new("WebSocket Error #{e.inspect} #{e.backtrace}")
    end
  end

  # Bounded wait for the :open handler to resolve the promise.
  open.wait!(WAIT_WHEN_EXPECTING_EVENT)
rescue Errno::ECONNREFUSED
  raise Error, "Failed to connect to #{url}"
end

Instance Attribute Details

#id ⇒ Object (readonly)

Returns the value of attribute id.



13
14
15
# File 'lib/wsdirector/client.rb', line 13

# Identifier of this client: the +id+ given to the constructor, or the
# random hex string generated there when none was given.
def id
  @id
end

#ws ⇒ Object (readonly)

Returns the value of attribute ws.



13
14
15
# File 'lib/wsdirector/client.rb', line 13

# Underlying connection object, as returned by
# WebSocket::Client::Simple.connect in the constructor.
def ws
  @ws
end

Instance Method Details

#consumed(id) ⇒ Object

Remove the message at the given index from the mailbox, marking it as consumed.



93
94
95
# File 'lib/wsdirector/client.rb', line 93

# Removes the message stored at the given mailbox index.
#
# @param id [Integer] index into the mailbox, as yielded by #each_message
# @return [Object, nil] the removed message, or nil when the index is out of range
def consumed(id)
  @mailbox.delete_at(id)
end

#each_message ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
# File 'lib/wsdirector/client.rb', line 73

# Yields each message together with its mailbox index.
#
# First replays the messages already sitting in the mailbox, then keeps
# calling #receive, appending every new message to the mailbox before
# yielding it. The second phase runs until the block breaks out or
# #receive raises.
#
# @yieldparam msg [Object] the message
# @yieldparam index [Integer] position of the message in the mailbox
def each_message
  # Iterate over a copy, so changes the block makes to the mailbox do not
  # disturb this pass.
  @mailbox.dup.each_with_index { |stored, index| yield stored, index }

  loop do
    incoming = receive
    @mailbox.push(incoming)
    yield incoming, @mailbox.length - 1
  end
end

#ignored?(msg) ⇒ Boolean

Returns:

  • (Boolean)


101
102
103
104
# File 'lib/wsdirector/client.rb', line 101

# Tells whether an incoming message matches any configured ignore pattern.
#
# @param msg [String] raw message payload
# @return [Boolean] true when at least one pattern matches; false when no
#   patterns are configured or none match
def ignored?(msg)
  return false unless @ignore

  # Array() lets a single String/Regexp stand in for a list of patterns.
  # match? is enough here: only a yes/no answer is needed, no MatchData.
  Array(@ignore).any? { |pattern| Regexp.new(pattern).match?(msg) }
end

#intercepted?(msg) ⇒ Boolean

Returns:

  • (Boolean)


106
107
108
109
110
# File 'lib/wsdirector/client.rb', line 106

# Runs the configured interceptor (if any) against an incoming message.
#
# The interceptor is evaluated with instance_exec, so inside it +self+ is
# this object.
#
# @param msg [Object] incoming message payload
# @return [Object] whatever the interceptor returns, or false when no
#   interceptor is configured
def intercepted?(msg)
  @interceptor ? instance_exec(msg, &@interceptor) : false
end

#receive(timeout = WAIT_WHEN_EXPECTING_EVENT) ⇒ Object



85
86
87
88
89
90
# File 'lib/wsdirector/client.rb', line 85

# Takes the next queued message.
#
# Waits on the message semaphore for at most +timeout+, then performs a
# non-blocking pop from the queue. The result of the wait is not inspected,
# so an empty queue surfaces as the ThreadError raised by Queue#pop(true).
#
# @param timeout [Numeric] passed to the semaphore's try_acquire
# @return [Object] the next message
# @raise [Exception] a queued exception object is re-raised instead of returned
# @raise [ThreadError] when the queue is empty
def receive(timeout = WAIT_WHEN_EXPECTING_EVENT)
  @has_messages.try_acquire(1, timeout)

  case (item = @messages.pop(true))
  when Exception then raise item
  else item
  end
end

#send(msg) ⇒ Object



97
98
99
# File 'lib/wsdirector/client.rb', line 97

# Sends a message by delegating to the underlying connection's +send+.
#
# Note: this definition shadows Object#send on instances of this class.
#
# @param msg [Object] payload handed to the connection unchanged
# @return [Object] whatever the connection's +send+ returns
def send(msg)
  @ws.send(msg)
end