Class: RCelery::Task
- Inherits:
-
Object
- Object
- RCelery::Task
- Defined in:
- lib/rcelery/task.rb,
lib/rcelery/task/runner.rb,
lib/rcelery/task/context.rb
Defined Under Namespace
Modules: States Classes: Context, MaxRetriesExceededError, RetryError, Runner
Class Attribute Summary collapse
-
.max_retries ⇒ Object
Returns the value of attribute max_retries.
-
.result_queue_expires ⇒ Object
Returns the value of attribute result_queue_expires.
Instance Attribute Summary collapse
-
#method ⇒ Object
readonly
Returns the value of attribute method.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#request ⇒ Object
readonly
Returns the value of attribute request.
Class Method Summary collapse
- .all_tasks ⇒ Object
- .execute(message) ⇒ Object
- .result_queue(task_id) ⇒ Object
Instance Method Summary collapse
- #apply_async(options = {}) ⇒ Object
- #delay(*args) ⇒ Object
- #ignore_result? ⇒ Boolean
-
#initialize(options = {}) ⇒ Task
constructor
A new instance of Task.
- #retry(options = {}) ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ Task
Returns a new instance of Task.
43 44 45 46 47 48 49 50 |
# File 'lib/rcelery/task.rb', line 43 def initialize(options = {}) @name = options[:name] @method = options[:method] @routing_key = options[:routing_key] @ignore_result = options[:ignore_result].nil? ? true : options[:ignore_result] @request = Context.new(@name) end |
Class Attribute Details
.max_retries ⇒ Object
Returns the value of attribute max_retries.
12 13 14 |
# File 'lib/rcelery/task.rb', line 12 def max_retries @max_retries end |
.result_queue_expires ⇒ Object
Returns the value of attribute result_queue_expires.
12 13 14 |
# File 'lib/rcelery/task.rb', line 12 def result_queue_expires @result_queue_expires end |
Instance Attribute Details
#method ⇒ Object (readonly)
Returns the value of attribute method.
18 19 20 |
# File 'lib/rcelery/task.rb', line 18 def method @method end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
18 19 20 |
# File 'lib/rcelery/task.rb', line 18 def name @name end |
#request ⇒ Object (readonly)
Returns the value of attribute request.
18 19 20 |
# File 'lib/rcelery/task.rb', line 18 def request @request end |
Class Method Details
.all_tasks ⇒ Object
20 21 22 |
# File 'lib/rcelery/task.rb', line 20 def self.all_tasks @all_tasks ||= {} end |
.execute(message) ⇒ Object
24 25 26 27 28 |
# File 'lib/rcelery/task.rb', line 24 def self.execute(message) runner = Runner.new(message) runner.execute runner end |
.result_queue(task_id) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/rcelery/task.rb', line 30 def self.result_queue(task_id) queue_name = task_id.gsub('-', '') RCelery.channel.queue( queue_name, :durable => true, :auto_delete => true, :arguments => {'x-expires' => result_queue_expires} ).bind( RCelery.exchanges[:result], :routing_key => queue_name ) end |
Instance Method Details
#apply_async(options = {}) ⇒ Object
81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
# File 'lib/rcelery/task.rb', line 81 def apply_async(options = {}) task_id = options[:task_id] || UUID.generate task = { :id => task_id, :task => @name, :args => options[:args], :kwargs => options[:kwargs] || {} } task[:eta] = options[:eta].strftime("%Y-%m-%dT%H:%M:%S") if options[:eta] task[:retries] = options[:retries] if options[:retries] if RCelery.eager_mode? task[:eager] = true runner = Task.execute(JSON.parse(task.to_json)) return (EagerResult.new(runner.result) unless ignore_result?) end pub_opts = { :persistent => true, :routing_key => options[:routing_key] || @routing_key } # initialize result queue first to avoid races res = ignore_result? ? nil: AsyncResult.new(task_id) RCelery.publish(:request, task, pub_opts) res end |
#delay(*args) ⇒ Object
52 53 54 55 |
# File 'lib/rcelery/task.rb', line 52 def delay(*args) kwargs = args.pop if args.last.is_a?(Hash) apply_async(:args => args, :kwargs => kwargs) end |
#ignore_result? ⇒ Boolean
109 110 111 |
# File 'lib/rcelery/task.rb', line 109 def ignore_result? @ignore_result end |
#retry(options = {}) ⇒ Object
57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
# File 'lib/rcelery/task.rb', line 57 def retry(options = {}) args = options[:args] || request.args kwargs = options[:kwargs] || request.kwargs max_retries = options[:max_retries] || self.class.max_retries if (request.retries + 1) > max_retries if options[:exc] raise options[:exc] else raise MaxRetriesExceededError end end apply_async( :args => args, :kwargs => kwargs, :task_id => request.task_id, :retries => request.retries + 1, :eta => options[:eta] || default_eta ) raise RetryError end |