Class: Deepstream::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/deepstream.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(address, port = 6021, credentials = {}) ⇒ Client

Returns a new instance of Client.



108
109
110
111
# File 'lib/deepstream.rb', line 108

# Remembers the server endpoint, resets the client's bookkeeping and
# immediately starts connecting.
#
# @param address the host handed to TCPSocket.new when connecting
# @param port the TCP port handed to TCPSocket.new (6021 unless given)
# @param credentials login data forwarded to #connect
def initialize(address, port = 6021, credentials = {})
  @address = address
  @port = port
  @unread_msg = nil     # most recent message left for _read to pick up
  @event_callbacks = {} # event name => block registered through #on
  @records = {}         # record / list name => cached object
  @max_timeout = 60     # upper bound for the reconnect delay
  @timeout = 1          # current reconnect delay
  connect(credentials)
end

Instance Attribute Details

#connected ⇒ Object (readonly)

Returns the value of attribute connected.



114
115
116
# File 'lib/deepstream.rb', line 114

# Returns the current value of @connected.
#
# #connect sets it to true once login has succeeded and back to false when
# the connection attempt or the read loop fails; #disconnect sets it to false.
# It is nil until the first of those happens.
def connected
  @connected
end

#max_timeout ⇒ Object

Returns the value of attribute max_timeout.



113
114
115
# File 'lib/deepstream.rb', line 113

# Returns the current value of @max_timeout.
#
# #initialize sets it to 60, and #connect uses it as the upper bound when it
# grows the delay between reconnection attempts.
def max_timeout
  @max_timeout
end

#verbose ⇒ Object

Returns the value of attribute verbose.



113
114
115
# File 'lib/deepstream.rb', line 113

# Returns the current value of @verbose.
#
# When truthy, the client prints connection status and rescued error messages.
# NOTE(review): nothing in the code shown here assigns @verbose; presumably a
# writer is defined elsewhere - confirm.
def verbose
  @verbose
end

Instance Method Details

#_fire_event_callback(msg) ⇒ Object



276
277
278
# File 'lib/deepstream.rb', line 276

# Runs the callback registered for an incoming event message.
#
# The event name is taken from msg[2] and the raw payload from msg[3]. The
# payload is decoded and the callback invoked inside a new thread, so the
# caller is not blocked by the callback.
#
# @param msg [Array<String>] the fields of the event message
# @return the registered callback, or nil when none is registered
def _fire_event_callback(msg)
  callback = @event_callbacks[msg[2]]
  Thread.start { callback.call(_parse_data(msg[3])) } if callback
  callback
end

#_login(credentials) ⇒ Object



116
117
118
119
# File 'lib/deepstream.rb', line 116

# Sends the authentication request and checks the server's answer.
#
# The credentials are serialised with #to_json and written as an "A", "REQ"
# message; the next message read from the socket must be exactly "A", "A".
#
# @param credentials login data; must respond to #to_json
# @return [nil] when the server accepted the login
# @raise [RuntimeError] when the reply is anything else, including nil
def _login(credentials)
  _write("A", "REQ", credentials.to_json)
  raise "login failed" unless _read_socket == %w{A A}
end

#_parse_data(payload) ⇒ Object



291
292
293
294
295
296
297
298
299
300
301
302
# File 'lib/deepstream.rb', line 291

# Decodes a payload string received from the server into a Ruby value.
#
# The first character selects the decoding:
#   'O'      - the rest is JSON; objects become OpenStruct instances
#   'S'      - the rest is returned as a String
#   'N'      - the rest is converted with #to_f
#   'T'/'F'  - true / false
#   'L'/'U'  - nil ('U' is the protocol's "undefined" marker, which previously
#              fell through to JSON.parse and raised)
# Anything else (for example raw JSON starting with '{' or '[') is parsed as
# JSON as a whole.
#
# @param payload [String] the encoded payload
# @return [Object, nil] the decoded value
# @raise [JSON::ParserError] when a JSON payload is malformed
def _parse_data(payload)
  case payload[0]
  when 'O' then JSON.parse(payload[1..-1], object_class: OpenStruct)
  when 'S' then payload[1..-1]
  when 'N' then payload[1..-1].to_f
  when 'T' then true
  when 'F' then false
  when 'L', 'U' then nil
  else JSON.parse(payload, object_class: OpenStruct)
  end
end

#_process_msg(msg) ⇒ Object



258
259
260
261
262
263
264
265
266
267
268
269
270
# File 'lib/deepstream.rb', line 258

