Module: Abid::RakeExtensions::Task
- Defined in:
- lib/abid/rake_extensions/task.rb
Instance Method Summary
- #async_execute_in_worker(worker = nil, &block) ⇒ Object
- #async_execute_with_session(task_args) ⇒ Object
- #async_invoke(*args) ⇒ Object
- #async_invoke_tasks(tasks, task_args, invocation_chain, &block) ⇒ Object
- #async_invoke_with_call_chain(task_args, invocation_chain) ⇒ Object
- #async_invoke_with_prerequisites(task_args, invocation_chain) ⇒ Object
- #async_wait_complete ⇒ Object
- #name_with_params ⇒ Object
- #state ⇒ Object
- #volatile? ⇒ Boolean
- #worker ⇒ Object
Instance Method Details
#async_execute_in_worker(worker = nil, &block) ⇒ Object
135 136 137 138 139 140 141 142 143 |
# File 'lib/abid/rake_extensions/task.rb', line 135

# Posts the given block to one of the application's workers.
#
# @param worker [Object, nil] key used to look up the worker in
#   +application.worker+; this task's own #worker is used when nil
# @yield the work to run on the selected worker
# @return [Object] whatever the worker's +post+ call returns
def async_execute_in_worker(worker = nil, &block)
  workers = application.worker
  executor = workers[worker || self.worker]
  executor.post do
    begin
      block.call
    rescue Exception => err
      # Any exception raised by the block is recorded on the task's ivar
      # instead of propagating out of the posted job.
      state.ivar.try_fail(err)
    end
  end
end
#async_execute_with_session(task_args) ⇒ Object
88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/abid/rake_extensions/task.rb', line 88

# Executes the task inside a state session on a worker and records the
# outcome on the task's ivar.
#
# The ivar is set to +true+ once the session block has finished. When the
# session raises AbidErrorTaskAlreadyRunning, the method falls back to
# #async_wait_complete instead.
#
# @param task_args [Object] arguments passed through to +execute+
# @return [Object] the result of #async_execute_in_worker
def async_execute_with_session(task_args)
  async_execute_in_worker do
    begin
      state.session do
        completed = false
        begin
          execute(task_args) if needed?
          completed = true
        ensure
          # Leaving this body without finishing and without an exception in
          # flight is reported as the thread having been killed.
          fail "#{name} -- thread killed" if !completed && $ERROR_INFO.nil?
        end
      end
      state.ivar.try_set(true)
    rescue AbidErrorTaskAlreadyRunning
      async_wait_complete
    end
  end
end
#async_invoke(*args) ⇒ Object
20 21 22 23 |
# File 'lib/abid/rake_extensions/task.rb', line 20

# Starts an asynchronous invocation of this task with an empty invocation
# chain.
#
# @param args [Array] positional values paired with this task's +arg_names+
# @return [Object] the result of #async_invoke_with_call_chain
def async_invoke(*args)
  async_invoke_with_call_chain(
    Rake::TaskArguments.new(arg_names, args),
    Rake::InvocationChain::EMPTY
  )
end
#async_invoke_tasks(tasks, task_args, invocation_chain, &block) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/abid/rake_extensions/task.rb', line 63

# Asynchronously invokes each of the given tasks and calls the block once
# all of them have completed.
#
# The block receives +false+ immediately when there are no tasks. Otherwise
# it receives whether any of the tasks' ivars resolved to a truthy value.
# If any ivar was rejected, this task's ivar is failed with that reason and
# the block is not called.
#
# @param tasks [Array] tasks to invoke
# @param task_args [Object] arguments; re-scoped per task via +new_scope+
# @param invocation_chain [Object] chain passed on to every task
# @yieldparam updated [Boolean] whether any invoked task reported a truthy value
# @return [Object] the block's result when +tasks+ is empty, otherwise the
#   array of ivars
def async_invoke_tasks(tasks, task_args, invocation_chain, &block)
  ivars = tasks.map do |task|
    scoped_args = task_args.new_scope(task.arg_names)
    task.async_invoke_with_call_chain(scoped_args, invocation_chain)
  end
  return block.call(false) if ivars.empty?

  counter = Concurrent::DependencyCounter.new(ivars.size) do
    begin
      rejected = ivars.find(&:rejected?)
      if rejected
        state.ivar.try_fail(rejected.reason)
      else
        block.call(ivars.map(&:value).any?)
      end
    rescue Exception => err
      # Errors raised while continuing are recorded on this task's ivar.
      state.ivar.try_fail(err)
    end
  end
  ivars.each { |ivar| ivar.add_observer(counter) }
end
#async_invoke_with_call_chain(task_args, invocation_chain) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/abid/rake_extensions/task.rb', line 25

