Module: PipelineToolkit::Handlers::MessageHandler

Defined in:
lib/pipeline_toolkit/handlers/message_handler.rb

Overview

The message-processing handler that the EventMachine reactor loop calls when its IO watcher receives a new message.

Constant Summary collapse

MAX_BUFFER =
3_000_000

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#options ⇒ Object (readonly)

Returns the value of attribute options.



13
14
15
# File 'lib/pipeline_toolkit/handlers/message_handler.rb', line 13

# Reader for the options this handler was constructed with.
#
# @return [Object] whatever is currently stored in @options
def options
  @options
end

Instance Method Details

#initialize(target, options = {}) ⇒ Object

Initialize a new instance

Parameters:

  • target (MessageCommand)

    An instance of the object to delegate the message handling to

  • options (Hash) (defaults to: {})

    An options hash



21
22
23
24
# File 'lib/pipeline_toolkit/handlers/message_handler.rb', line 21

# Builds a handler around the object that will receive decoded messages.
#
# @param target [Object] stored in @target; receive_line calls #process on it
# @param options [Hash] stored in @options and exposed through #options
def initialize(target, options = {})
  @target, @options = target, options
end

#notify_readable ⇒ Object

The callback method that EventMachine calls as soon as a new message arrives



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/pipeline_toolkit/handlers/message_handler.rb', line 29

# Readable-IO callback: drains whatever is currently available on @io and
# hands every line produced by the tokenizer to #receive_line.
#
# Any exception is logged with its backtrace and then re-raised.
#
# @return [Object] the result of iterating the extracted lines
# @raise [Exception] whatever the read or the line handling raised
def notify_readable
  DefaultLogger.debug("Handlers::MessageHandler#notify_readable")

  # The tokenizer is created once and reused across calls.
  # NOTE(review): presumably so a partial trailing line is kept until the
  # rest of it arrives - confirm against BufferedTokenizer.
  @buffer ||= BufferedTokenizer.new

  # Non-blocking read of up to MAX_BUFFER bytes, so several queued messages
  # are picked up at once; gets/read are avoided because they block at EOF.
  chunk = @io.read_nonblock(MAX_BUFFER)

  @buffer.extract(chunk).each { |line| receive_line(line) }
rescue Exception => error # rescued here because main thread does not seem to see it
  summary = "#{error.class.name}: #{error.message}"
  DefaultLogger.error("#{summary}\n#{error.backtrace.join("\n")}")
  raise error
end

#receive_line(message_coded) ⇒ Object



46
47
48
49
50
51
52
53
54
# File 'lib/pipeline_toolkit/handlers/message_handler.rb', line 46

# Decodes one encoded message and passes the result to the target.
#
# @param message_coded [Object] the encoded message, as given to MessageCoder.decode
# @return [Object] whatever @target.process returns
# @raise [MessagePack::UnpackError] re-raised after the offending input is logged
def receive_line(message_coded)
  DefaultLogger.debug("Raw message: #{message_coded}")
  decoded = MessageCoder.decode(message_coded)
  DefaultLogger.debug("Message: #{decoded.inspect}")
  @target.process(decoded)
rescue MessagePack::UnpackError
  DefaultLogger.error("Couldn't unpack message: #{message_coded.inspect}")
  raise # re-raises the UnpackError being handled
end