# Dispatches one message read from the socket, based on its first two fields.
#
#   E EVT        - run the callback registered for the event
#   R P / R U    - forward a patch / update to the cached record named msg[2]
#   R A          - when msg[2] is 'D', drop the record named msg[3] from the cache
#   E A, X E, [] - ignored
#   anything else is stored in @unread_msg for _read to pick up
#
# Patch and update messages for a record that is not in the cache are
# ignored. Without that guard the call on nil raised NoMethodError, which
# connect's rescue treats as a lost connection.
#
# A nil msg (what _read_socket returns when the socket yields nothing) still
# raises NoMethodError here.
#
# @param msg [Array<String>] the fields of the message
def _process_msg(msg)
  case msg[0..1]
  when %w{E EVT} then _fire_event_callback(msg)
  when %w{R P}
    record = @records[msg[2]]
    record._patch(msg[3], msg[4], _parse_data(msg[5])) if record
  when %w{R U}
    record = @records[msg[2]]
    record._update(msg[3], _parse_data(msg[4])) if record
  when %w{R A} then @records.delete(msg[3]) if msg[2] == 'D'
  when %w{E A}, %w{X E}, [] then nil
  else
    @unread_msg = msg
  end
end

#_read ⇒ Object



272
273
274
# File 'lib/deepstream.rb', line 272

# Waits until a message has been stored in @unread_msg, then returns it and
# clears the slot.
#
# Polls every 0.01 seconds and has no upper time limit.
#
# @return the message that was stored in @unread_msg
def _read
  message = nil
  sleep 0.01 until (message = @unread_msg)
  @unread_msg = nil
  message
end

#_read_socket(timeout: nil) ⇒ Object



121
122
123
124
125
# File 'lib/deepstream.rb', line 121

# Reads one message from @socket and splits it into its fields.
#
# A message runs up to the character 30.chr; that terminator is stripped and
# the remainder is split on 31.chr.
#
# @param timeout [Numeric, nil] seconds handed to Timeout.timeout
# @return [Array<String>, nil] the fields, or nil when the socket returned nothing
# @raise [Timeout::Error] when the read does not finish within the timeout
def _read_socket(timeout: nil)
  Timeout.timeout(timeout) do
    raw = @socket.gets(30.chr)
    raw && raw.chomp(30.chr).split(31.chr)
  end
end

#_resubscribe_events ⇒ Object



231
232
233
234
235
# File 'lib/deepstream.rb', line 231

# Sends an "E", "S" request for every event that has a registered callback.
#
# Works on a snapshot of the event names taken before the first request.
#
# @return [Array] the event names that were requested
def _resubscribe_events
  subscribed = @event_callbacks.keys
  subscribed.each { |event_name| _write_and_read('E', 'S', event_name) }
end

#_sync_records ⇒ Object



237
238
239
240
241
242
243
# File 'lib/deepstream.rb', line 237

# Re-requests every cached record and applies the server's reply to it.
#
# For each name in @records an "R", "CR" request is written, the next unread
# message is awaited, and the cached object's _update is called with the
# version from reply[3] (as an Integer) and the decoded data from reply[4].
#
# @return [Hash] @records
def _sync_records
  @records.each_key do |record_name|
    _write_and_read('R', 'CR', record_name)
    reply = _read
    version = reply[3].to_i
    data = _parse_data(reply[4])
    @records[record_name]._update(version, data)
  end
end

#_typed(value) ⇒ Object



280
281
282
283
284
285
286
287
288
289
# File 'lib/deepstream.rb', line 280

# Encodes a Ruby value as a payload string with a one-character type prefix.
#
#   Hash, Array -> 'O' followed by the value's JSON
#   String      -> 'S' followed by the string
#   Numeric     -> 'N' followed by the number
#   true/false  -> 'T' / 'F'
#   nil         -> 'L'
#
# Arrays used to fall through and produce nil, so the payload was sent empty.
# They are now encoded like hashes; _parse_data decodes both through the 'O'
# prefix.
#
# @param value the value to encode
# @return [String, nil] the encoded payload, or nil for any other kind of value
def _typed(value)
  case value
  when Hash, Array then "O#{value.to_json}"
  when String then "S#{value}"
  when Numeric then "N#{value}"
  when TrueClass then 'T'
  when FalseClass then 'F'
  when NilClass then 'L'
  end
end

#_write(*args) ⇒ Object



251
252
253
254
255
256
# File 'lib/deepstream.rb', line 251

