Class: Deepstream::Client
- Inherits:
-
Object
- Object
- Deepstream::Client
- Defined in:
- lib/deepstream.rb
Instance Attribute Summary collapse
-
#connected ⇒ Object
readonly
Returns the value of attribute connected.
-
#max_timeout ⇒ Object
Returns the value of attribute max_timeout.
-
#verbose ⇒ Object
Returns the value of attribute verbose.
Instance Method Summary collapse
- #_fire_event_callback(msg) ⇒ Object
- #_login(credentials) ⇒ Object
- #_parse_data(payload) ⇒ Object
- #_process_msg(msg) ⇒ Object
- #_read ⇒ Object
- #_read_socket(timeout: nil) ⇒ Object
- #_resubscribe_events ⇒ Object
- #_sync_records ⇒ Object
- #_typed(value) ⇒ Object
- #_write(*args) ⇒ Object
- #_write_and_read(*args) {|_read| ... } ⇒ Object
- #connect(credentials) ⇒ Object
- #delete(record_name) ⇒ Object
- #disconnect ⇒ Object
- #emit(event, value = nil, opts = { timeout: nil }) ⇒ Object
- #get(record_name) ⇒ Object
- #get_list(list_name) ⇒ Object
- #get_record(record_name, list: nil) ⇒ Object
-
#initialize(address, port = 6021, credentials = {}) ⇒ Client
constructor
A new instance of Client.
- #inspect ⇒ Object
- #on(event, &block) ⇒ Object
Constructor Details
#initialize(address, port = 6021, credentials = {}) ⇒ Client
Returns a new instance of Client.
108 109 110 111 |
# File 'lib/deepstream.rb', line 108
#
# Stores the server location, resets the client's internal state and then
# immediately calls #connect with the given credentials.
#
# @param address host handed to TCPSocket.new by #connect
# @param port port handed to TCPSocket.new by #connect (defaults to 6021)
# @param credentials forwarded unchanged to #connect
def initialize(address, port = 6021, credentials = {})
  @address = address
  @port = port
  @unread_msg = nil
  @event_callbacks = {}
  @records = {}
  @max_timeout = 60 # cap applied by #connect to the growing retry delay
  @timeout = 1      # delay (seconds) #connect sleeps before the next retry
  connect(credentials)
end
Instance Attribute Details
#connected ⇒ Object (readonly)
Returns the value of attribute connected.
114 115 116 |
# File 'lib/deepstream.rb', line 114
#
# Reader for the connection flag that #connect sets to true after a
# successful login and that #disconnect (or a connection error) sets to false.
#
# @return the current value of @connected (nil until first set)
def connected
  @connected
end
#max_timeout ⇒ Object
Returns the value of attribute max_timeout.
113 114 115 |
# File 'lib/deepstream.rb', line 113
#
# Reader for the upper bound that #connect applies to its retry delay.
#
# @return the current value of @max_timeout
def max_timeout
  @max_timeout
end
#verbose ⇒ Object
Returns the value of attribute verbose.
113 114 115 |
# File 'lib/deepstream.rb', line 113
#
# Reader for the flag that enables the client's diagnostic `print` output.
#
# @return the current value of @verbose
def verbose
  @verbose
end
Instance Method Details
#_fire_event_callback(msg) ⇒ Object
276 277 278 |
# File 'lib/deepstream.rb', line 276
#
# Looks up the callback registered under the event name in msg[2] and, when
# one exists, invokes it on a new thread with the parsed payload from msg[3].
#
# @param msg [Array] split message; msg[2] is the event name, msg[3] the payload
# @return the registered callback, or nil when none is registered
def _fire_event_callback(msg)
  handler = @event_callbacks[msg[2]]
  # Run the user's callback off the calling thread so it cannot block it.
  Thread.start { handler.call(_parse_data(msg[3])) } if handler
  handler
end
#_login(credentials) ⇒ Object
116 117 118 119 |
# File 'lib/deepstream.rb', line 116
#
# Sends an auth request carrying the credentials as JSON and checks that the
# next message read from the socket is the expected acknowledgement.
#
# @param credentials [#to_json] serialised with #to_json into the request
# @return [nil] when the server acknowledged the login
# @raise [RuntimeError] when the reply is anything other than ["A", "A"]
def _login(credentials)
  _write('A', 'REQ', credentials.to_json)
  reply = _read_socket
  # A bare `raise` would surface only as "unhandled exception"; include the
  # reply so a rejected login can be diagnosed from the error text.
  raise "deepstream login failed: #{reply.inspect}" unless reply == %w[A A]