# Asynchronously invokes this task as part of the given invocation chain.
#
# Inside +state.only_once+, and unless the +repair+ option is set:
# * a task that already succeeded is skipped (its ivar is set to +false+);
# * a task that already failed and is not at the top level (the incoming
#   chain is non-empty) has its ivar failed.
# In every other case the task is invoked together with its prerequisites.
#
# @param task_args [Object] arguments for this invocation
# @param invocation_chain [Object] chain of callers; must respond to +empty?+
# @return [Object] the task's ivar
# @raise [Exception] re-raises anything raised while scheduling, after
#   recording it on the ivar
def async_invoke_with_call_chain(task_args, invocation_chain)
  state.reload

  new_chain = Rake::InvocationChain.append(self, invocation_chain)
  repair = application.options.repair

  state.only_once do
    if !repair && state.successed?
      # skip if succeeded
      state.ivar.try_set(false)
    elsif !repair && state.failed? && !invocation_chain.empty?
      # fail if not top level; raise and rescue so the error has a backtrace
      begin
        fail "#{name} -- task has been failed"
      rescue => err
        state.ivar.try_fail(err)
      end
    else
      async_invoke_with_prerequisites(task_args, new_chain)
    end
  end
  state.ivar
rescue Exception => err
  # Only an exception raised by this invocation fails the ivar. Checking the
  # global "current exception" in an ensure clause would also fire when the
  # caller is itself inside a rescue clause handling an unrelated error.
  state.ivar.try_fail(err)
  raise
end
#async_invoke_with_prerequisites(task_args, invocation_chain) ⇒ Object
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/abid/rake_extensions/task.rb', line 46

# Invokes this task's prerequisites and then the task itself.
#
# Non-volatile prerequisites are invoked first. If this task has already
# succeeded and none of them reported an update, the task is skipped and
# its ivar is set to +false+. Otherwise the volatile prerequisites are
# invoked, followed by #async_execute_with_session.
#
# @param task_args [Object] arguments for this invocation
# @param invocation_chain [Object] chain passed on to the prerequisites
# @return [Object] the result of the outer #async_invoke_tasks call
def async_invoke_with_prerequisites(task_args, invocation_chain)
  tracing = application.options.trace
  application.trace "** Invoke #{name_with_params}" if tracing

  volatiles, non_volatiles = prerequisite_tasks.partition(&:volatile?)

  async_invoke_tasks(non_volatiles, task_args, invocation_chain) do |updated|
    if state.successed? && !updated
      application.trace "** Skip #{name_with_params}" if tracing
      state.ivar.try_set(false)
    else
      async_invoke_tasks(volatiles, task_args, invocation_chain) do
        async_execute_with_session(task_args)
      end
    end
  end
end
#async_wait_complete ⇒ Object
107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/abid/rake_extensions/task.rb', line 107

# Waits for a run of this task that is already in progress to finish.
#
# When the +wait_external_task+ option is not set, the task's ivar is failed
# at once with an "already running" error. Otherwise the state is polled on
# the +:waiter+ worker until it is no longer running (the ivar is set to
# +true+) or until the timeout elapses (the ivar is failed).
#
# Options read: +wait_external_task_interval+ (seconds between polls,
# default 10) and +wait_external_task_timeout+ (seconds, default 3600).
#
# @return [Object] the result of +try_fail+ when waiting is disabled,
#   otherwise the result of #async_execute_in_worker
def async_wait_complete
  unless application.options.wait_external_task
    err = RuntimeError.new("task #{name_with_params} already running")
    return state.ivar.try_fail(err)
  end

  application.trace "** Wait #{name_with_params}" if application.options.trace

  async_execute_in_worker(:waiter) do
    interval = application.options.wait_external_task_interval || 10
    timeout = application.options.wait_external_task_timeout || 3600
    # The monotonic clock keeps the deadline unaffected by wall-clock changes.
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

    loop do
      state.reload

      unless state.running?
        state.ivar.try_set(true)
        break
      end

      if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
        # Raise and rescue so the recorded error carries a backtrace.
        begin
          fail "#{name} -- timeout exceeded"
        rescue => err
          state.ivar.try_fail(err)
        end
        break
      end

      sleep interval
    end
  end
end
#name_with_params ⇒ Object
16 17 18 |
# File 'lib/abid/rake_extensions/task.rb', line 16

# Display name of the task. This implementation returns the plain +name+.
#
# @return [Object] the value of +name+
def name_with_params
  name
end
#state ⇒ Object
12 13 14 |
# File 'lib/abid/rake_extensions/task.rb', line 12

# Looks up the state object associated with this task.
#
# @return [Object] the result of +State.find+ for this task
def state
  State.find(self)
end
#volatile? ⇒ Boolean
4 5 6 |
# File 'lib/abid/rake_extensions/task.rb', line 4

# Whether this task is volatile. This implementation always answers +true+.
#
# @return [Boolean] always +true+
def volatile?
  true
end
#worker ⇒ Object
8 9 10 |
# File 'lib/abid/rake_extensions/task.rb', line 8

# Key of the worker this task runs on. This implementation always answers
# +:default+.
#
# @return [Symbol] +:default+
def worker
  :default
end