Module: CronoTrigger::Worker
- Defined in:
- lib/crono_trigger/worker.rb
Constant Summary collapse
- HEARTBEAT_INTERVAL =
60
- SIGNAL_FETCH_INTERVAL =
10
- MONITOR_INTERVAL =
20
- EXECUTOR_SHUTDOWN_TIMELIMIT =
300
- OTHER_THREAD_SHUTDOWN_TIMELIMIT =
120
Instance Attribute Summary collapse
-
#polling_threads ⇒ Object
readonly
Returns the value of attribute polling_threads.
Instance Method Summary collapse
Instance Attribute Details
#polling_threads ⇒ Object (readonly)
Returns the value of attribute polling_threads.
12 13 14 |
# File 'lib/crono_trigger/worker.rb', line 12
# Read-only accessor for the worker's polling threads.
#
# @return [Object] the current value of @polling_threads
def polling_threads
  @polling_threads
end
Instance Method Details
#initialize ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/crono_trigger/worker.rb', line 14
# Prepares the worker's state: its id, the stop flags, the queue of model
# names to poll, the executor pool, the execution counter and the logger.
def initialize
  @crono_trigger_worker_id = CronoTrigger.config.worker_id

  # Four independent blocking flags; #stop sets every one of them.
  @stop_flag = ServerEngine::BlockingFlag.new
  @heartbeat_stop_flag = ServerEngine::BlockingFlag.new
  @signal_fetch_stop_flag = ServerEngine::BlockingFlag.new
  @monitor_stop_flag = ServerEngine::BlockingFlag.new

  # Use the configured model names, falling back to
  # CronoTrigger::Schedulable.included_by, and enqueue each one.
  @model_names = CronoTrigger.config.model_names || CronoTrigger::Schedulable.included_by
  @model_queue = Queue.new
  @model_names.each { |name| @model_queue << name }

  # Pool size and queue length both derive from config.executor_thread.
  # Per the concurrent-ruby docs, :caller_runs executes a rejected task on
  # the submitting thread.
  @executor = Concurrent::ThreadPoolExecutor.new(
    min_threads: 1,
    max_threads: CronoTrigger.config.executor_thread,
    max_queue: CronoTrigger.config.executor_thread * 2,
    fallback_policy: :caller_runs
  )
  @execution_counter = Concurrent::AtomicFixnum.new

  # Keep a logger that is already set; otherwise log to STDOUT.
  @logger ||= Logger.new(STDOUT)
  # The same logger is installed as ActiveRecord's logger.
  ActiveRecord::Base.logger = @logger
end
#quiet? ⇒ Boolean
76 77 78 |
# File 'lib/crono_trigger/worker.rb', line 76
# Reports whether every polling thread answers true to #quiet?.
#
# @return [Boolean, nil] nil while @polling_threads is nil; otherwise the
#   result of all?(&:quiet?) over the polling threads
def quiet?
  threads = @polling_threads
  return if threads.nil?

  threads.all?(&:quiet?)
end
#run ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# File 'lib/crono_trigger/worker.rb', line 36
# Runs the worker: starts the heartbeat, signal-fetch and monitor threads,
# starts the polling threads, installs a TSTP handler, then joins the polling
# threads and shuts everything down before calling unregister.
def run
  @heartbeat_thread = run_heartbeat_thread
  # NOTE(review): "fetcn" looks like a typo for "fetch". The name is kept
  # because code outside this method may read this instance variable.
  @signal_fetcn_thread = run_signal_fetch_thread
  @monitor_thread = run_monitor_thread

  # Use the configured count, or the smaller of the model count and the
  # processor count.
  polling_thread_count = CronoTrigger.config.polling_thread || [@model_names.size, Concurrent.processor_count].min
  # Assign local variable for Signal handling
  polling_threads = polling_thread_count.times.map do
    PollingThread.new(@model_queue, @stop_flag, @logger, @executor, @execution_counter)
  end
  @polling_threads = polling_threads
  @polling_threads.each(&:run)

  # On TSTP, switch every polling thread to quiet mode and send a heartbeat.
  ServerEngine::SignalThread.new do |st|
    st.trap(:TSTP) do
      @logger.info("[worker_id:#{@crono_trigger_worker_id}] Transit to quiet mode")
      polling_threads.each(&:quiet)
      heartbeat
    end
  end

  @polling_threads.each(&:join)

  @executor.shutdown
  @executor.wait_for_termination(EXECUTOR_SHUTDOWN_TIMELIMIT)
  @heartbeat_thread.join(OTHER_THREAD_SHUTDOWN_TIMELIMIT)
  @signal_fetcn_thread.join(OTHER_THREAD_SHUTDOWN_TIMELIMIT)
  # Join the monitor thread like the other background threads, so that
  # unregister does not run while it is still active.
  @monitor_thread.join(OTHER_THREAD_SHUTDOWN_TIMELIMIT)

  unregister
end
#stop ⇒ Object
65 66 67 68 69 70 |
# File 'lib/crono_trigger/worker.rb', line 65
# Sets all four stop flags, in order: main, heartbeat, signal-fetch, monitor.
#
# @return [Object] whatever @monitor_stop_flag.set! returns
def stop
  [@stop_flag, @heartbeat_stop_flag, @signal_fetch_stop_flag].each(&:set!)
  # Set last and on its own line so its result stays the return value.
  @monitor_stop_flag.set!
end
#stopped? ⇒ Boolean
72 73 74 |
# File 'lib/crono_trigger/worker.rb', line 72
# Reports whether the main stop flag has been set.
#
# @return [Boolean] the result of @stop_flag.set?
def stopped?
  main_flag = @stop_flag
  main_flag.set?
end