Class: Untied::Consumer::Worker
- Inherits:
-
Object
- Object
- Untied::Consumer::Worker
- Defined in:
- lib/untied-consumer/worker.rb
Class Method Summary collapse
-
.start(opts = {}) ⇒ Object
Initializes the worker and calls the start method.
Instance Method Summary collapse
-
#daemonize(opts = {}, &block) ⇒ Object
Daemonizes the current worker.
-
#initialize(opts = {}) ⇒ Worker
constructor
A new instance of Worker.
- #processor ⇒ Object
-
#start ⇒ Object
Listens to the mssage bus for relevant events.
Constructor Details
#initialize(opts = {}) ⇒ Worker
Returns a new instance of Worker.
8 9 10 |
# File 'lib/untied-consumer/worker.rb', line 8 def initialize(opts={}) @queue_name = opts[:queue_name] || "" end |
Class Method Details
.start(opts = {}) ⇒ Object
Initializes the worker and calls the start method
13 14 15 16 17 |
# File 'lib/untied-consumer/worker.rb', line 13 def self.start(opts={}) worker = new(opts) worker.start worker end |
Instance Method Details
#daemonize(opts = {}, &block) ⇒ Object
Daemonizes the current worker. Remember you’ll need the daemons Gem in order to this method work correctly. A optional block may be passed in. The block is going to run in the context of the forked process.
Options:
:pids_dir => '/some/dir' Absolute path to the dir where pid files will live
:log_dir => '/some/dir' Absolute path to the dir where log files will live
:pname => 'mylovelydeamom'
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/untied-consumer/worker.rb', line 27 def daemonize(opts={}, &block) require 'daemons' # just in case pname = opts.delete(:pname) || 'untiedc' config = { :backtrace => true, :log_output => true, :dir_mode => :normal, :dir => opts[:pids_dir], :log_dir => nil, }.merge(opts) if !(config[:dir] && config[:log_dir]) raise ArgumentError.new("You need to provide pids_dir and log_dir") end FileUtils.mkdir_p(config[:dir]) FileUtils.mkdir_p(config[:log_dir]) @worker = self @block = block Daemons.run_proc(pname, config) do @block.call if @block @worker.start end end |
#processor ⇒ Object
71 72 73 |
# File 'lib/untied-consumer/worker.rb', line 71 def processor @processor ||= Processor.new end |
#start ⇒ Object
Listens to the mssage bus for relevant events. This method blocks the current thread.
56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/untied-consumer/worker.rb', line 56 def start AMQP.start do |connection| channel = AMQP::Channel.new(connection) exchange = channel.topic("untied", :auto_delete => true) @processor = processor channel.queue(@queue_name, :exclusive => true) do |queue| Consumer.config.logger.info "Worker initialized and listening" queue.bind(exchange, :routing_key => "untied.#").subscribe do |h,p| safe_process { @processor.process(h,p) } end end end end |