Class: Celerb::TaskPublisher
- Inherits: Object
- Object
- Celerb::TaskPublisher
- Defined in:
- lib/celerb/task_publisher.rb
Class Method Summary
- .channel ⇒ Object
- .connect(opts, connection = nil) ⇒ Object
- .delay_task(queue, task_name, task_args = [], task_kwargs = {}, task_id = nil, taskset_id = nil, expires = nil, eta = nil, exchange = nil, exchange_type = nil, retries = 0) ⇒ Object
- .register_result_handler(task_id, expiry, &blk) ⇒ Object
Class Method Details
.channel ⇒ Object
4 5 6 |
# File 'lib/celerb/task_publisher.rb', line 4
#
# Reader for the class-level @channel instance variable.
#
# @return [Object, nil] whatever is currently stored in @channel; nil if
#   nothing has assigned it yet (.connect is the assignment visible here).
def self.channel
  @channel
end
.connect(opts, connection = nil) ⇒ Object
8 9 10 11 12 13 14 15 |
# File 'lib/celerb/task_publisher.rb', line 8
#
# Sets up the class-level publishing state: a channel, a durable direct
# exchange, and a ResultConsumer bound to that channel.
#
# @param opts [Hash] options; this method reads :exchange and :key, and the
#   whole hash is also handed to ResultConsumer.new and stored in @opts.
# @param connection [Object, nil] passed straight to AMQP::Channel.new.
# @return [Hash] the +opts+ argument (value of the final assignment).
def self.connect(opts, connection = nil)
  @channel = AMQP::Channel.new(connection)

  # On a channel-level error, ask the channel to reuse itself.
  # NOTE(review): presumably this re-opens the channel so publishing can
  # continue -- confirm against the amqp gem documentation.
  @channel.on_error do
    @channel.reuse
  end

  exchange_name = opts[:exchange]
  routing_key   = opts[:key]
  @default_exchange = @channel.direct(exchange_name, :key => routing_key, :durable => true)

  @results = ResultConsumer.new(@channel, opts)
  @opts = opts
end
.delay_task(queue, task_name, task_args = [], task_kwargs = {}, task_id = nil, taskset_id = nil, expires = nil, eta = nil, exchange = nil, exchange_type = nil, retries = 0) ⇒ Object
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
# File 'lib/celerb/task_publisher.rb', line 17
#
# Builds a task message and hands it to +publish+ for the given queue.
#
# @param queue [Object] forwarded as the first argument to +publish+.
# @param task_name [Object] stored under :task in the message.
# @param task_args [Array] stored under :args.
# @param task_kwargs [Hash] stored under :kwargs.
# @param task_id [Object, nil] stored under :id; when nil/false a new id is
#   obtained from TaskPublisher.uniq_id.
# @param taskset_id [Object, nil] accepted but not used by this method.
# @param expires [Object, nil] stored under :expires.
# @param eta [Object, nil] stored under :eta.
# @param exchange [Object, nil] accepted but not used by this method.
# @param exchange_type [Object, nil] accepted but not used by this method.
# @param retries [Integer] stored under :retries.
# @return [Object] the task id that was placed in the message.
def self.delay_task(queue, task_name, task_args = [], task_kwargs = {},
                    task_id = nil, taskset_id = nil, expires = nil, eta = nil,
                    exchange = nil, exchange_type = nil, retries = 0)
  task_id ||= TaskPublisher.uniq_id

  message = {
    :task    => task_name,
    :id      => task_id,
    :args    => task_args,
    :kwargs  => task_kwargs,
    :retries => retries,
    :eta     => eta,
    :expires => expires
  }
  publish(queue, message)

  task_id
end
.register_result_handler(task_id, expiry, &blk) ⇒ Object
33 34 35 |
# File 'lib/celerb/task_publisher.rb', line 33
#
# Delegates to the object held in @results (assigned by .connect),
# forwarding all arguments and the block unchanged.
#
# @param task_id [Object] forwarded to @results.register.
# @param expiry [Object] forwarded to @results.register.
# @yield the handler block, forwarded as-is to @results.register.
# @return [Object] whatever @results.register returns.
def self.register_result_handler(task_id, expiry, &blk)
  @results.register(task_id, expiry, &blk)
end