Class: PikaQue::Runner
- Inherits:
-
Object
- Object
- PikaQue::Runner
- Defined in:
- lib/pika_que/runner.rb
Instance Method Summary collapse
- #add_processor(opts = {}) ⇒ Object
- #config ⇒ Object
- #processor(opts = {}) ⇒ Object
- #processors ⇒ Object
- #run ⇒ Object
- #setup_processors ⇒ Object
-
#stop ⇒ Object
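Stops every started processor and disconnects the PikaQue connection (open question from the author: halt? pause?).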
Instance Method Details
#add_processor(opts = {}) ⇒ Object
38 39 40 41 |
# File 'lib/pika_que/runner.rb', line 38

# Registers one more processor definition on this runner.
#
# The :workers entry of +opts+ is passed through
# PikaQue::Util.worker_classes; the result replaces :workers in the
# options, and the hash built by #processor is appended to #processors.
#
# @param opts [Hash] processor options; only :workers is read directly here
# @return [Array] the #processors list after the append
def add_processor(opts = {})
  workers = PikaQue::Util.worker_classes(opts[:workers])
  processors.push(processor(opts.merge(:workers => workers)))
end
#processor(opts = {}) ⇒ Object
31 32 33 34 35 36 |
# File 'lib/pika_que/runner.rb', line 31

# Builds a processor definition hash by layering +opts+ over the defaults.
#
# The defaults are :processor => PikaQue::Processor and :workers => [];
# any key present in +opts+ takes precedence over them.
#
# @param opts [Hash] overrides and additional keys
# @return [Hash] a new hash; +opts+ itself is not modified
def processor(opts = {})
  defaults = { :processor => PikaQue::Processor, :workers => [] }
  defaults.merge(opts)
end
#processors ⇒ Object
43 44 45 |
# File 'lib/pika_que/runner.rb', line 43

# Returns the list of processor definitions held by this runner.
#
# The list is created empty on first access and the same Array instance is
# returned on every later call, so callers may append to it in place.
#
# @return [Array] the memoised list
def processors
  @processors = [] unless @processors
  @processors
end
#run ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 |
# File 'lib/pika_que/runner.rb', line 4

# Instantiates and starts one processor per entry in #processors.
#
# For each entry, the :processor value is resolved through
# PikaQue::Util.constantize, an instance is built from the entry merged
# with the run config, the instance is started, and it is then recorded
# in @processes.
#
# @return [Array] the value of #processors (the result of the +each+ call)
def run
  # TODO anything to add to run_config?
  run_config = {}

  # Reset on every call; filled as processors come up.
  @processes = []
  processors.each do |definition|
    klass = PikaQue::Util.constantize(definition[:processor])
    instance = klass.new(definition.merge(run_config))
    instance.start
    # Recorded only after #start returned without raising.
    @processes << instance
  end
end
#setup_processors ⇒ Object
22 23 24 25 26 27 28 29 |
# File 'lib/pika_que/runner.rb', line 22

# Translates the runner configuration into processor definitions.
#
# When config[:delay] is truthy, a processor built from
# config[:delay_options] is added first. After that, either a single
# processor for config[:workers] is added (when that key is truthy) or
# one processor per entry of config[:processors].
#
# @return [Object] the result of the last #add_processor call in the
#   :workers branch, otherwise config[:processors]
def setup_processors
  if config[:delay]
    add_processor(config[:delay_options])
  end

  # A top-level :workers setting takes priority over :processors.
  return add_processor({ workers: config[:workers] }) if config[:workers]

  config[:processors].each { |processor_opts| add_processor(processor_opts) }
end
#stop ⇒ Object
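Stops every started processor and disconnects the PikaQue connection. Open question from the author: halt? pause?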
17 18 19 20 |
# File 'lib/pika_que/runner.rb', line 17

# halt? pause?
#
# Calls #stop on every object recorded in @processes, then disconnects
# the PikaQue connection.
#
# Safe to call before #run: @processes is nil until #run assigns it, and
# nil is treated as an empty list here instead of raising NoMethodError
# (which would also have skipped the disconnect).
#
# @return [Object] whatever PikaQue.connection.disconnect! returns
def stop
  # Array(nil) => [], Array(array) => the same array.
  Array(@processes).each(&:stop)
  PikaQue.connection.disconnect!
end