Module: Cuniculus::SupervisorMethods
- Included in: Supervisor
- Defined in: lib/cuniculus/supervisor.rb
Instance Attribute Summary
- #config ⇒ Object (readonly): Returns the value of attribute config.
Instance Method Summary
- #connect(conn_opts) ⇒ Object
- #consumer_exception(consumer, _ex) ⇒ Object
- #create_consumers(conn, queues) ⇒ Object
- #initialize(config) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
11 12 13 |
# File 'lib/cuniculus/supervisor.rb', line 11

# Reader for the configuration held by the supervisor.
#
# @return [Object] whatever is currently stored in +@config+
def config
  @config
end
Instance Method Details
#connect(conn_opts) ⇒ Object
30 31 32 33 34 35 36 |
# File 'lib/cuniculus/supervisor.rb', line 30

# Creates a Bunny session from the given options, starts it and returns it.
#
# @param conn_opts [Object] passed unchanged to +::Bunny.new+
# @return [Object] the object returned by +::Bunny.new+, after +start+ has been called on it
# @raise [Exception] any StandardError raised while creating or starting the session is
#   re-raised as the object returned by +Cuniculus.convert_exception_class+ for
#   +Cuniculus::RMQConnectionError+
def connect(conn_opts)
  # `tap` yields the session to `start` and still returns the session itself.
  ::Bunny.new(conn_opts).tap(&:start)
rescue StandardError => error
  raise Cuniculus.convert_exception_class(error, Cuniculus::RMQConnectionError)
end
#consumer_exception(consumer, _ex) ⇒ Object
48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/cuniculus/supervisor.rb', line 48

# Handles a failed consumer: removes it from +@consumers+ and, unless +@done+
# is set, builds and starts a replacement consumer. Runs entirely while holding
# +@consumer_lock+.
#
# @param consumer [Object] the failed consumer; must respond to +channel+ and +queue+
# @param _ex [Object] the error reported for the consumer (unused)
# @return [Object, nil] the result of the replacement's +start+, or nil when +@done+ is set
#
# NOTE(review): the replacement is built with (self, name, channel) whereas
# create_consumers calls Consumer.new(q_cfg, ch) — confirm which signature
# Cuniculus::Consumer#initialize actually accepts.
def consumer_exception(consumer, _ex)
  @consumer_lock.synchronize do
    @consumers.delete(consumer)
    next if @done

    # The failed consumer's channel is reused for its replacement.
    channel = consumer.channel
    replacement = Cuniculus::Consumer.new(self, consumer.queue.name, channel)
    @consumers << replacement
    replacement.start
  end
end
#create_consumers(conn, queues) ⇒ Object
38 39 40 41 42 43 44 45 46 |
# File 'lib/cuniculus/supervisor.rb', line 38

# Builds one consumer per configured queue, each on its own channel.
#
# @param conn [Object] connection; must respond to +create_channel(id, pool_size)+
# @param queues [Enumerable] yields +(name, q_cfg)+ pairs; each +q_cfg+ must respond to
#   +thread_pool_size+ and +prefetch_count+
# @return [Array] the consumers, in iteration order of +queues+
def create_consumers(conn, queues)
  queues.map do |_name, q_cfg|
    channel = conn.create_channel(nil, q_cfg.thread_pool_size)
    # Prefetch is only set on the channel when the queue config provides a value.
    channel.prefetch(q_cfg.prefetch_count) if q_cfg.prefetch_count
    Cuniculus::Consumer.new(q_cfg, channel)
  end
end
#initialize(config) ⇒ Object
13 14 15 16 17 18 19 |
# File 'lib/cuniculus/supervisor.rb', line 13

# Sets up the supervisor: stores the configuration, opens the connection and
# builds (but does not start) the consumers.
#
# @param config [Object] must respond to +rabbitmq_opts+ and +queues+
def initialize(config)
  @config = config
  # Shutdown flag and the lock taken by consumer_exception.
  @done = false
  @consumer_lock = Mutex.new

  connection = connect(config.rabbitmq_opts)
  @consumers = create_consumers(connection, config.queues)
end
#start ⇒ Object
21 22 23 |
# File 'lib/cuniculus/supervisor.rb', line 21

# Calls +start+ on every consumer in +@consumers+, in order.
#
# @return [Array] the +@consumers+ array itself
def start
  @consumers.each do |consumer|
    consumer.start
  end
end
#stop ⇒ Object
25 26 27 28 |
# File 'lib/cuniculus/supervisor.rb', line 25

# Marks the supervisor as done and stops every consumer it currently holds.
#
# The +@done+ flag is flipped and the consumer list is copied while holding
# +@consumer_lock+, the same lock consumer_exception takes before it reads
# +@done+ and mutates +@consumers+. This way a replacement consumer cannot be
# added after the list has been captured, and removing a consumer from
# +@consumers+ during shutdown cannot make the iteration skip its neighbour.
#
# The consumers are stopped after the lock has been released, so a consumer
# that calls back into consumer_exception while it is being stopped does not
# block on the lock held by this method.
#
# @return [Array] the consumers that were asked to stop
def stop
  snapshot = @consumer_lock.synchronize do
    @done = true
    @consumers.dup
  end
  snapshot.each(&:stop)
end