Class: Untied::Consumer::Worker

Inherits:
Object
  • Object
show all
Defined in:
lib/untied-consumer/worker.rb

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Worker

Returns a new instance of Worker.



8
9
10
# File 'lib/untied-consumer/worker.rb', line 8

# Builds a worker.
#
# opts - Hash of options (default: {}):
#        :queue_name - value stored in @queue_name. When the key is absent
#                      (or its value is nil/false) an empty String is stored.
def initialize(opts={})
  requested_name = opts[:queue_name]
  @queue_name = requested_name ? requested_name : ""
end

Class Method Details

.start(opts = {}) ⇒ Object

Initializes the worker and calls the start method



13
14
15
16
17
# File 'lib/untied-consumer/worker.rb', line 13

# Builds a worker from +opts+, invokes #start on it, then returns the worker
# itself (the return value of #start is discarded).
def self.start(opts={})
  new(opts).tap { |instance| instance.start }
end

Instance Method Details

#daemonize(opts = {}, &block) ⇒ Object

Daemonizes the current worker. Remember you’ll need the daemons gem in order for this method to work correctly. An optional block may be passed in. The block is going to run in the context of the forked process.

Options:

:pids_dir => '/some/dir' Absolute path to the dir where pid files will live
:log_dir => '/some/dir' Absolute path to the dir where log files will live
:pname => 'mylovelydaemon' Name of the daemon process (defaults to 'untiedc')


27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/untied-consumer/worker.rb', line 27

# Daemonizes the current worker through Daemons.run_proc (daemons gem).
#
# opts  - Hash of options (default: {}). The hash is not modified.
#         :pids_dir - directory used as Daemons' :dir (pid files).
#         :log_dir  - directory passed to Daemons as :log_dir.
#         :pname    - process name handed to Daemons (default: 'untiedc').
#         Every key except :pname is merged into the configuration passed to
#         Daemons.run_proc, overriding the defaults built here.
# block - optional block called inside the run_proc block, right before
#         the worker's #start.
#
# Raises ArgumentError when no pid directory or no log directory is given.
def daemonize(opts={}, &block)
  require 'daemons' # just in case
  require 'fileutils' # FileUtils.mkdir_p is used below

  # Work on a copy: deleting :pname must not alter the caller's hash.
  options = opts.dup
  pname = options.delete(:pname) || 'untiedc'
  config = {
    :backtrace  => true,
    :log_output => true,
    :dir_mode   => :normal,
    :dir        => options[:pids_dir],
    :log_dir    => nil,
  }.merge(options)

  unless config[:dir] && config[:log_dir]
    raise ArgumentError, "You need to provide pids_dir and log_dir"
  end

  # Make sure both directories exist before handing them to Daemons.
  FileUtils.mkdir_p(config[:dir])
  FileUtils.mkdir_p(config[:log_dir])

  @worker = self
  @block = block
  Daemons.run_proc(pname, config) do
    @block.call if @block
    @worker.start
  end
end

#processor ⇒ Object



71
72
73
# File 'lib/untied-consumer/worker.rb', line 71

# Returns the memoized Processor, creating it with Processor.new the first
# time it is needed (or whenever @processor is nil/false).
def processor
  return @processor if @processor

  @processor = Processor.new
end

#start ⇒ Object

Listens to the message bus for relevant events. This method blocks the current thread.



56
57
58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/untied-consumer/worker.rb', line 56

# Subscribes this worker to the "untied" topic exchange.
#
# Inside AMQP.start it opens a channel, declares the "untied" topic exchange
# (:auto_delete => true) and an exclusive queue named after @queue_name.
# Once the queue is available it logs a message, binds the queue with the
# routing key "untied.#" and passes every delivery (header, payload) to
# the processor from within safe_process.
# NOTE(review): safe_process is defined outside this excerpt; presumably it
# guards against processing errors; confirm.
def start
  AMQP.start do |conn|
    amqp_channel   = AMQP::Channel.new(conn)
    topic_exchange = amqp_channel.topic("untied", :auto_delete => true)
    @processor = processor

    amqp_channel.queue(@queue_name, :exclusive => true) do |declared_queue|
      Consumer.config.logger.info "Worker initialized and listening"

      bound_queue = declared_queue.bind(topic_exchange, :routing_key => "untied.#")
      bound_queue.subscribe do |header, payload|
        safe_process { @processor.process(header, payload) }
      end
    end
  end
end