Class: PipelineToolkit::MessageSubscriber

Inherits:
Object
  • Object
show all
Includes:
Amqp::Reader
Defined in:
lib/pipeline_toolkit/message_subscriber.rb

Overview

The message subscriber is used to subscribe to an AMQP server queue and handle a single message at a time.

Constant Summary collapse

PIPE_PATH =
"/tmp"

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Amqp::Reader

#initialize_reader, #perform_acknowledgement, #queue_subscribe, #store_acknowledgement

Methods included from Amqp::Abstract

#bind_queue, #initialize_channel, #initialize_connection, #initialize_exchange, #initialize_queue, #stop_connection

Constructor Details

#initialize(options = {}) ⇒ MessageSubscriber

Initializes a new instance

Parameters:

  • options (Hash) (defaults to: {})

    An options hash, see command line interface.



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 29

# Builds a subscriber from the parsed command line options, logs a start-up
# banner and prepares the bookkeeping state.
#
# The hash that is passed in is kept (not copied) as the subscriber's options
# and gains an :ack entry, which is the negation of :no_acks.
#
# @param options [Hash] an options hash, see command line interface
def initialize(options = {})
  @options = options
  options[:ack] = !options[:no_acks]
  
  DefaultLogger.init_logger(options) if options[:log_conf]

  # Never write the broker password to the log; only reveal whether one was
  # supplied at all.
  masked_pass = options[:pass] ? "[FILTERED]" : ""

  DefaultLogger.info "================================================================"
  DefaultLogger.info "Booting #{self.class.name}  (#{options[:env]})"
  DefaultLogger.info "Exchange: #{options[:exchange]} (#{options[:type]} passive:#{options[:passive]} durable:#{options[:durable]})"
  DefaultLogger.info "Queue: #{options[:queue]} (ack:#{options[:ack]})"
  DefaultLogger.info "amqp://#{options[:user]}:#{masked_pass}@#{options[:host]}:#{options[:port]}#{options[:vhost]}"
  DefaultLogger.info "Monitoring: http://localhost:#{options[:http_port]}/ (#{options[:content_type]})"
  DefaultLogger.info ""

  # The pipeline structure starts with this stage's own description; further
  # entries are appended by #process when "pipe_desc" messages arrive.
  @structure = [description]
  write_pid(options[:pid_path]) if options[:pid_path]
  
  @total_messages = 0
    
  reset_message_statistics
end

Instance Attribute Details

#average_mpsObject (readonly)

Returns the value of attribute average_mps.



19
20
21
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 19

# Read-only accessor for @average_mps.
#
# NOTE(review): the value is not assigned anywhere in the visible code; it is
# presumably an average messages-per-second figure maintained by the message
# statistics helpers — confirm against calculate_message_statistics.
#
# @return [Object, nil] the current value of @average_mps
def average_mps
  @average_mps
end

#mpsObject (readonly)

Returns the value of attribute mps.



16
17
18
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 16

# Read-only accessor for @mps.
#
# NOTE(review): the value is not assigned anywhere in the visible code; it is
# presumably the latest messages-per-second figure maintained by the message
# statistics helpers — confirm against calculate_message_statistics.
#
# @return [Object, nil] the current value of @mps
def mps
  @mps
end

#optionsObject (readonly)

Returns the value of attribute options.



17
18
19
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 17

# Read-only accessor for the options hash handed to the constructor.
# The constructor stores the caller's hash itself (no copy) and adds an :ack
# entry to it.
#
# @return [Hash] the subscriber's options
def options
  @options
end

#start_timeObject (readonly)

Returns the value of attribute start_time.



15
16
17
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 15

# Read-only accessor for the moment the reactor loop was entered.
# It is assigned from Time.now inside #start, so it is nil until #start runs.
#
# @return [Time, nil] the time at which #start entered the event loop
def start_time
  @start_time
end

#structureObject

Returns the value of attribute structure.



20
21
22
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 20

# Accessor for the list of pipeline stage descriptions.
# The constructor seeds it with this subscriber's own #description and
# #process appends one string for every "pipe_desc" message it receives.
#
# @return [Array<String>] the collected stage descriptions
def structure
  @structure
end

#total_messagesObject (readonly)

Returns the value of attribute total_messages.



18
19
20
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 18

# Read-only accessor for @total_messages, which the constructor sets to 0.
#
# NOTE(review): nothing in the visible code increments it; presumably it
# counts the messages handled so far — confirm where it is updated.
#
# @return [Integer] the current value of @total_messages
def total_messages
  @total_messages
end

Instance Method Details

#cleanupObject

Cleans up any used resource, such as pipes and files.



99
100
101
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 99

# Cleans up any used resource, such as pipes and files.
#
# At present this only delegates to destroy_sys_pipe (defined outside this
# view), which presumably removes the pipe set up by create_sys_pipe — confirm
# against that helper. #start calls this from its ensure clause, so it runs
# whether the reactor loop ends normally or with an exception.
def cleanup
  destroy_sys_pipe
end

#descriptionObject



155
156
157
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 155

# Short label for this pipeline stage: the literal prefix "queue:" followed by
# the configured :queue option. The constructor uses it as the first entry of
# the pipeline structure.
#
# @return [String] e.g. "queue:jobs"
def description
  queue_name = options[:queue]
  "queue:#{queue_name}"
end

#get_bindingObject

Support templating of member data.



132
133
134
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 132

