Module: PipelineToolkit::Handlers::MessageHandler
Defined in: lib/pipeline_toolkit/handlers/message_handler.rb
Overview
The message-processing handler that the EventMachine reactor loop calls when the IO watcher reports a new incoming message.
Constant Summary
- MAX_BUFFER = 3_000_000
Instance Attribute Summary
- #options ⇒ Object (readonly)
  Returns the value of attribute options.
Instance Method Summary
- #initialize(target, options = {}) ⇒ Object
  Initialize a new instance.
- #notify_readable ⇒ Object
  The callback method that EventMachine calls as soon as a new message arrives.
- #receive_line(message_coded) ⇒ Object
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
    # File 'lib/pipeline_toolkit/handlers/message_handler.rb', line 13
    def options
      @options
    end
Instance Method Details
#initialize(target, options = {}) ⇒ Object
Initialize a new instance.
    # File 'lib/pipeline_toolkit/handlers/message_handler.rb', line 21
    def initialize(target, options = {})
      @options = options
      @target = target
    end
#notify_readable ⇒ Object
The callback method that EventMachine calls as soon as a new message arrives. It reads everything currently available on the watched IO without blocking, splits the data into lines, and passes each complete line to #receive_line.
    # File 'lib/pipeline_toolkit/handlers/message_handler.rb', line 29
    def notify_readable
      DefaultLogger.debug("Handlers::MessageHandler#notify_readable")

      # Grab everything from buffer, in case several messages have built up.
      # NB: Can't use gets or read because they block when reaching EOF.
      @buffer ||= BufferedTokenizer.new
      data = @io.read_nonblock(MAX_BUFFER)
      @buffer.extract(data).each do |line|
        receive_line(line)
      end

    rescue Exception => e # rescued here because main thread does not seem to see it
      DefaultLogger.error("#{e.class.name}: #{e.message}\n" << e.backtrace.join("\n"))
      raise e
    end
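The buffering step in #notify_readable can be illustrated without EventMachine. The following is a minimal sketch, not the library's code: `LineBuffer` and `drain` are hypothetical names, and `LineBuffer` is a simplified stand-in for BufferedTokenizer that assumes newline-delimited messages. It shows why a tokenizer is needed: a non-blocking read may end in the middle of a message, so the unfinished tail has to be held until the next read.

```ruby
# Hypothetical stand-in for BufferedTokenizer: holds any trailing partial
# line until the rest of it arrives in a later chunk.
class LineBuffer
  def initialize(delimiter = "\n")
    @delimiter = delimiter
    @pending = +""
  end

  # Append a chunk and return every complete line that it finishes.
  def extract(data)
    @pending << data
    lines = @pending.split(@delimiter, -1)
    # The last element is the unfinished remainder ("" when the chunk
    # ended exactly on a delimiter).
    @pending = lines.pop || +""
    lines
  end
end

# Read whatever is currently available without blocking (hypothetical helper).
def drain(io, buffer, max_bytes = 3_000_000)
  buffer.extract(io.read_nonblock(max_bytes))
rescue IO::WaitReadable
  [] # nothing to read yet
end

reader, writer = IO.pipe
buffer = LineBuffer.new

writer.write("first\nsecond\nthi")   # the third message is incomplete
first_batch = drain(reader, buffer)

writer.write("rd\n")                 # the rest of the third message
second_batch = drain(reader, buffer)

p first_batch   # ["first", "second"]
p second_batch  # ["third"]
```

Only complete lines are handed on, so the downstream decoder never sees half a message.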
#receive_line(message_coded) ⇒ Object
    # File 'lib/pipeline_toolkit/handlers/message_handler.rb', line 46
    def receive_line(message_coded)
      DefaultLogger.debug("Raw message: #{message_coded}")
      message = MessageCoder.decode(message_coded)
      DefaultLogger.debug("Message: #{message.inspect}")
      @target.process(message)
    rescue MessagePack::UnpackError => e
      DefaultLogger.error("Couldn't unpack message: #{message_coded.inspect}")
      raise e
    end
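The listing suggests that the handler only requires its target to respond to `process(message)`. The sketch below illustrates that decode-then-dispatch shape under stated assumptions: `CollectingTarget`, `JsonCoder`, and `dispatch_line` are hypothetical names, and JSON stands in for MessageCoder purely so the example runs with the standard library. It is not the toolkit's wire format.

```ruby
require "json"

# Hypothetical target: the only contract assumed is a #process(message) method.
class CollectingTarget
  attr_reader :seen

  def initialize
    @seen = []
  end

  def process(message)
    @seen << message
  end
end

# Stand-in for MessageCoder (assumption: JSON is used here only for illustration).
module JsonCoder
  def self.decode(raw)
    JSON.parse(raw)
  end
end

# Mirrors the shape of #receive_line: decode, hand off to the target,
# and log then re-raise when the input cannot be decoded.
def dispatch_line(target, raw, coder = JsonCoder)
  target.process(coder.decode(raw))
rescue JSON::ParserError => e
  warn("Couldn't decode message: #{raw.inspect}")
  raise e
end

target = CollectingTarget.new
dispatch_line(target, '{"id": 1}')
p target.seen
```

Re-raising after logging keeps the failure visible to the caller while still recording the raw input that caused it.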