Class: DripDrop::ZMQXReqHandler

Inherits:
ZMQBaseHandler
Includes:
ZMQReadableHandler, ZMQWritableHandler
Defined in:
lib/dripdrop/handlers/zeromq.rb

Instance Attribute Summary

Attributes included from ZMQReadableHandler

#message_class

Attributes inherited from ZMQBaseHandler

#connection

Instance Method Summary

Methods included from ZMQReadableHandler

#decode_message

Methods included from ZMQWritableHandler

#on_writable

Methods inherited from ZMQBaseHandler

#add_connection, #address, #on_receive, #on_recv, #post_setup, #read_connection, #write_connection

Methods inherited from BaseHandler

#handle_error, #on_error, #print_exception

Constructor Details

#initialize(*args) ⇒ ZMQXReqHandler

Returns a new instance of ZMQXReqHandler.



263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/dripdrop/handlers/zeromq.rb', line 263

# Builds the handler and installs the internal reply dispatcher.
#
# All arguments are forwarded untouched to the parent constructor.
def initialize(*args)
  super(*args)

  # Pending reply callbacks, keyed by the sequence number read from the
  # reply's head.
  @promises = {}
  # Last sequence number handed out; starts at zero.
  @seq_counter = 0

  # Internal dispatcher: should never be handled by the user
  self.on_receive do |reply|
    begin
      reply_seq = reply.head[SEQ_CTR_KEY]
      raise "Missing Seq Counter" unless reply_seq

      # Each callback fires at most once: it is removed before it is called.
      waiting = @promises.delete(reply_seq)
      waiting.call(reply) if waiting
    rescue StandardError => error
      handle_error(error)
    end
  end
end

Instance Method Details

#on_readable(socket, messages) ⇒ Object



297
298
299
300
301
302
303
304
# File 'lib/dripdrop/handlers/zeromq.rb', line 297

# Receives the raw frames read from the socket, drops the first frame (the
# empty delimiter) and hands the remaining frames to the inherited
# on_readable. Any StandardError raised along the way is routed to
# handle_error instead of propagating.
def on_readable(socket, messages)
  # Everything after the leading empty delimiter frame.
  payload_frames = messages[1..-1]
  super(socket, payload_frames)
rescue StandardError => error
  handle_error(error)
end

#send_message(message, &block) ⇒ Object



282
283
284
285
286
287
288
289
290
291
292
293
294
295
# File 'lib/dripdrop/handlers/zeromq.rb', line 282

# Sends a request, optionally registering a block to be called with the reply.
#
# The message is first passed through dd_messagify. When the result is a
# DripDrop::Message it is stamped with the next sequence number under
# SEQ_CTR_KEY, and the block (if one was given) is stored in @promises under
# that number. The encoded message is then handed to the inherited
# send_message, preceded by an empty delimiter frame.
#
# If preparing the message raises, the error is reported through handle_error
# and nothing is sent.
#
# @param message the message to send; anything dd_messagify accepts
# @param block optional callback stored under the message's sequence number
# @return the result of the inherited send_message, or nil when preparing the
#   message failed
def send_message(message, &block)
  begin
    dd_message = dd_messagify(message, @message_class)
    if dd_message.is_a?(DripDrop::Message)
      @seq_counter += 1
      dd_message.head[SEQ_CTR_KEY] = @seq_counter
      @promises[@seq_counter] = block if block
      message = dd_message
    end
  rescue StandardError => e
    handle_error(e)
    # Stop here: `message` is still the caller's original object, so falling
    # through would either raise a second, unhandled error on `encoded` or
    # send a request without a sequence number.
    return nil
  end

  # Leading '' is the empty delimiter frame.
  super(['', message.encoded])
end