end
#_parse_data(payload) ⇒ Object
291 292 293 294 295 296 297 298 299 300 301 302 |
# File 'lib/deepstream.rb', line 291
#
# Decodes a payload whose first character selects the value type:
# 'S' string, 'N' number (returned as Float), 'T' true, 'F' false, 'L' nil,
# 'O' JSON after the prefix. Anything else is parsed as JSON in full.
# JSON objects are materialised as OpenStruct instances.
#
# @param payload [String] typed payload
# @return [String, Float, true, false, nil, OpenStruct, Array] decoded value
def _parse_data(payload)
  prefix = payload[0]
  return payload[1..-1] if prefix == 'S'
  return payload[1..-1].to_f if prefix == 'N'
  return true if prefix == 'T'
  return false if prefix == 'F'
  return nil if prefix == 'L'

  # 'O' carries JSON after the type prefix; every other payload is raw JSON.
  json = prefix == 'O' ? payload[1..-1] : payload
  JSON.parse(json, object_class: OpenStruct)
end
#_process_msg(msg) ⇒ Object
258 259 260 261 262 263 264 265 266 267 268 269 270 |
# File 'lib/deepstream.rb', line 258
#
# Dispatches one split message according to its first two fields.
# Events go to the registered callback, record patches/updates are applied to
# the cached record, a record delete acknowledgement drops the cached record,
# a few headers are ignored, and everything else is parked in @unread_msg for
# #_read to pick up.
#
# @param msg [Array] split message
# @return the value of the selected branch (nil for ignored messages)
def _process_msg(msg)
  header = msg[0..1]
  case header
  when %w[E EVT]
    _fire_event_callback(msg)
  when %w[R P]
    @records[msg[2]]._patch(msg[3], msg[4], _parse_data(msg[5]))
  when %w[R U]
    @records[msg[2]]._update(msg[3], _parse_data(msg[4]))
  when %w[R A]
    # Only the acknowledgement of a delete ('D') changes local state.
    @records.delete(msg[3]) if msg[2] == 'D'
  when %w[E A], %w[X E], []
    nil
  else
    @unread_msg = msg
  end
end
#_read ⇒ Object
272 273 274 |
# File 'lib/deepstream.rb', line 272
#
# Waits, polling every 0.01 s, until @unread_msg holds a message, then clears
# the slot and returns that message.
#
# @return the message that was stored in @unread_msg
def _read
  sleep 0.01 until (pending = @unread_msg)
  @unread_msg = nil
  pending
end
#_read_socket(timeout: nil) ⇒ Object
121 122 123 124 125 |
# File 'lib/deepstream.rb', line 121
#
# Reads one message from @socket (terminated by character 30), strips the
# terminator and splits the rest into fields on character 31.
#
# @param timeout [Numeric, nil] seconds passed to Timeout.timeout
# @return [Array<String>, nil] the message fields, or nil when `gets` returns nil
# @raise [Timeout::Error] when the read does not finish within `timeout`
def _read_socket(timeout: nil)
  Timeout.timeout(timeout) do
    raw = @socket.gets(30.chr)
    raw ? raw.chomp(30.chr).split(31.chr) : raw
  end
end
#_resubscribe_events ⇒ Object
231 232 233 234 235 |
# File 'lib/deepstream.rb', line 231
#
# Sends a subscribe request ('E', 'S', name) for every event that currently
# has a registered callback.
#
# @return [Array] the event names that were iterated
def _resubscribe_events
  event_names = @event_callbacks.keys
  event_names.each { |event_name| _write_and_read('E', 'S', event_name) }
end
#_sync_records ⇒ Object
237 238 239 240 241 242 243 |
# File 'lib/deepstream.rb', line 237
#
# Re-requests every cached record ('R', 'CR', name), waits for the reply and
# applies its version (msg[3]) and parsed data (msg[4]) to the cached record.
#
# @return [Hash] the @records hash
def _sync_records
  @records.each do |record_name, _record|
    _write_and_read('R', 'CR', record_name)
    reply = _read
    version = reply[3].to_i
    @records[record_name]._update(version, _parse_data(reply[4]))
  end
