Class: PipelineToolkit::MessageSubscriber
- Inherits:
-
Object
- Object
- PipelineToolkit::MessageSubscriber
- Includes:
- Amqp::Reader
- Defined in:
- lib/pipeline_toolkit/message_subscriber.rb
Overview
The message subscriber is used to subscribe to a AMQP server queue and handle a single message at a time.
Constant Summary collapse
- PIPE_PATH =
"/tmp"
Instance Attribute Summary collapse
-
#average_mps ⇒ Object
readonly
Returns the value of attribute average_mps.
-
#mps ⇒ Object
readonly
Returns the value of attribute mps.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#start_time ⇒ Object
readonly
Returns the value of attribute start_time.
-
#structure ⇒ Object
Returns the value of attribute structure.
-
#total_messages ⇒ Object
readonly
Returns the value of attribute total_messages.
Instance Method Summary collapse
-
#cleanup ⇒ Object
Cleans up any used resource, such as pipes and files.
- #description ⇒ Object
-
#get_binding ⇒ Object
Support templating of member data.
-
#hostname ⇒ Object
The hostname of the server the subscriber is running on.
-
#initialize(options = {}) ⇒ MessageSubscriber
constructor
Initializes a new instance.
- #monitoring_enabled? ⇒ Boolean
-
#process(message) ⇒ Object
Callback for the Handlers::MessageHandler when it receives a message.
-
#process_id ⇒ Object
:nodoc:.
-
#start ⇒ Object
Starts the subscriber reactor loop.
-
#stop ⇒ Object
Stops the command.
-
#uptime ⇒ Object
The uptime of the subscriber in mins.
- #write_pid(path) ⇒ Object
Methods included from Amqp::Reader
#initialize_reader, #perform_acknowledgement, #queue_subscribe, #store_acknowledgement
Methods included from Amqp::Abstract
#bind_queue, #initialize_channel, #initialize_connection, #initialize_exchange, #initialize_queue, #stop_connection
Constructor Details
#initialize(options = {}) ⇒ MessageSubscriber
Initializes a new instance
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 29 def initialize( = {}) @options = [:ack] = ![:no_acks] DefaultLogger.init_logger() if [:log_conf] DefaultLogger.info "================================================================" DefaultLogger.info "Booting #{self.class.name} (#{[:env]})" DefaultLogger.info "Exchange: #{[:exchange]} (#{[:type]} passive:#{[:passive]} durable:#{[:durable]})" DefaultLogger.info "Queue: #{[:queue]} (ack:#{[:ack]})" DefaultLogger.info "amqp://#{[:user]}:#{[:pass]}@#{[:host]}:#{[:port]}#{[:vhost]}" DefaultLogger.info "Monitoring: http://localhost:#{[:http_port]}/ (#{[:content_type]})" DefaultLogger.info "" @structure = [description] write_pid([:pid_path]) if [:pid_path] @total_messages = 0 end |
Instance Attribute Details
#average_mps ⇒ Object (readonly)
Returns the value of attribute average_mps.
19 20 21 |
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 19 def average_mps @average_mps end |
#mps ⇒ Object (readonly)
Returns the value of attribute mps.
16 17 18 |
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 16 def mps @mps end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
17 18 19 |
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 17 def @options end |
#start_time ⇒ Object (readonly)
Returns the value of attribute start_time.
15 16 17 |
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 15 def start_time @start_time end |
#structure ⇒ Object
Returns the value of attribute structure.
20 21 22 |
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 20 def structure @structure end |
#total_messages ⇒ Object (readonly)
Returns the value of attribute total_messages.
18 19 20 |
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 18 def @total_messages end |
Instance Method Details
#cleanup ⇒ Object
Cleans up any used resource, such as pipes and files.
99 100 101 |
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 99 def cleanup destroy_sys_pipe end |
#description ⇒ Object
155 156 157 |
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 155 def description "queue:#{[:queue]}" end |
#get_binding ⇒ Object
Support templating of member data.
132 133 134 |
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 132 def get_binding binding end |
#hostname ⇒ Object
The hostname of the server the subscriber is running on
151 152 153 |
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 151 def hostname Socket.gethostname end |
#monitoring_enabled? ⇒ Boolean
108 109 110 |
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 108 def monitoring_enabled? ![:http_port].nil? end |
#process(message) ⇒ Object
Callback for the Handlers::MessageHandler when it receives a message
117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 117 def process() case [:msg_type] when "ack" DefaultLogger.debug("Acknowledging message #{[:ack_id]}") perform_acknowledgement([:ack_id]) when "pipe_desc" @structure << [:description].to_s else raise "Unknown control message received: #{.inspect}" end end |
#process_id ⇒ Object
:nodoc:
137 138 139 |
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 137 def process_id Process.pid.to_s end |
#start ⇒ Object
Starts the subscriber reactor loop
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 54 def start DefaultLogger.debug("MessageSubscriber#start") begin create_sys_pipe Signal.trap('INT') { self.stop } Signal.trap('TERM') { self.stop } Signal.trap('PIPE') { self.stop } EM.run do @start_time = Time.now initialize_reader queue_subscribe conn = EM.watch(@sys_pipe, Handlers::MessageHandler, self, ) # must call this to setup callback to notify_readable conn.notify_readable = true if monitoring_enabled? EM.start_server('0.0.0.0', [:http_port], Monitoring::MonitorServer, self, ) EM.add_periodic_timer(5) { } end end rescue Exception => e DefaultLogger.error "#{e.class.name}: #{e.}\n" << e.backtrace.join("\n") raise e ensure cleanup end end |
#stop ⇒ Object
Stops the command. This stops the event loop, giving it enough time to clear its buffers
90 91 92 93 94 |
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 90 def stop DefaultLogger.info("Shutting down #{self.class.name}") DefaultLogger.info "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^" stop_connection { EM.stop } end |
#uptime ⇒ Object
The uptime of the subscriber in mins
144 145 146 |
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 144 def uptime (Time.now - self.start_time).to_i end |
#write_pid(path) ⇒ Object
103 104 105 106 |
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 103 def write_pid(path) DefaultLogger.info "Pid file written to #{[:pid_path]}" if [:pid_path] `echo #{Process.pid} > #{path}` end |