Class: RCelery::Task

Inherits:
Object
  • Object
show all
Defined in:
lib/rcelery/task.rb,
lib/rcelery/task/runner.rb,
lib/rcelery/task/context.rb

Defined Under Namespace

Modules: States Classes: Context, MaxRetriesExceededError, RetryError, Runner

Class Attribute Summary collapse

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Task

Returns a new instance of Task.



43
44
45
46
47
48
49
50
# File 'lib/rcelery/task.rb', line 43

def initialize(options = {})
  @name = options[:name]
  @method = options[:method]
  @routing_key = options[:routing_key]
  @ignore_result = options[:ignore_result].nil? ?
    true : options[:ignore_result]
  @request = Context.new(@name)
end

Class Attribute Details

.max_retriesObject

Returns the value of attribute max_retries.



12
13
14
# File 'lib/rcelery/task.rb', line 12

def max_retries
  @max_retries
end

.result_queue_expiresObject

Returns the value of attribute result_queue_expires.



12
13
14
# File 'lib/rcelery/task.rb', line 12

def result_queue_expires
  @result_queue_expires
end

Instance Attribute Details

#methodObject (readonly)

Returns the value of attribute method.



18
19
20
# File 'lib/rcelery/task.rb', line 18

def method
  @method
end

#nameObject (readonly)

Returns the value of attribute name.



18
19
20
# File 'lib/rcelery/task.rb', line 18

def name
  @name
end

#requestObject (readonly)

Returns the value of attribute request.



18
19
20
# File 'lib/rcelery/task.rb', line 18

def request
  @request
end

Class Method Details

.all_tasksObject



20
21
22
# File 'lib/rcelery/task.rb', line 20

def self.all_tasks
  @all_tasks ||= {}
end

.execute(message) ⇒ Object



24
25
26
27
28
# File 'lib/rcelery/task.rb', line 24

def self.execute(message)
  runner = Runner.new(message)
  runner.execute
  runner
end

.result_queue(task_id) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/rcelery/task.rb', line 30

def self.result_queue(task_id)
  queue_name = task_id.gsub('-', '')
  RCelery.channel.queue(
    queue_name,
    :durable => true,
    :auto_delete => true,
    :arguments => {'x-expires' => result_queue_expires}
  ).bind(
    RCelery.exchanges[:result],
    :routing_key => queue_name
  )
end

Instance Method Details

#apply_async(options = {}) ⇒ Object



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/rcelery/task.rb', line 81

def apply_async(options = {})
  task_id = options[:task_id] || UUID.generate
  task = {
    :id => task_id,
    :task => @name,
    :args => options[:args],
    :kwargs => options[:kwargs] || {}
  }
  task[:eta] = options[:eta].strftime("%Y-%m-%dT%H:%M:%S") if options[:eta]
  task[:retries] = options[:retries] if options[:retries]

  if RCelery.eager_mode?
    task[:eager] = true
    runner = Task.execute(JSON.parse(task.to_json))
    return (EagerResult.new(runner.result) unless ignore_result?)
  end

  pub_opts = {
    :persistent => true,
    :routing_key => options[:routing_key] || @routing_key
  }

  # initialize result queue first to avoid races
  res = ignore_result? ? nil: AsyncResult.new(task_id)
  RCelery.publish(:request, task, pub_opts)
  res
end

#delay(*args) ⇒ Object



52
53
54
55
# File 'lib/rcelery/task.rb', line 52

def delay(*args)
  kwargs = args.pop if args.last.is_a?(Hash)
  apply_async(:args => args, :kwargs => kwargs)
end

#ignore_result?Boolean

Returns:

  • (Boolean)


109
110
111
# File 'lib/rcelery/task.rb', line 109

def ignore_result?
  @ignore_result
end

#retry(options = {}) ⇒ Object

Raises:



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# File 'lib/rcelery/task.rb', line 57

def retry(options = {})
  args = options[:args] || request.args
  kwargs = options[:kwargs] || request.kwargs
  max_retries = options[:max_retries] || self.class.max_retries

  if (request.retries + 1) > max_retries
    if options[:exc]
      raise options[:exc]
    else
      raise MaxRetriesExceededError
    end
  end

  apply_async(
    :args => args,
    :kwargs => kwargs,
    :task_id => request.task_id,
    :retries => request.retries + 1,
    :eta => options[:eta] || default_eta
  )

  raise RetryError
end