Class: Deepstream::Client
- Inherits:
-
Object
- Object
- Deepstream::Client
- Defined in:
- lib/deepstream.rb
Instance Method Summary collapse
- #_connect ⇒ Object
- #_fire_event_callback(msg) ⇒ Object
- #_open_socket ⇒ Object
- #_parse_data(payload) ⇒ Object
- #_process_msg(msg) ⇒ Object
- #_read ⇒ Object
- #_typed(value) ⇒ Object
- #_write(*args) ⇒ Object
- #_write_and_read(*args) {|_read| ... } ⇒ Object
- #delete(record_name) ⇒ Object
- #emit(event, value = nil) ⇒ Object
- #get(record_name) ⇒ Object
- #get_list(list_name) ⇒ Object
- #get_record(record_name, list: nil) ⇒ Object
-
#initialize(address, port = 6021) ⇒ Client
constructor
A new instance of Client.
- #on(event, &block) ⇒ Object
Constructor Details
#initialize(address, port = 6021) ⇒ Client
Returns a new instance of Client.
87 88 89 |
# File 'lib/deepstream.rb', line 87 def initialize(address, port = 6021) @address, @port, @unread_msg, @event_callbacks, @records = address, port, nil, {}, {} end |
Instance Method Details
#_connect ⇒ Object
144 145 146 147 148 |
# File 'lib/deepstream.rb', line 144 def _connect _open_socket @connected = true @connected = _write_and_read(%w{A REQ {}}) { |msg| msg == %w{A A} } end |
#_fire_event_callback(msg) ⇒ Object
178 179 180 |
# File 'lib/deepstream.rb', line 178 def _fire_event_callback(msg) @event_callbacks[msg[2]].tap { |cb| Thread.start { cb.(_parse_data(msg[3])) } if cb } end |
#_open_socket ⇒ Object
134 135 136 137 138 139 140 141 142 |
# File 'lib/deepstream.rb', line 134 def _open_socket Timeout.timeout(2) { @socket = TCPSocket.new(@address, @port) } Thread.start do loop { _process_msg(@socket.gets(30.chr).tap { |m| break m.chomp(30.chr).split(31.chr) if m }) } end rescue print Time.now.to_s[/.+ .+ /], "Can't connect to deepstream server\n" raise end |
#_parse_data(payload) ⇒ Object
193 194 195 196 197 198 199 200 201 202 203 204 |
# File 'lib/deepstream.rb', line 193 def _parse_data(payload) case payload[0] when 'O' then JSON.parse(payload[1..-1], object_class: OpenStruct) when '{' then JSON.parse(payload, object_class: OpenStruct) when 'S' then payload[1..-1] when 'N' then payload[1..-1].to_f when 'T' then true when 'F' then false when 'L' then nil else JSON.parse(payload, object_class: OpenStruct) end end |
#_process_msg(msg) ⇒ Object
163 164 165 166 167 168 169 170 171 172 |
# File 'lib/deepstream.rb', line 163 def _process_msg(msg) case msg[0..1] when %w{E EVT} then _fire_event_callback(msg) when %w{R P} then @records[msg[2]]._patch(msg[3], msg[4], _parse_data(msg[5])) when %w{R U} then @records[msg[2]]._update(msg[3], _parse_data(msg[4])) when %w{R A} then @records.delete(msg[3]) if msg[2] == 'D' when %w{E A} then nil else @unread_msg = msg end end |
#_read ⇒ Object
174 175 176 |
# File 'lib/deepstream.rb', line 174 def _read loop { break @unread_msg || (next sleep 0.01) }.tap { @unread_msg = nil } end |
#_typed(value) ⇒ Object
182 183 184 185 186 187 188 189 190 191 |
# File 'lib/deepstream.rb', line 182 def _typed(value) case value when Hash then "O#{value.to_json}" when String then "S#{value}" when Numeric then "N#{value}" when TrueClass then 'T' when FalseClass then 'F' when NilClass then 'L' end end |
#_write(*args) ⇒ Object
156 157 158 159 160 161 |
# File 'lib/deepstream.rb', line 156 def _write(*args) _connect unless @connected @socket.write(args.join(31.chr) + 30.chr) rescue @connected = false end |
#_write_and_read(*args) {|_read| ... } ⇒ Object
150 151 152 153 154 |
# File 'lib/deepstream.rb', line 150 def _write_and_read(*args) @unread_msg = nil _write(*args) yield _read if block_given? end |
#delete(record_name) ⇒ Object
126 127 128 129 130 131 132 |
# File 'lib/deepstream.rb', line 126 def delete(record_name) if matching = record_name.match(/(?<namespace>\w+)\/(?<record>.+)/) tmp = get_list(matching[:namespace]) tmp.remove(record_name) end _write('R', 'D', record_name) end |
#emit(event, value = nil) ⇒ Object
91 92 93 |
# File 'lib/deepstream.rb', line 91 def emit(event, value = nil) _write('E', 'EVT', event, _typed(value)) end |
#get(record_name) ⇒ Object
100 101 102 |
# File 'lib/deepstream.rb', line 100 def get(record_name) get_record(record_name) end |
#get_list(list_name) ⇒ Object
118 119 120 121 122 123 124 |
# File 'lib/deepstream.rb', line 118 def get_list(list_name) @records[list_name] ||= ( _write_and_read('R', 'CR', list_name) msg = _read Deepstream::List.new(self, list_name, _parse_data(msg[4]), msg[3].to_i) ) end |
#get_record(record_name, list: nil) ⇒ Object
104 105 106 107 108 109 110 111 112 113 114 115 116 |
# File 'lib/deepstream.rb', line 104 def get_record(record_name, list: nil) name = list ? "#{list}/#{record_name}" : record_name @records[name] ||= ( _write_and_read('R', 'CR', name) msg = _read Deepstream::Record.new(self, name, _parse_data(msg[4]), msg[3].to_i) ) if list @records[list] ||= get_list(list) @records[list].add(name) end @records[name] end |
#on(event, &block) ⇒ Object
95 96 97 98 |
# File 'lib/deepstream.rb', line 95 def on(event, &block) _write_and_read('E', 'S', event) @event_callbacks[event] = block end |