end
#_typed(value) ⇒ Object
280 281 282 283 284 285 286 287 288 289 |
# File 'lib/deepstream.rb', line 280
#
# Encodes a Ruby value as a typed payload: a one-character type prefix
# followed by the serialised value. This is the inverse of #_parse_data.
#
# Arrays are encoded with the 'O' prefix just like hashes; previously they
# matched no branch, so the method returned nil and an empty payload was sent.
#
# @param value [Hash, Array, String, Numeric, true, false, nil] value to encode
# @return [String, nil] the typed payload, or nil for unsupported types
def _typed(value)
  case value
  when Hash, Array then "O#{value.to_json}"
  when String then "S#{value}"
  when Numeric then "N#{value}"
  when TrueClass then 'T'
  when FalseClass then 'F'
  when NilClass then 'L'
  end
end
#_write(*args) ⇒ Object
251 252 253 254 255 256 |
# File 'lib/deepstream.rb', line 251
#
# Joins the given fields with character 31, appends the terminator
# (character 30) and writes the resulting frame to @socket.
#
# @param args fields of the outgoing message
# @return whatever @socket.write returns
# @raise [RuntimeError] "not connected" when the write fails while @connected is falsy
# @raise the original error when the write fails while @connected is truthy
def _write(*args)
  frame = args.join(31.chr) + 30.chr
  @socket.write(frame)
rescue => error
  raise error if @connected
  raise "not connected"
end
#_write_and_read(*args) {|_read| ... } ⇒ Object
245 246 247 248 249 |
# File 'lib/deepstream.rb', line 245
#
# Clears any parked reply, writes the given message and, only when a block
# is supplied, waits for the next reply and yields it.
#
# @param args fields forwarded to #_write
# @yield [reply] the message returned by #_read
# @return the block's value, or nil when no block is given
def _write_and_read(*args)
  @unread_msg = nil # discard a stale reply before sending the request
  _write(*args)
  return unless block_given?

  yield _read
end
#connect(credentials) ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/deepstream.rb', line 127
#
# Starts a background reader thread that opens the TCP socket, logs in and
# then processes incoming messages until an error occurs. After any error the
# socket is closed and the thread retries after sleeping @timeout seconds;
# that delay grows by a factor of 1.2 per failure, capped at @max_timeout, and
# is reset to 1 while messages are being read.
#
# The calling thread sleeps 0.5 s before returning and does not wait for the
# connection to be established beyond that.
#
# @param credentials forwarded to #_login on every (re)connection attempt
# @return [self] immediately when already connected, otherwise after starting the thread
def connect(credentials)
  return self if @connected
  Thread.start do
    # Tag the thread so #disconnect can find and kill it.
    Thread.current[:name] = "reader#{object_id}"
    loop do
      break if @connected # ensures only one thread remains after reconnection
      begin
        Timeout.timeout(2) { @socket = TCPSocket.new(@address, @port) }
        _login(credentials)
        @connected = true
        print Time.now.to_s[/.+ .+ /], "Connected\n" if @verbose
        # Restore server-side state on a separate thread so this thread can
        # keep reading the replies those requests are waiting for.
        Thread.start do
          _sync_records
          _resubscribe_events
        end
        loop do
          @timeout = 1
          begin
            _process_msg(_read_socket(timeout: 10))
          rescue Timeout::Error
            _write("heartbeat") # send anything to check if deepstream responds
            _process_msg(_read_socket(timeout: 10))
          end
        end
      rescue => e
        @connected = false
        @socket.close rescue nil
        print Time.now.to_s[/.+ .+ /], "Can't connect to deepstream server\n" if @verbose
        print "Error: ", e.message, "\n" if @verbose
        sleep @timeout
        @timeout = [@timeout * 1.2, @max_timeout].min
      end
    end
  end
  sleep 0.5
  self
end
#delete(record_name) ⇒ Object
220 221 222 223 224 225 226 227 228 229 |
# File 'lib/deepstream.rb', line 220
#
# Deletes a record on the server. When the name has the form
# "<namespace>/<record>", the full record name is first removed from the list
# named after the namespace.
#
# @param record_name [String] name of the record to delete
# @return the result of #_write, or false when any step raised
def delete(record_name)
  if (matching = record_name.match(/(?<namespace>\w+)\/(?<record>.+)/))
    tmp = get_list(matching[:namespace])
    tmp.remove(record_name)
  end
  _write('R', 'D', record_name)
rescue => e
  # Best effort: report the failure when verbose and signal it with false.
  print "Error: ", e.message, "\n" if @verbose
  false
