Class: Waithook::WebsocketClient

Inherits:
Object
  • Object
show all
Defined in:
lib/waithook/websocket_client.rb

Overview

Simple WebSocket client. Internally it uses the websocket gem to construct and parse WebSocket messages.

Usage:

client = WebsocketClient.new(host: HOST, port: PORT, path: 'test-ruby')
client.send_ping!
client.send_message!("Hello, server")
type, data = client.wait_message # => type == :text, data is message content as a string

Defined Under Namespace

Classes: Waiter

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ WebsocketClient

Available options:

  • :host - hostname

  • :port - server port (default 80)

  • :path - HTTP path

  • :ssl - true/false (when omitted it is detected from the port: SSL is used only for port 443)

  • :ssl_version

  • :verify_mode

  • :logger_level - logger level, default :info

  • :output - output stream for default logger. If value == false then it will be silent, default is $stdout

  • :logger - custom logger object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/waithook/websocket_client.rb', line 50

def initialize(options = {})
  # required: :host, :path

  @host = options[:host]
  @port = options[:port] || 80
  @path = options[:path]

  @use_ssl = options.has_key?(:ssl) ? options[:ssl] : @port == 443
  @options = options

  @waiters = []
  @connect_waiters = []
  @handshake_received = false
  @messages = Queue.new
  @is_open = false

  @output = options[:output] || $stdout

  if options[:logger] === false
    @output = StringIO.new
  end

  if options[:logger] && options[:logger] != true
    @logger = options[:logger]
  else
    @logger = LoggerWithTrace.new(@output).setup(
      progname: self.class.name,
      level: options[:logger_level] || :info
    )
  end
end

Instance Attribute Details

#loggerObject

Returns the value of attribute logger.



19
20
21
# File 'lib/waithook/websocket_client.rb', line 19

# Reader for @logger, the logger object this client writes its log lines to
# (assigned in #initialize: either the :logger option or a LoggerWithTrace).
def logger
  @logger
end

Instance Method Details

#_handshake_recieved!Object



198
199
200
201
202
203
# File 'lib/waithook/websocket_client.rb', line 198

# Flags the handshake as complete, then drains @connect_waiters in FIFO order,
# calling notify(true) on each waiter that was queued by #wait_handshake!.
def _handshake_recieved!
  @handshake_received = true

  pending = @connect_waiters.shift
  while pending
    pending.notify(true)
    pending = @connect_waiters.shift
  end
end

#_notify_waiters(type, payload) ⇒ Object



205
206
207
208
209
# File 'lib/waithook/websocket_client.rb', line 205

# Drains @waiters in FIFO order and hands every waiter the same
# [type, payload] pair through its notify method.
def _notify_waiters(type, payload)
  subscriber = @waiters.shift
  while subscriber
    subscriber.notify([type, payload])
    subscriber = @waiters.shift
  end
end

#_process_frame(message) ⇒ Object



218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
# File 'lib/waithook/websocket_client.rb', line 218

# Reacts to one parsed incoming frame:
#   :pong - logged at debug level
#   :ping - answered with send_pong!
#   :text - queued on @messages and broadcast to the waiters
# Frames of any other type are only traced.
def _process_frame(message)
  kind = message.type
  body_note = message.data ? "DATA: #{message.data}" : "(no data)"
  logger.trace "Received :#{kind} #{body_note}"

  logger.debug "Got pong" if kind == :pong
  send_pong! if kind == :ping
  return unless kind == :text

  @messages.push([kind, message.data])
  _notify_waiters(kind, message.data)
end

#_send_frame(type, payload = nil) ⇒ Object



211
212
213
214
215
216
# File 'lib/waithook/websocket_client.rb', line 211

# Builds an outgoing client frame of the given type and writes its serialised
# form to @socket. Blocks in wait_handshake! first, because the frame is built
# with the protocol version taken from @handshake.
#
# type    - frame type symbol (the callers on this page pass :text, :ping, :pong, :close)
# payload - frame data, or nil for a frame without data
#
# Returns whatever @socket.write returns.
def _send_frame(type, payload = nil)
  wait_handshake!

  outgoing = WebSocket::Frame::Outgoing::Client.new(version: @handshake.version, data: payload, type: type)
  summary = payload ? "DATA: #{outgoing.data}" : "(no data)"
  logger.trace "Sending :#{outgoing.type} #{summary}"

  @socket.write(outgoing.to_s)
end

#_start_parser!Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/waithook/websocket_client.rb', line 118