# Support templating of member data.
#
# Returns a Binding created inside this instance method, so code evaluated
# against it sees the subscriber as self and can reach its methods and
# instance variables. Presumably consumed by a template engine such as ERB in
# the monitoring code — the call site is not visible here.
#
# @return [Binding] a binding whose self is this subscriber
def get_binding
  binding
end

#hostnameObject

The hostname of the server the subscriber is running on



151
152
153
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 151

# The hostname of the server the subscriber is running on, as reported by
# Socket.gethostname.
#
# @return [String] the local host name
def hostname
  Socket.gethostname
end

#monitoring_enabled?Boolean

Returns:

  • (Boolean)


108
109
110
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 108

# Tells whether the HTTP monitoring endpoint should be started.
# Monitoring counts as enabled as soon as the :http_port option holds any
# non-nil value.
#
# @return [Boolean] true when options[:http_port] is not nil
def monitoring_enabled?
  http_port = options[:http_port]
  !http_port.nil?
end

#process(message) ⇒ Object

Callback for the Handlers::MessageHandler when it receives a message

Parameters:

  • message (Hash)

    The decoded message



117
118
119
120
121
122
123
124
125
126
127
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 117

# Callback for the Handlers::MessageHandler when it receives a message.
#
# Two control messages are understood, selected by message[:msg_type]:
# * "pipe_desc" - message[:description] is converted to a string and appended
#   to the pipeline structure.
# * "ack"       - message[:ack_id] is logged and passed to
#   perform_acknowledgement.
#
# @param message [Hash] the decoded message
# @raise [RuntimeError] for any other :msg_type
def process(message)
  case message[:msg_type]
  when "pipe_desc"
    stage_description = message[:description].to_s
    @structure << stage_description
  when "ack"
    ack_id = message[:ack_id]
    DefaultLogger.debug("Acknowledging message #{ack_id}")
    perform_acknowledgement(ack_id)
  else
    raise RuntimeError, "Unknown control message received: #{message.inspect}"
  end
end

#process_idObject

:nodoc:



137
138
139
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 137

# The id of the current operating system process, rendered as text.
#
# @return [String] Process.pid converted to a string
def process_id
  pid = Process.pid
  pid.to_s
end

#startObject

Starts the subscriber reactor loop



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 54

# Starts the subscriber reactor loop.
#
# Sequence: create the system pipe, install INT/TERM/PIPE handlers that call
# #stop, then enter the EventMachine loop. Inside the loop the start time is
# recorded, the AMQP reader is initialised and subscribed, the system pipe is
# watched with Handlers::MessageHandler, and - when monitoring is enabled -
# the monitor server plus a 5 second statistics timer are started.
#
# Whatever is raised is logged with its backtrace and raised again; #cleanup
# always runs afterwards.
def start
  DefaultLogger.debug("MessageSubscriber#start")

  begin
    create_sys_pipe

    # All three signals trigger the same shutdown path.
    %w[INT TERM PIPE].each do |signal_name|
      Signal.trap(signal_name) { stop }
    end

    EM.run do
      @start_time = Time.now

      initialize_reader
      queue_subscribe

      pipe_watcher = EM.watch(@sys_pipe, Handlers::MessageHandler, self, options)
      # Must be switched on to set up the callback to notify_readable.
      pipe_watcher.notify_readable = true

      if monitoring_enabled?
        EM.start_server('0.0.0.0', options[:http_port], Monitoring::MonitorServer, self, options)
        EM.add_periodic_timer(5) { calculate_message_statistics }
      end
    end
  rescue Exception => e
    # Rescued only to log; the very same exception is handed back to the caller.
    DefaultLogger.error "#{e.class.name}: #{e.message}\n#{e.backtrace.join("\n")}"
    raise e
  ensure
    cleanup
  end
end

#stopObject

Stops the command. This stops the event loop, giving it enough time to clear its buffers



90
91
92
93
94
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 90

# Stops the command. Logs a shutdown banner and then asks stop_connection to
# close the AMQP connection; the event loop itself is only stopped from the
# block handed to stop_connection, giving it enough time to clear its buffers.
def stop
  subscriber_name = self.class.name
  DefaultLogger.info("Shutting down #{subscriber_name}")
  DefaultLogger.info "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"

  stop_connection do
    EM.stop
  end
end

#uptimeObject

The uptime of the subscriber in whole seconds



144
145
146
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 144

# How long the subscriber has been running, measured from #start_time.
# Subtracting two Time values yields seconds; the fraction is dropped.
#
# @return [Integer] elapsed whole seconds since the reactor loop was entered
def uptime
  elapsed = Time.now - start_time
  elapsed.to_i
end

#write_pid(path) ⇒ Object



103
104
105
106
# File 'lib/pipeline_toolkit/message_subscriber.rb', line 103

# Writes the id of the current process, followed by a newline, to +path+.
#
# The file is written directly from Ruby instead of through a shell `echo`,
# so a path containing spaces or shell metacharacters is taken literally and
# cannot be interpreted as a command. Success is logged only after the file
# has actually been written, using the path that was written to.
#
# A failing write is logged as an error and otherwise ignored, matching the
# earlier shell-based behaviour of never aborting start-up.
#
# @param path [String] location of the pid file
def write_pid(path)
  File.open(path, "w") { |pid_file| pid_file.puts(Process.pid) }
  DefaultLogger.info "Pid file written to #{path}"
rescue SystemCallError => e
  DefaultLogger.error "Could not write pid file #{path}: #{e.message}"
end