Module: PipelineToolkit::MessageCommand Abstract
- Included in:
- MessagePusher, MessageSink
- Defined in:
- lib/pipeline_toolkit/message_command.rb
Overview
Provide abstract base functionality for pipeline machines.
Required, To implement a your own machine, override the following methods:
-
include MessageCommand
-
#process_standard Handle the messages
Optionally, your machine can implement the following:
-
#initialize_machine Setup your machine
-
#description To change the name that is used to describe your machine in the montitoring interface
Instance Attribute Summary collapse
-
#options ⇒ Object
readonly
The options the machine should use.
Instance Method Summary collapse
-
#acknowledge(message) ⇒ Object
Notify the AMQP server server that we’ve handled the message.
-
#cleanup ⇒ Object
Cleans up any used resource, such as pipes and files.
-
#description ⇒ Object
A string describing the machine.
-
#initialize(options = {}) ⇒ Object
Initializes a new instance.
-
#initialize_machine ⇒ Object
abstract
chance to initialize any code that needs to take place once the EventMachine loop has started.
-
#pass_on(message) ⇒ Object
Pass the message to the next machine in the pipeline by writing the message to the STDOUT.
-
#process(message) ⇒ Object
Callback for the Handlers::MessageHandler when it receives a message.
-
#process_standard(message) ⇒ Object
abstract
Processes a message.
-
#process_system(message) ⇒ Object
Process the first system message to notify machines down the pipeline which named pipe to use for acknowledgements.
-
#send_description ⇒ Object
Can be overriden in the class the MessageCommand is included into.
-
#start ⇒ Object
Starts the machine up, ie.
-
#stop ⇒ Object
Stops the command.
- #write_to_pipe(message, pipe = $stdout) ⇒ Object
Instance Attribute Details
#options ⇒ Object (readonly)
The options the machine should use
44 45 46 |
# File 'lib/pipeline_toolkit/message_command.rb', line 44 def @options end |
Instance Method Details
#acknowledge(message) ⇒ Object
Notify the AMQP server server that we’ve handled the message
118 119 120 121 122 123 |
# File 'lib/pipeline_toolkit/message_command.rb', line 118 def acknowledge() return if [:ack_id].nil? DefaultLogger.debug("MessageCommand#acknowledge(message)") = {:msg_type => "ack", :ack_id => [:ack_id]} write_to_pipe(, @ack_pipe) end |
#cleanup ⇒ Object
Cleans up any used resource, such as pipes and files.
95 96 97 |
# File 'lib/pipeline_toolkit/message_command.rb', line 95 def cleanup @ack_pipe.close if @ack_pipe end |
#description ⇒ Object
A string describing the machine. The description is used to describe the machine in the monitoring interface. Default implement is to return the name of the Machine’s class. This can be overriden within idividual machines if you wish to change it.
164 165 166 167 |
# File 'lib/pipeline_toolkit/message_command.rb', line 164 def description # NB: Regex removes modules from description self.class.name.scan(/(\w+)$/).first end |
#initialize(options = {}) ⇒ Object
Initializes a new instance. A message command can only be initialized through the implementation instance.
52 53 54 |
# File 'lib/pipeline_toolkit/message_command.rb', line 52 def initialize( = {}) @options = end |
#initialize_machine ⇒ Object
Override in the class the MessageCommand is included into. Provides a
chance to initialize any code that needs to take place once the EventMachine loop has started.
140 141 142 143 |
# File 'lib/pipeline_toolkit/message_command.rb', line 140 def initialize_machine DefaultLogger.debug("MessageCommand#initialize_machine") # Implemented in class that includes me end |
#pass_on(message) ⇒ Object
Pass the message to the next machine in the pipeline by writing the message to the STDOUT.
130 131 132 133 |
# File 'lib/pipeline_toolkit/message_command.rb', line 130 def pass_on() DefaultLogger.debug("MessageCommand#pass_on(message)") write_to_pipe() end |
#process(message) ⇒ Object
Callback for the Handlers::MessageHandler when it receives a message
104 105 106 107 108 109 110 111 |
# File 'lib/pipeline_toolkit/message_command.rb', line 104 def process() DefaultLogger.debug("MessageCommand#process(message)") if [:msg_type] == "system" process_system() else process_standard() end end |
#process_standard(message) ⇒ Object
In your machine implementation you need to override the #process_standard method.
Processes a message. This method must call #pass_on if the message was handled successfully, or if the handling of the message fails for whatever reason, you need to decide how you are going to handle the failure of the message (write message to error log | queue | etc.) and then acknowledge the message by calling #acknowledge. Acknowledging the message mean that the message has been dealt with. All messages must be acknowledged by at least one worker in a pipeline.
153 154 155 156 157 |
# File 'lib/pipeline_toolkit/message_command.rb', line 153 def process_standard() DefaultLogger.debug("MessageCommand#process_standard(message)") # Implemented in class that includes me pass_on() end |
#process_system(message) ⇒ Object
Process the first system message to notify machines down the pipeline which named pipe to use for acknowledgements.
175 176 177 178 179 180 181 |
# File 'lib/pipeline_toolkit/message_command.rb', line 175 def process_system() DefaultLogger.debug("MessageCommand#process_system(message)") [:ack] = [:ack] # inherit setting from upstream @ack_pipe = File.open([:sys_pipe], "w") # open ack pipe (needs to already exist) send_description pass_on() end |
#send_description ⇒ Object
Can be overriden in the class the MessageCommand is included into. Called when the acknowledgement named pipe has been established.
191 192 193 194 |
# File 'lib/pipeline_toolkit/message_command.rb', line 191 def send_description = { :msg_type => "pipe_desc", :description => self.description } write_to_pipe(, @ack_pipe) end |
#start ⇒ Object
Starts the machine up, ie. creates a new eventmachine reactor loop and starts watching the STDIN for new messages
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/pipeline_toolkit/message_command.rb', line 60 def start DefaultLogger.debug("MessageCommand#start") Signal.trap('INT') { self.stop } Signal.trap('TERM') { self.stop } Signal.trap('PIPE') { self.stop } begin EM.run do conn = EM.watch($stdin, Handlers::MessageHandler, self, ) initialize_machine # must call this to setup callback to notify_readable conn.notify_readable = true end rescue Exception => e DefaultLogger.error("#{e.class.name}: #{e.}\n" << e.backtrace.join("\n")) raise e ensure cleanup end end |
#stop ⇒ Object
Stops the command. This stops the event loop, giving it enough time to clear its buffers
86 87 88 89 90 |
# File 'lib/pipeline_toolkit/message_command.rb', line 86 def stop DefaultLogger.info("Shutting down #{self.class.name}") DefaultLogger.info "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^" EM.next_tick { EM.stop } end |
#write_to_pipe(message, pipe = $stdout) ⇒ Object
183 184 185 |
# File 'lib/pipeline_toolkit/message_command.rb', line 183 def write_to_pipe(, pipe=$stdout) pipe.syswrite(MessageCoder.encode() << "\n") end |