# Writes one message to @socket.
#
# The arguments are joined with 31.chr and terminated with 30.chr.
#
# @param args the fields of the message
# @return whatever @socket.write returns
# @raise [RuntimeError] "not connected" when the write fails while
#   @connected is falsy; otherwise the original error is re-raised
def _write(*args)
  fields = args.join(31.chr)
  @socket.write(fields + 30.chr)
rescue => error
  raise error if @connected
  raise "not connected"
end

#_write_and_read(*args) {|_read| ... } ⇒ Object

Yields:



245
246
247
248
249
# File 'lib/deepstream.rb', line 245

# Clears any pending unread message and writes a new message.
#
# The reply is only awaited when a block is given; it is then passed to the
# block. Without a block nothing is read.
#
# @param args the fields of the message, forwarded to _write
# @yield [reply] the next unread message
# @return the block's result, or nil when no block is given
def _write_and_read(*args)
  @unread_msg = nil
  _write(*args)
  return unless block_given?

  yield _read
end

#connect(credentials) ⇒ Object



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/deepstream.rb', line 127

# Starts a background thread that connects to the server, logs in and then
# processes incoming messages, reconnecting after any failure.
#
# Does nothing when the client is already connected. Otherwise the thread
# opens the socket (2 second limit), performs the login, marks the client as
# connected, kicks off record sync and event re-subscription in another
# thread, and then reads messages in a loop. When nothing arrives for 10
# seconds a "heartbeat" message is written and another 10 seconds are
# allowed for a reply. Any error closes the socket, waits @timeout seconds
# and retries; the wait grows by a factor of 1.2 per failure, capped at
# @max_timeout, and is reset to 1 on each pass through the read loop.
#
# @param credentials login data passed to _login on every connection attempt
# @return [self]
def connect(credentials)
  return self if @connected
  Thread.start do
    Thread.current[:name] = "reader#{object_id}" # lets #disconnect find this thread
    loop do
      break if @connected # ensures only one thread remains after reconnection
      begin
        Timeout.timeout(2) { @socket = TCPSocket.new(@address, @port) }
        _login(credentials)
        @connected = true
        print Time.now.to_s[/.+ .+ /], "Connected\n" if @verbose
        Thread.start do
          _sync_records
          _resubscribe_events
        end
        loop do
          @timeout = 1
          begin
            _process_msg(_read_socket(timeout: 10))
          rescue Timeout::Error
            _write("heartbeat") # send anything to check if deepstream responds
            _process_msg(_read_socket(timeout: 10))
          end
        end
      rescue => e
        @connected = false
        @socket.close rescue nil
        print Time.now.to_s[/.+ .+ /], "Can't connect to deepstream server\n" if @verbose
        print "Error: ", e.message, "\n" if @verbose
        sleep @timeout
        @timeout = [@timeout * 1.2, @max_timeout].min
      end
    end
  end
  sleep 0.5 # presumably gives the reader thread time to connect before returning
  self
end

#delete(record_name) ⇒ Object



220
221
222
223
224
225
226
227
228
229
# File 'lib/deepstream.rb', line 220

# Deletes a record on the server and removes it from its list.
#
# When the name has the form "<list>/<rest>", the part before the first
# slash is taken as the list name and the record is removed from that list.
# This matches how get_record builds names as "#{list}/#{record_name}". The
# previous unanchored \w+ pattern picked only the trailing word characters,
# so "my-list/x" was removed from a list called "list". The delete request
# itself is written in either case.
#
# @param record_name [String] full name of the record
# @return the result of _write, or false when anything raised
def delete(record_name)
  if (matching = record_name.match(%r{\A(?<namespace>[^/]+)/.+}))
    get_list(matching[:namespace]).remove(record_name)
  end
  _write('R', 'D', record_name)
rescue => e
  print "Error: ", e.message, "\n" if @verbose
  false
end

#disconnect ⇒ Object



165
166
167
168
169
170
# File 'lib/deepstream.rb', line 165

# Marks the client as disconnected, closes the socket and stops the reader
# thread started by #connect.
#
# Closing is best effort: errors from a missing or already closed socket are
# ignored. The reader thread is looked up by the name #connect gives it; when
# no such thread exists (for example on a second call) nothing is killed.
# Previously that case raised NoMethodError.
#
# @return [self]
def disconnect
  @connected = false
  begin
    @socket.close
  rescue StandardError
    nil # nothing to close, or already closed
  end
  reader = Thread.list.find { |thread| thread[:name] == "reader#{object_id}" }
  reader.kill if reader
  self
