Class: PikaQue::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/pika_que/runner.rb

Instance Method Summary collapse

Instance Method Details

#add_processor(opts = {}) ⇒ Object



38
39
40
41
# File 'lib/pika_que/runner.rb', line 38

def add_processor(opts = {})
  classified_workers = { :workers => PikaQue::Util.worker_classes(opts[:workers]) }
  processors << processor(opts.merge(classified_workers))
end

#configObject



47
48
49
# File 'lib/pika_que/runner.rb', line 47

def config
  PikaQue.config
end

#processor(opts = {}) ⇒ Object



31
32
33
34
35
36
# File 'lib/pika_que/runner.rb', line 31

def processor(opts = {})
  {
    :processor        => PikaQue::Processor,
    :workers          => []
  }.merge(opts)
end

#processorsObject



43
44
45
# File 'lib/pika_que/runner.rb', line 43

def processors
  @processors ||= []
end

#runObject



4
5
6
7
8
9
10
11
12
13
14
# File 'lib/pika_que/runner.rb', line 4

def run
  # TODO anything to add to run_config?
  run_config = {}

  @processes = []
  processors.each do |processor_hash|
    _processor = PikaQue::Util.constantize(processor_hash[:processor]).new(processor_hash.merge(run_config))
    _processor.start
    @processes << _processor
  end
end

#setup_processorsObject



22
23
24
25
26
27
28
29
# File 'lib/pika_que/runner.rb', line 22

def setup_processors
  add_processor(config[:delay_options]) if config[:delay]
  if config[:workers]
    add_processor({ workers: config[:workers] })
  else
    config[:processors].each{ |p| add_processor(p) }
  end
end

#stopObject

halt? pause?



17
18
19
20
# File 'lib/pika_que/runner.rb', line 17

def stop
  @processes.each(&:stop)
  PikaQue.connection.disconnect!
end