# Starts the background thread that reads everything the server sends:
# first the HTTP handshake response, then WebSocket frames until the socket
# stops yielding data or an error is raised. The thread is kept in
# @processing_thread so that #close! can kill it.
def _start_parser!
  # NOTE(review): neither end of this pipe is referenced by any other method
  # shown for this class - confirm whether it is still needed.
  @reader, @writter = IO.pipe
  @processing_thread = Thread.new do
    # Thread.current.abort_on_exception = true
    begin
      logger.debug "Start reading in thread"
      handshake_response = _wait_handshake_response
      # Feed the raw response text into the client handshake object created in #connect!
      @handshake << handshake_response
      logger.trace "Handshake received:\n #{handshake_response}"

      # The parser must exist before waiters are released and frames are read.
      @frame_parser = WebSocket::Frame::Incoming::Client.new
      _handshake_recieved!
      # Runs until @socket.getc returns nil or something raises.
      # NOTE(review): when it returns normally the thread simply ends; @messages
      # is not closed and @is_open is left unchanged on that path - confirm this
      # is intended for callers blocked in #wait_message.
      _wait_frames!
    rescue Object => error
      # `rescue Object` catches every exception class, not only StandardError.
      # if got error - close socket and end thread
      # @last_connection_error will be detected in #wait_message and raised to caller thread,
      # so caller can decide to reconnect or do something else
      @last_connection_error = error
      # Best-effort close: no close frame is sent, this thread is not killed
      # (it is the one running), and any failure while closing is swallowed.
      close!(send_close: false, kill_processing_thread: false) rescue nil
      # Closing the queue lets a blocked @messages.pop in #wait_message return.
      @messages.close

      logger.error "#{error.class}: #{error.message}\n#{error.backtrace.join("\n")}"
    end
  end
end

#_wait_frames!Object



241
242
243
244
245
246
247
248
# File 'lib/waithook/websocket_client.rb', line 241

# Read loop of the processing thread: pulls one character at a time from
# @socket, feeds it to @frame_parser and dispatches every frame the parser
# has completed so far. Ends when @socket.getc returns nil.
def _wait_frames!
  chunk = @socket.getc
  while chunk
    @frame_parser << chunk

    frame = @frame_parser.next
    while frame
      _process_frame(frame)
      frame = @frame_parser.next
    end

    chunk = @socket.getc
  end
end

#_wait_handshake_responseObject



250
251
252
253
254
255
256
257
258
259
260
# File 'lib/waithook/websocket_client.rb', line 250

# Reads lines from @socket up to and including the first blank "\r\n" line
# (or until gets returns nil) and returns them concatenated into one string.
def _wait_handshake_response
  logger.debug "Waiting handshake response"

  header_lines = []
  line = @socket.gets
  while line
    header_lines << line
    break if line == "\r\n"

    line = @socket.gets
  end

  header_lines.join("")
end

#close!(send_close: true, kill_processing_thread: true) ⇒ Object

Send :close message to socket and close socket connection



267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/waithook/websocket_client.rb', line 267

# Sends a :close frame (when possible) and closes the socket.
#
# send_close             - when true, send a :close frame before closing the socket.
#                          The frame is skipped while the handshake has not been
#                          received: _send_frame waits for the handshake, and with the
#                          reader thread already killed that wait could never finish.
# kill_processing_thread - when true, kill the background reader thread first.
#
# Returns false (and only logs) if the connection is already closed, true otherwise.
# An error raised while sending the :close frame is propagated, but the socket
# is still closed and the client is still marked as closed.
def close!(send_close: true, kill_processing_thread: true)
  unless @is_open
    logger.info "Already closed"
    return false
  end

  logger.info "Disconnecting from #{@host} #{@port}"
  # The thread is nil if #connect! failed before the reader was started.
  @processing_thread&.kill if kill_processing_thread

  begin
    _send_frame(:close) if send_close && @handshake_received
  ensure
    # Mark closed first so #ping_sender loops stop even if closing the socket raises.
    @is_open = false
    @socket.close
  end

  true
end

#connect!Object

Establish connection to server and send initial handshake



83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/waithook/websocket_client.rb', line 83

# Opens the TCP connection (wrapped in TLS when @use_ssl is set), sends the
# client handshake and starts the background reader thread.
#
# Returns self. Errors raised while connecting propagate to the caller; if the
# TLS handshake fails, the TCP socket opened for it is closed before re-raising.
def connect!
  logger.info "Connecting to #{@host} #{@port}"

  tcp_socket = TCPSocket.open(@host, @port)

  if @use_ssl
    require 'openssl'
    ctx = OpenSSL::SSL::SSLContext.new
    ctx.ssl_version = @options[:ssl_version] if @options[:ssl_version]
    # NOTE(review): when :verify_mode is not given the context default is kept -
    # confirm that this verifies the peer certificate the way callers expect.
    ctx.verify_mode = @options[:verify_mode] if @options[:verify_mode]
    cert_store = OpenSSL::X509::Store.new
    cert_store.set_default_paths
    ctx.cert_store = cert_store

    @socket = ::OpenSSL::SSL::SSLSocket.new(tcp_socket, ctx)
    # Without sync_close, SSLSocket#close leaves the underlying TCP socket open.
    @socket.sync_close = true
    # Server Name Indication: lets servers hosting several names pick the right certificate.
    @socket.hostname = @host if @host && @socket.respond_to?(:hostname=)

    begin
      @socket.connect
    rescue StandardError
      # Do not leak the TCP connection when the TLS handshake fails.
      tcp_socket.close unless tcp_socket.closed?
      raise
    end
  else
    @socket = tcp_socket
  end

  @is_open = true
  # NOTE(review): the URL always uses the ws:// scheme and omits @port, even for
  # SSL or non-default ports - confirm the handshake headers are as intended.
  @handshake = WebSocket::Handshake::Client.new(url: "ws://#{@host}/#{@path.to_s.sub(/^\//, '')}")

  logger.trace "Sending handshake:\n#{@handshake}"

  @socket.print(@handshake)
  _start_parser!

  self
