Class: Celerb::TaskPublisher

Inherits:
Object
  • Object
show all
Defined in:
lib/celerb/task_publisher.rb

Class Method Summary collapse

Class Method Details

.channel ⇒ Object



4
5
6
# File 'lib/celerb/task_publisher.rb', line 4

# Class-level reader for the @channel instance variable (assigned by
# .connect). Yields nil while @channel has not been assigned.
#
# @return [Object, nil] the current value of @channel
class << self
  attr_reader :channel
end

.connect(opts, connection = nil) ⇒ Object



8
9
10
11
12
13
14
15
# File 'lib/celerb/task_publisher.rb', line 8

# Builds the class-level publishing state: a channel, a durable direct
# exchange declared on it, and a ResultConsumer bound to the same channel.
#
# @param opts [Hash] settings; :exchange and :key are read here, and the
#   whole hash is also passed to ResultConsumer.new and kept in @opts
# @param connection [Object, nil] passed through unchanged to
#   AMQP::Channel.new
# @return [Object] +opts+ (the value of the final assignment)
def self.connect(opts, connection = nil)
  @channel = AMQP::Channel.new(connection)
  # The handler reads @channel when it fires, not when it is registered.
  @channel.on_error do
    @channel.reuse
  end

  exchange_name = opts[:exchange]
  routing_key   = opts[:key]
  @default_exchange = @channel.direct(exchange_name,
                                      :key => routing_key, :durable => true)

  @results = ResultConsumer.new(@channel, opts)
  @opts = opts
end

.delay_task(queue, task_name, task_args = [], task_kwargs = {}, task_id = nil, taskset_id = nil, expires = nil, eta = nil, exchange = nil, exchange_type = nil, retries = 0) ⇒ Object



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/celerb/task_publisher.rb', line 17

# Publishes a task message to +queue+ and returns the task's id.
#
# When +task_id+ is nil (or false) a new one is obtained from
# TaskPublisher.uniq_id before the message is built.
#
# NOTE(review): +taskset_id+, +exchange+ and +exchange_type+ are accepted
# but are not referenced anywhere in this method body.
#
# @param queue [Object] passed as the first argument to +publish+
# @param task_name [Object] stored under :task
# @param task_args [Object] stored under :args
# @param task_kwargs [Object] stored under :kwargs
# @param task_id [Object, nil] stored under :id; generated when falsy
# @param taskset_id [Object, nil] unused here
# @param expires [Object, nil] stored under :expires
# @param eta [Object, nil] stored under :eta
# @param exchange [Object, nil] unused here
# @param exchange_type [Object, nil] unused here
# @param retries [Object] stored under :retries
# @return [Object] the task id that was placed in the message
def self.delay_task(queue, task_name, task_args = [], task_kwargs = {},
                    task_id = nil, taskset_id = nil, expires = nil, eta = nil,
                    exchange = nil, exchange_type = nil, retries = 0)
  task_id = TaskPublisher.uniq_id unless task_id

  message = {
    :task    => task_name,
    :id      => task_id,
    :args    => task_args,
    :kwargs  => task_kwargs,
    :retries => retries,
    :eta     => eta,
    :expires => expires
  }
  publish(queue, message)

  task_id
end

.register_result_handler(task_id, expiry, &blk) ⇒ Object



33
34
35
# File 'lib/celerb/task_publisher.rb', line 33

# Forwards a result-handler registration to the object held in @results
# (assigned by .connect).
#
# @param task_id [Object] passed through as the first argument to +register+
# @param expiry [Object] passed through as the second argument to +register+
# @param blk [Proc] block forwarded to +register+
# @return [Object] whatever +@results.register+ returns
def self.register_result_handler(task_id, expiry, &blk)
  consumer = @results
  consumer.register(task_id, expiry, &blk)
end