end

#emit(event, value = nil, opts = { timeout: nil }) ⇒ Object



172
173
174
175
176
177
178
# File 'lib/deepstream.rb', line 172

# Sends an event with an optional value.
#
# Without a timeout a single write is attempted. With opts[:timeout] set, a
# failed write is retried once per second until it succeeds or the timeout
# expires.
#
# @param event the event name
# @param value the value to send, encoded with _typed
# @param opts [Hash] :timeout - seconds to keep retrying (nil: one attempt)
# @return the result of _write, or false when the single attempt failed
# @raise [Timeout::Error] when retrying did not succeed within the timeout
def emit(event, value = nil, opts = { timeout: nil })
  limit = opts[:timeout]
  outcome = nil
  Timeout.timeout(limit) do
    loop do
      outcome = begin
        _write('E', 'EVT', event, _typed(value))
      rescue StandardError
        false
      end
      break if outcome || limit.nil?

      sleep 1
    end
  end
  outcome
end

#get(record_name) ⇒ Object



188
189
190
# File 'lib/deepstream.rb', line 188

# Shorthand for get_record(record_name) without a list.
#
# @param record_name name of the record to fetch
# @return whatever get_record returns for that name
def get(record_name)
  get_record(record_name)
end

#get_list(list_name) ⇒ Object



209
210
211
212
213
214
215
216
217
218
# File 'lib/deepstream.rb', line 209

# Returns the list with the given name, requesting it on first use.
#
# A list already cached in @records is returned as is. Otherwise an
# "R", "CR" request is written and the next unread message supplies the
# data (reply[4], decoded) and version (reply[3] as an Integer) for a new
# Deepstream::List, which is cached. If anything raises, an empty list with
# version 0 is cached and returned instead.
#
# @param list_name name of the list
# @return [Deepstream::List]
def get_list(list_name)
  cached = @records[list_name]
  return cached if cached

  _write_and_read('R', 'CR', list_name)
  reply = _read
  @records[list_name] = Deepstream::List.new(self, list_name, _parse_data(reply[4]), reply[3].to_i)
rescue => error
  print "Error: ", error.message, "\n" if @verbose
  @records[list_name] = Deepstream::List.new(self, list_name, [], 0)
end

#get_record(record_name, list: nil) ⇒ Object



192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/deepstream.rb', line 192

# Returns the record with the given name, requesting it on first use.
#
# With list: given, the full name becomes "<list>/<record_name>" and that
# name is added to the list (fetched through get_list when not cached). A
# record already cached in @records is returned as is. Otherwise an
# "R", "CR" request is written and the next unread message supplies the
# data (reply[4], decoded) and version (reply[3] as an Integer) for a new
# Deepstream::Record, which is cached. If anything raises, an empty record
# (OpenStruct data, version 0) is cached and returned instead.
#
# @param record_name name of the record, without the list prefix
# @param list optional name of the list the record belongs to
# @return [Deepstream::Record]
def get_record(record_name, list: nil)
  name = record_name
  if list
    name = "#{list}/#{record_name}"
    owner = (@records[list] ||= get_list(list))
    owner.add(name)
  end
  unless @records[name]
    _write_and_read('R', 'CR', name)
    reply = _read
    @records[name] = Deepstream::Record.new(self, name, _parse_data(reply[4]), reply[3].to_i)
  end
  @records[name]
rescue => error
  print "Error: ", error.message, "\n" if @verbose
  @records[name] = Deepstream::Record.new(self, name, OpenStruct.new, 0)
end

#inspect ⇒ Object



304
305
306
# File 'lib/deepstream.rb', line 304

# Short description of the client: class name, server endpoint and the
# current value of @connected.
#
# @return [String]
def inspect
  format('Deepstream::Client %s:%s connected: %s', @address, @port, @connected)
end

#on(event, &block) ⇒ Object



180
181
182
183
184
185
186
# File 'lib/deepstream.rb', line 180

# Registers a callback for an event.
#
# An "E", "S" request is written for the event. If that raises, the error is
# printed when @verbose is set and otherwise ignored. Either way the block is
# stored in @event_callbacks, where _resubscribe_events will find it on the
# next (re)connection.
#
# @param event the event name
# @yield [data] called by _fire_event_callback with the decoded event payload
# @return [Proc, nil] the stored block
def on(event, &block)
  begin
    _write_and_read('E', 'S', event)
  rescue => error
    print "Error: ", error.message, "\n" if @verbose
  end
  @event_callbacks[event] = block
end