Class: Sbmt::Outbox::V2::BoxProcessor
- Inherits:
-
Object
- Object
- Sbmt::Outbox::V2::BoxProcessor
- Defined in:
- lib/sbmt/outbox/v2/box_processor.rb
Instance Attribute Summary collapse
-
#started ⇒ Object
readonly
Returns the value of attribute started.
-
#threads_count ⇒ Object
readonly
Returns the value of attribute threads_count.
-
#worker_name ⇒ Object
readonly
Returns the value of attribute worker_name.
Instance Method Summary collapse
- #alive?(timeout) ⇒ Boolean
-
#initialize(boxes:, threads_count:, name: "abstract_worker", redis: nil) ⇒ BoxProcessor
constructor
A new instance of BoxProcessor.
- #process_task(_worker_number, _task) ⇒ Object
- #ready? ⇒ Boolean
- #safe_process_task(worker_number, task) ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #throttle(_worker_number, _scheduled_task, _result) ⇒ Object
Constructor Details
#initialize(boxes:, threads_count:, name: "abstract_worker", redis: nil) ⇒ BoxProcessor
Returns a new instance of BoxProcessor.
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
# File 'lib/sbmt/outbox/v2/box_processor.rb', line 15 def initialize(boxes:, threads_count:, name: "abstract_worker", redis: nil) @threads_count = threads_count @worker_name = name @queue = build_task_queue(boxes) @thread_pool = ThreadPool.new( concurrency: threads_count, name: "#{name}_thread_pool" ) do queue.pop end @started = false init_redis(redis) end |
Instance Attribute Details
#started ⇒ Object (readonly)
Returns the value of attribute started.
13 14 15 |
# File 'lib/sbmt/outbox/v2/box_processor.rb', line 13 def started @started end |
#threads_count ⇒ Object (readonly)
Returns the value of attribute threads_count.
13 14 15 |
# File 'lib/sbmt/outbox/v2/box_processor.rb', line 13 def threads_count @threads_count end |
#worker_name ⇒ Object (readonly)
Returns the value of attribute worker_name.
13 14 15 |
# File 'lib/sbmt/outbox/v2/box_processor.rb', line 13 def worker_name @worker_name end |
Instance Method Details
#alive?(timeout) ⇒ Boolean
79 80 81 82 83 |
# File 'lib/sbmt/outbox/v2/box_processor.rb', line 79 def alive?(timeout) return false unless ready? @thread_pool.alive?(timeout) end |
#process_task(_worker_number, _task) ⇒ Object
97 98 99 |
# File 'lib/sbmt/outbox/v2/box_processor.rb', line 97 def process_task(_worker_number, _task) raise NotImplementedError, "Implement #process_task for Sbmt::Outbox::V2::BoxProcessor" end |
#ready? ⇒ Boolean
75 76 77 |
# File 'lib/sbmt/outbox/v2/box_processor.rb', line 75 def ready? started && @thread_pool.running? end |
#safe_process_task(worker_number, task) ⇒ Object
85 86 87 88 89 90 |
# File 'lib/sbmt/outbox/v2/box_processor.rb', line 85 def safe_process_task(worker_number, task) process_task(worker_number, task) rescue => e log_fatal(e, task) track_fatal(e, task) end |
#start ⇒ Object
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# File 'lib/sbmt/outbox/v2/box_processor.rb', line 32 def start raise "#{worker_name} is already started" if started @started = true thread_pool.start do |worker_number, scheduled_task| result = ThreadPool::PROCESSED last_result = Thread.current[:last_polling_result] throttling_res = throttle(worker_number, scheduled_task, last_result) next ThreadPool::SKIPPED if throttling_res&.value_or(nil) == Sbmt::Outbox::V2::Throttler::SKIP_STATUS lock_task(scheduled_task) do |locked_task| base_labels = scheduled_task.yabeda_labels.merge(worker_name: worker_name) if locked_task labels = base_labels.merge(locked_task.yabeda_labels) box_worker.job_execution_runtime.measure(labels) do ::Rails.application.executor.wrap do logger.(**locked_task.) do result = safe_process_task(worker_number, locked_task) end end end else result = ThreadPool::SKIPPED end box_worker.job_counter.increment(base_labels.merge(state: locked_task ? "processed" : "skipped"), by: 1) end Thread.current[:last_polling_result] = result || ThreadPool::PROCESSED ensure queue << scheduled_task end rescue => e Outbox.error_tracker.error(e) raise end |
#stop ⇒ Object
70 71 72 73 |
# File 'lib/sbmt/outbox/v2/box_processor.rb', line 70 def stop @started = false @thread_pool.stop end |
#throttle(_worker_number, _scheduled_task, _result) ⇒ Object
92 93 94 95 |
# File 'lib/sbmt/outbox/v2/box_processor.rb', line 92 def throttle(_worker_number, _scheduled_task, _result) # noop by default # IMPORTANT: method is called from thread-pool, i.e. code must be thread-safe end |