Module: Distribot::Worker
- Defined in:
- lib/distribot/worker.rb
Defined Under Namespace
Modules: ClassMethods
Instance Attribute Summary collapse
-
#task_consumers ⇒ Object
Returns the value of attribute task_consumers.
Class Method Summary collapse
Instance Method Summary collapse
- #enumerate_tasks(message) ⇒ Object
- #handle_task_execution(task) ⇒ Object
- #logger ⇒ Object
- #prepare_for_enumeration ⇒ Object
- #process_single_task(context, task) ⇒ Object
- #run ⇒ Object
- #subscribe_to_task_queue ⇒ Object
Instance Attribute Details
#task_consumers ⇒ Object
Returns the value of attribute task_consumers.
43 44 45 |
# File 'lib/distribot/worker.rb', line 43 def task_consumers @task_consumers end |
Class Method Details
.included(base) ⇒ Object
6 7 8 |
# File 'lib/distribot/worker.rb', line 6 def self.included(base) base.extend(ClassMethods) end |
Instance Method Details
#enumerate_tasks(message) ⇒ Object
112 113 114 115 116 117 118 119 |
# File 'lib/distribot/worker.rb', line 112 def enumerate_tasks() trycatch do context = OpenStruct.new() flow = Distribot::Flow.find(context.flow_id) fail FlowCanceledError if flow.canceled? send(self.class.enumerator, context) end end |
#handle_task_execution(task) ⇒ Object
87 88 89 90 91 92 93 94 95 96 |
# File 'lib/distribot/worker.rb', line 87 def handle_task_execution(task) context = OpenStruct.new( flow_id: task[:flow_id], phase: task[:phase], finished_queue: 'distribot.flow.task.finished' ) trycatch do process_single_task(context, task) end end |
#prepare_for_enumeration ⇒ Object
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 |
# File 'lib/distribot/worker.rb', line 55 def prepare_for_enumeration logger.tagged("handler:#{self.class}") do pool = Concurrent::FixedThreadPool.new(5) Distribot.subscribe(self.class.enumeration_queue, solo: true) do || pool.post do logger.tagged(.map { |k, v| [k, v].join(':') }) do context = OpenStruct.new() trycatch do tasks = enumerate_tasks() announce_tasks(context, , tasks) end end end end end end |
#process_single_task(context, task) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 |
# File 'lib/distribot/worker.rb', line 98 def process_single_task(context, task) inspect_task!(context) # Your code is called right here: send(self.class.processor, context, task) task_counter_key = "distribot.flow.#{context.flow_id}.#{context.phase}.#{self.class}.finished" Distribot.redis.decr(task_counter_key) publish_args = { flow_id: context.flow_id, phase: context.phase, handler: self.class.to_s } Distribot.publish!(context.finished_queue, publish_args) end |
#run ⇒ Object
45 46 47 48 49 |
# File 'lib/distribot/worker.rb', line 45 def run prepare_for_enumeration subscribe_to_task_queue self end |
#subscribe_to_task_queue ⇒ Object
72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/distribot/worker.rb', line 72 def subscribe_to_task_queue logger.tagged("handler:#{self.class}") do subscribe_args = { reenqueue_on_failure: true, solo: true } pool = Concurrent::FixedThreadPool.new(500) Distribot.subscribe(self.class.task_queue, subscribe_args) do |task| pool.post do = task.map { |k, v| [k, v].join(':') } logger.tagged() do handle_task_execution(task) end end end end end |