end
#disconnect ⇒ Object
165 166 167 168 169 170 |
# File 'lib/deepstream.rb', line 165
#
# Marks the client as disconnected, closes the socket (ignoring any error
# while closing) and kills the reader thread started by #connect, if it is
# still running.
#
# Safe to call when no reader thread exists, for example when called twice.
#
# @return [self]
def disconnect
  @connected = false
  @socket.close rescue nil
  reader = Thread.list.find { |x| x[:name] == "reader#{object_id}" }
  # The reader may already be gone; calling #kill on nil would raise.
  reader.kill if reader
  self
end
#emit(event, value = nil, opts = { timeout: nil }) ⇒ Object
172 173 174 175 176 177 178 |
# File 'lib/deepstream.rb', line 172
#
# Publishes an event. Without a timeout a single write is attempted and a
# failure is swallowed. With opts[:timeout] set, the write is retried once per
# second until it succeeds or the timeout expires.
#
# @param event name of the event
# @param value payload, encoded with #_typed
# @param opts [Hash] :timeout — seconds to keep retrying (nil: try once)
# @return the result of #_write, or false when the only attempt failed
# @raise [Timeout::Error] when opts[:timeout] elapses before a write succeeds
def emit(event, value = nil, opts = { timeout: nil })
  outcome = nil
  Timeout.timeout(opts[:timeout]) do
    loop do
      outcome = begin
        _write('E', 'EVT', event, _typed(value))
      rescue StandardError
        false
      end
      break if outcome || opts[:timeout].nil?

      sleep 1
    end
  end
  outcome
end
#get(record_name) ⇒ Object
188 189 190 |
# File 'lib/deepstream.rb', line 188
#
# Shorthand for #get_record without a list.
#
# @param record_name name of the record to fetch
# @return whatever #get_record returns for that name
def get(record_name)
  get_record(record_name)
end
#get_list(list_name) ⇒ Object
209 210 211 212 213 214 215 216 217 218 |
# File 'lib/deepstream.rb', line 209
#
# Returns the cached list with the given name, requesting it from the server
# ('R', 'CR', name) on first use. If that request raises, an empty list with
# version 0 is cached and returned instead.
#
# @param list_name name of the list
# @return [Deepstream::List] the cached or newly created list
def get_list(list_name)
  @records[list_name] ||= (
    _write_and_read('R', 'CR', list_name)
    msg = _read
    Deepstream::List.new(self, list_name, _parse_data(msg[4]), msg[3].to_i)
  )
rescue => e
  print "Error: ", e.message, "\n" if @verbose
  # Fall back to an empty local list so callers still receive a usable object.
  @records[list_name] = Deepstream::List.new(self, list_name, [], 0)
end
#get_record(record_name, list: nil) ⇒ Object
192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 |
# File 'lib/deepstream.rb', line 192
#
# Returns the cached record with the given name, requesting it from the
# server ('R', 'CR', name) on first use. When `list` is given the record name
# becomes "<list>/<record_name>" and that name is added to the list first.
# If anything raises, an empty record with version 0 is cached and returned.
#
# @param record_name name of the record (without the list prefix)
# @param list optional list name used as a prefix and updated with the record
# @return [Deepstream::Record] the cached or newly created record
def get_record(record_name, list: nil)
  name = list ? "#{list}/#{record_name}" : record_name
  if list
    @records[list] ||= get_list(list)
    @records[list].add(name)
  end
  @records[name] ||= (
    _write_and_read('R', 'CR', name)
    msg = _read
    Deepstream::Record.new(self, name, _parse_data(msg[4]), msg[3].to_i)
  )
  @records[name]
rescue => e
  print "Error: ", e.message, "\n" if @verbose
  # Fall back to an empty local record so callers still receive a usable object.
  @records[name] = Deepstream::Record.new(self, name, OpenStruct.new, 0)
end
#inspect ⇒ Object
304 305 306 |
# File 'lib/deepstream.rb', line 304 def inspect "Deepstream::Client #{@address}:#{@port} connected: #{@connected}" end |
#on(event, &block) ⇒ Object
180 181 182 183 184 185 186 |
# File 'lib/deepstream.rb', line 180
#
# Subscribes to an event ('E', 'S', event) and registers the block as its
# callback. The callback is registered even when the subscribe request raises.
#
# @param event name of the event
# @yield [data] called with the parsed payload of each received event
# @return [Proc] the registered callback
def on(event, &block)
  _write_and_read('E', 'S', event)
  @event_callbacks[event] = block
rescue => e
  print "Error: ", e.message, "\n" if @verbose
  # Keep the callback so the subscription is not lost after a failed request.
  @event_callbacks[event] = block
end