end

#connected?Boolean

Returns true if the connection is currently marked as open, false otherwise.

Returns:

  • (Boolean)


114
115
116
# File 'lib/waithook/websocket_client.rb', line 114

# Strict boolean view of @is_open (set by #connect!, cleared by #close!).
def connected?
  @is_open ? true : false
end

#ping_sender(interval = 60) ⇒ Object



234
235
236
237
238
239
# File 'lib/waithook/websocket_client.rb', line 234

# Blocks the calling thread for as long as @is_open is truthy, alternating
# between sending a ping and sleeping.
#
# interval - seconds to sleep after each ping (default 60)
def ping_sender(interval = 60)
  loop do
    break unless @is_open

    send_ping!
    sleep interval
  end
end

#send_message!(payload) ⇒ Object

Send message to server (type :text)



155
156
157
# File 'lib/waithook/websocket_client.rb', line 155

# Sends +payload+ to the server as a :text frame by delegating to _send_frame.
def send_message!(payload)
  _send_frame(:text, payload)
end

#send_ping!Object

Send :ping message to server



145
146
147
# File 'lib/waithook/websocket_client.rb', line 145

# Sends a :ping frame without payload by delegating to _send_frame.
def send_ping!
  _send_frame(:ping)
end

#send_pong!Object

Send :pong message to server, usually as a response to :ping message



150
151
152
# File 'lib/waithook/websocket_client.rb', line 150

# Sends a :pong frame without payload by delegating to _send_frame;
# _process_frame calls this in reply to an incoming :ping.
def send_pong!
  _send_frame(:pong)
end

#socket_open?Boolean

Returns:

  • (Boolean)


262
263
264
# File 'lib/waithook/websocket_client.rb', line 262

# Truthy when a socket exists and it is not closed. While no socket has been
# created yet, the falsy @socket value itself (nil) is returned.
def socket_open?
  @socket ? !@socket.closed? : @socket
end

#wait_handshake!Object Also known as: wait_connected

Wait until the server handshake is received. Once it is received, the client is listening for new messages and the connection is ready for sending data.



189
190
191
192
193
194
195
# File 'lib/waithook/websocket_client.rb', line 189

# Blocks until the server handshake has been received.
#
# Returns true straight away when the handshake is already in; otherwise
# queues a Waiter on @connect_waiters (released by _handshake_recieved!),
# waits on it and returns self.
def wait_handshake!
  if @handshake_received
    true
  else
    pending = Waiter.new
    @connect_waiters.push(pending)
    pending.wait
    self
  end
end

#wait_messageObject

Synchronously waits for a new message. Not thread safe: only one thread will receive a given message. Message format is [type, data], e.g. [:text, "hello world"].



169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/waithook/websocket_client.rb', line 169

# Pops the next entry from @messages, blocking until one is available.
#
# Returns the popped entry (text frames are queued as [type, data]).
# Raises @last_connection_error when the pop produced nothing usable, the
# queue has been closed and an error was recorded by the reader thread.
# A StandardError raised while popping is likewise replaced by
# @last_connection_error when one is recorded, and re-raised unchanged otherwise.
def wait_message
  entry = @messages.pop
  nothing_received = !entry || entry.first == nil
  raise @last_connection_error if nothing_received && @messages.closed? && @last_connection_error

  entry
rescue => failure
  raise(@last_connection_error || failure)
end

#wait_new_messageObject

Waits for a new message (thread safe: all waiting threads will receive the message). Message format is [type, data], e.g. [:text, "hello world"].



161
162
163
164
165
# File 'lib/waithook/websocket_client.rb', line 161

# Queues a fresh Waiter on @waiters and blocks on it, returning the value of
# its wait call (presumably the [type, payload] pair that _notify_waiters
# passes to notify - confirm against Waiter).
def wait_new_message
  Waiter.new.tap { |subscriber| @waiters << subscriber }.wait
end