Class: Waithook::WebsocketClient
- Inherits: Object
  - Object
  - Waithook::WebsocketClient
- Defined in: lib/waithook/websocket_client.rb
Overview
A simple WebSocket client. Internally it uses the websocket gem to construct and parse WebSocket messages.
Usage:
client = WebsocketClient.new(host: HOST, port: PORT, path: 'test-ruby')
client.send_ping!
client.send_message!("Hello, server")
type, data = client.wait_message # => type == :text, data is message content as a string
Defined Under Namespace
Classes: Waiter
Instance Attribute Summary
-
#logger ⇒ Object
Returns the value of attribute logger.
Instance Method Summary
- #_handshake_recieved! ⇒ Object
- #_notify_waiters(type, payload) ⇒ Object
- #_process_frame(message) ⇒ Object
- #_send_frame(type, payload = nil) ⇒ Object
- #_start_parser! ⇒ Object
- #_wait_frames! ⇒ Object
- #_wait_handshake_response ⇒ Object
-
#close!(send_close: true, kill_processing_thread: true) ⇒ Object
Send :close message to socket and close socket connection.
-
#connect! ⇒ Object
Establish connection to server and send initial handshake.
-
#connected? ⇒ Boolean
Return true if the connection is open, false otherwise.
-
#initialize(options = {}) ⇒ WebsocketClient
constructor
Available options:
* :host - hostname
* :port - server port (default 80)
* :path - HTTP path
* :ssl - true/false (if omitted, SSL is enabled when the port is 443)
* :ssl_version
* :verify_mode
* :logger_level - logger level, default :info
* :output - output stream for default logger.
- #ping_sender(interval = 60) ⇒ Object
-
#send_message!(payload) ⇒ Object
Send message to server (type :text).
-
#send_ping! ⇒ Object
Send :ping message to server.
-
#send_pong! ⇒ Object
Send :pong message to server, usually as a response to a :ping message.
- #socket_open? ⇒ Boolean
-
#wait_handshake! ⇒ Object
(also: #wait_connected)
Wait until the server handshake is received.
-
#wait_message ⇒ Object
Synchronously wait for a new message.
-
#wait_new_message ⇒ Object
Wait for a new message (thread safe; all waiting threads will receive the message). Message format is [type, data].
Constructor Details
#initialize(options = {}) ⇒ WebsocketClient
Available options:
-
:host - hostname
-
:port - server port (default 80)
-
:path - HTTP path
-
:ssl - true/false (if omitted, SSL is enabled when the port is 443)
-
:ssl_version
-
:verify_mode
-
:logger_level - logger level, default :info
-
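:output - output stream for the default logger, default is $stdout
-
:logger - custom logger object. In the constructor source below, passing false here is what silences the default logger (its output goes to a StringIO); a false :output value falls back to $stdout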
# File 'lib/waithook/websocket_client.rb', line 50
def initialize(options = {})
  # required: :host, :path

  @host = options[:host]
  @port = options[:port] || 80
  @path = options[:path]

  @use_ssl = options.has_key?(:ssl) ? options[:ssl] : @port == 443
  @options = options

  @waiters = []
  @connect_waiters = []
  @handshake_received = false
  @messages = Queue.new
  @is_open = false

  @output = options[:output] || $stdout

  if options[:logger] === false
    @output = StringIO.new
  end

  if options[:logger] && options[:logger] != true
    @logger = options[:logger]
  else
    @logger = LoggerWithTrace.new(@output).setup(
      progname: self.class.name,
      level: options[:logger_level] || :info
    )
  end
end
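The defaulting rules in the constructor can be restated as a small standalone sketch. `resolve_defaults` is a hypothetical helper, not part of Waithook; it only mirrors the port, SSL, and output handling visible in the source above.

```ruby
require 'stringio'

# Hypothetical helper (not part of Waithook): mirrors the defaulting
# logic of WebsocketClient#initialize as shown above.
def resolve_defaults(options = {})
  port = options[:port] || 80
  # An explicit :ssl key wins; otherwise SSL is assumed only on port 443.
  use_ssl = options.key?(:ssl) ? options[:ssl] : port == 443
  output = options[:output] || $stdout
  # logger: false swaps the output stream for an in-memory sink.
  output = StringIO.new if options[:logger] == false
  { port: port, use_ssl: use_ssl, output: output }
end

plain  = resolve_defaults(host: 'example.org', path: 'test-ruby')
secure = resolve_defaults(host: 'example.org', path: 'test-ruby', port: 443)
silent = resolve_defaults(host: 'example.org', path: 'test-ruby', logger: false)
```

Under these rules `plain` uses port 80 without SSL, `secure` enables SSL because of the port, and `silent` writes log output to a StringIO.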
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
# File 'lib/waithook/websocket_client.rb', line 19
def logger
  @logger
end
Instance Method Details
#_handshake_recieved! ⇒ Object
# File 'lib/waithook/websocket_client.rb', line 198
def _handshake_recieved!
  @handshake_received = true
  while waiter = @connect_waiters.shift
    waiter.notify(true)
  end
end
#_notify_waiters(type, payload) ⇒ Object
# File 'lib/waithook/websocket_client.rb', line 205
def _notify_waiters(type, payload)
  while waiter = @waiters.shift
    waiter.notify([type, payload])
  end
end
#_process_frame(message) ⇒ Object
# File 'lib/waithook/websocket_client.rb', line 218
def _process_frame(message)
  logger.trace "Received :#{message.type} #{message.data ? "DATA: #{message.data}" : "(no data)"}"

  if message.type == :pong
    logger.debug "Got pong"
  end
  if message.type == :ping
    send_pong!
  end
  if message.type == :text
    @messages.push([message.type, message.data])
    _notify_waiters(message.type, message.data)
  end
end
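The dispatch in `_process_frame` can be illustrated without the websocket gem. In this sketch `Frame` is a hypothetical stand-in for the gem's frame objects (only `type` and `data` are used), and a counter replaces the real `send_pong!` call.

```ruby
# Frame is a hypothetical stand-in for the websocket gem's frame objects;
# only #type and #data are used, as in _process_frame.
Frame = Struct.new(:type, :data)

messages = Queue.new
pongs_sent = 0

frames = [Frame.new(:ping, nil), Frame.new(:text, 'hi'), Frame.new(:pong, nil)]
frames.each do |frame|
  case frame.type
  when :ping
    pongs_sent += 1 # the real client calls send_pong! here
  when :text
    messages.push([frame.type, frame.data]) # only :text frames are queued
  end
  # :pong frames are only logged by the real client
end
```

Only the :text frame ends up in the queue; :ping is answered and :pong is ignored apart from logging.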
#_send_frame(type, payload = nil) ⇒ Object
# File 'lib/waithook/websocket_client.rb', line 211
def _send_frame(type, payload = nil)
  wait_handshake!
  frame = WebSocket::Frame::Outgoing::Client.new(version: @handshake.version, data: payload, type: type)
  logger.trace "Sending :#{frame.type} #{payload ? "DATA: #{frame.data}" : "(no data)"}"
  @socket.write(frame.to_s)
end
#_start_parser! ⇒ Object
# File 'lib/waithook/websocket_client.rb', line 118
def _start_parser!
  @reader, @writter = IO.pipe
  @processing_thread = Thread.new do
    # Thread.current.abort_on_exception = true
    begin
      logger.debug "Start reading in thread"
      handshake_response = _wait_handshake_response
      @handshake << handshake_response
      logger.trace "Handshake received:\n #{handshake_response}"

      @frame_parser = WebSocket::Frame::Incoming::Client.new
      _handshake_recieved!
      _wait_frames!
    rescue Object => error
      # if got error - close socket and end thread
      # @last_connection_error will be detected in #wait_message and raised to caller thread,
      # so caller can decide to reconnect or do something else
      @last_connection_error = error
      close!(send_close: false, kill_processing_thread: false) rescue nil
      @messages.close

      logger.error "#{error.class}: #{error.message}\n#{error.backtrace.join("\n")}"
    end
  end
end
#_wait_frames! ⇒ Object
# File 'lib/waithook/websocket_client.rb', line 241
def _wait_frames!
  while char = @socket.getc
    @frame_parser << char
    while message = @frame_parser.next
      _process_frame(message)
    end
  end
end
#_wait_handshake_response ⇒ Object
# File 'lib/waithook/websocket_client.rb', line 250
def _wait_handshake_response
  logger.debug "Waiting handshake response"
  data = []
  while line = @socket.gets
    data << line
    if line == "\r\n"
      break
    end
  end
  data.join("")
end
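The header-reading loop stops at the blank line that ends the HTTP response headers, leaving any following bytes on the socket for the frame parser. A minimal sketch of that behavior, with a StringIO standing in for the socket and `read_handshake` as a hypothetical name:

```ruby
require 'stringio'

# Reads lines until the blank "\r\n" line that terminates HTTP headers,
# the same loop shape as _wait_handshake_response. StringIO stands in
# for the socket here.
def read_handshake(io)
  lines = []
  while (line = io.gets)
    lines << line
    break if line == "\r\n"
  end
  lines.join
end

fake_socket = StringIO.new(
  "HTTP/1.1 101 Switching Protocols\r\n" \
  "Upgrade: websocket\r\n" \
  "\r\n" \
  "frame-bytes"
)
response = read_handshake(fake_socket)
rest = fake_socket.read # whatever follows the headers is left unread
```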
#close!(send_close: true, kill_processing_thread: true) ⇒ Object
Send :close message to socket and close socket connection
# File 'lib/waithook/websocket_client.rb', line 267
def close!(send_close: true, kill_processing_thread: true)
  unless @is_open
    logger.info "Already closed"
    return false
  end

  logger.info "Disconnecting from #{@host} #{@port}"
  @processing_thread.kill if kill_processing_thread
  _send_frame(:close) if send_close
  @socket.close
  @is_open = false

  return true
end
#connect! ⇒ Object
Establish connection to server and send initial handshake
# File 'lib/waithook/websocket_client.rb', line 83
def connect!
  logger.info "Connecting to #{@host} #{@port}"

  tcp_socket = TCPSocket.open(@host, @port)

  if @use_ssl
    require 'openssl'
    ctx = OpenSSL::SSL::SSLContext.new
    ctx.ssl_version = @options[:ssl_version] if @options[:ssl_version]
    ctx.verify_mode = @options[:verify_mode] if @options[:verify_mode]
    cert_store = OpenSSL::X509::Store.new
    cert_store.set_default_paths
    ctx.cert_store = cert_store
    @socket = ::OpenSSL::SSL::SSLSocket.new(tcp_socket, ctx)
    @socket.connect
  else
    @socket = tcp_socket
  end

  @is_open = true

  @handshake = WebSocket::Handshake::Client.new(url: "ws://#{@host}/#{@path.to_s.sub(/^\//, '')}")

  logger.trace "Sending handshake:\n#{@handshake}"

  @socket.print(@handshake)
  _start_parser!

  self
end
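The handshake URL is built from the host and path only, with one leading slash stripped from the path, so 'test-ruby' and '/test-ruby' are equivalent. As the source above shows, the scheme is always ws:// and the port is not included. A sketch of just that string handling, where `handshake_url` is a hypothetical helper:

```ruby
# Hypothetical helper (not part of Waithook): builds the handshake URL
# the same way connect! does, stripping one leading slash from the path.
def handshake_url(host, path)
  "ws://#{host}/#{path.to_s.sub(/^\//, '')}"
end

without_slash = handshake_url('example.org', 'test-ruby')
with_slash    = handshake_url('example.org', '/test-ruby')
no_path       = handshake_url('example.org', nil) # nil.to_s is an empty string
```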
#connected? ⇒ Boolean
Return true if the connection is open, false otherwise
# File 'lib/waithook/websocket_client.rb', line 114
def connected?
  !!@is_open
end
#ping_sender(interval = 60) ⇒ Object
# File 'lib/waithook/websocket_client.rb', line 234
def ping_sender(interval = 60)
  while @is_open
    send_ping!
    sleep interval
  end
end
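Because `ping_sender` loops for as long as the connection is open, it blocks the thread that calls it. One way to use it, sketched here under assumptions, is to run it in a background thread. `FakeClient` is a hypothetical stand-in that copies only the loop above and counts pings instead of sending them.

```ruby
# FakeClient is a hypothetical stand-in for WebsocketClient; it only
# reproduces the ping_sender loop and counts pings instead of sending them.
class FakeClient
  attr_reader :pings

  def initialize(max_pings)
    @is_open = true
    @pings = 0
    @max_pings = max_pings
  end

  def send_ping!
    @pings += 1
    # Simulate the connection closing after a few pings.
    @is_open = false if @pings >= @max_pings
  end

  def ping_sender(interval = 60)
    while @is_open
      send_ping!
      sleep interval
    end
  end
end

client = FakeClient.new(3)
# ping_sender blocks its caller, so it runs in its own thread here.
pinger = Thread.new { client.ping_sender(0.01) }
pinger.join # returns once the loop sees the connection as closed
```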
#send_message!(payload) ⇒ Object
Send message to server (type :text)
# File 'lib/waithook/websocket_client.rb', line 155
def send_message!(payload)
  _send_frame(:text, payload)
end
#send_ping! ⇒ Object
Send :ping message to server
# File 'lib/waithook/websocket_client.rb', line 145
def send_ping!
  _send_frame(:ping)
end
#send_pong! ⇒ Object
Send :pong message to server, usually as a response to a :ping message
# File 'lib/waithook/websocket_client.rb', line 150
def send_pong!
  _send_frame(:pong)
end
#socket_open? ⇒ Boolean
# File 'lib/waithook/websocket_client.rb', line 262
def socket_open?
  @socket && !@socket.closed?
end
#wait_handshake! ⇒ Object Also known as: wait_connected
Wait until the server handshake is received. Once it is received, the client is listening for new messages and the connection is ready for sending data
# File 'lib/waithook/websocket_client.rb', line 189
def wait_handshake!
  return true if @handshake_received

  waiter = Waiter.new
  @connect_waiters << waiter
  waiter.wait
  self
end
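The `Waiter` class itself is not shown on this page. The sketch below assumes one possible shape for it, a one-shot handoff built on Ruby's Queue, to illustrate how a caller blocks in `wait` until the processing thread calls `notify`. `SketchWaiter` is a hypothetical name and may differ from the real implementation.

```ruby
# Hypothetical Waiter (the real Waithook::WebsocketClient::Waiter is not
# shown on this page): a one-shot handoff built on Queue.
class SketchWaiter
  def initialize
    @queue = Queue.new
  end

  # Called by the producing thread.
  def notify(value)
    @queue.push(value)
  end

  # Blocks the calling thread until notify has been called.
  def wait
    @queue.pop
  end
end

connect_waiters = []
waiter = SketchWaiter.new
connect_waiters << waiter

# Stand-in for the processing thread running _handshake_recieved!.
notifier = Thread.new do
  while (w = connect_waiters.shift)
    w.notify(true)
  end
end

result = waiter.wait # unblocks once the notifier thread has run
notifier.join
```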
#wait_message ⇒ Object
Synchronously wait for a new message. Not thread safe: only one thread will receive each message. Message format is [type, data], e.g. [:text, "hello world"]
# File 'lib/waithook/websocket_client.rb', line 169
def wait_message
  begin
    message = @messages.pop
    if (!message || message.first == nil) && @messages.closed? && @last_connection_error
      raise @last_connection_error
    end
    return message
  rescue => error
    if @last_connection_error
      raise @last_connection_error
    else
      raise error
    end
  end
end
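The error path above relies on how Ruby's built-in Queue behaves once closed: `pop` on a closed, empty queue returns nil instead of blocking. The following sketch reproduces that sequence with plain local variables standing in for the client's instance variables; the IOError is an arbitrary example error.

```ruby
# Sketch of the queue behavior wait_message relies on, using only
# Ruby's built-in Queue (Queue#close requires Ruby 2.3 or later).
messages = Queue.new
messages.push([:text, 'hello world'])
last_connection_error = nil

first = messages.pop # a normal message comes back as [type, data]

# Simulate the processing thread failing: store the error, close the queue.
last_connection_error = IOError.new('connection lost')
messages.close

# pop on a closed, empty queue returns nil instead of blocking.
second = messages.pop

raised = begin
  raise last_connection_error if second.nil? && messages.closed? && last_connection_error
  nil
rescue IOError => error
  error
end
```

A caller of the real method would see the stored connection error raised from `wait_message` and could decide whether to reconnect.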