Module: CronoTrigger::Worker

Defined in:
lib/crono_trigger/worker.rb

Constant Summary collapse

HEARTBEAT_INTERVAL =
60
SIGNAL_FETCH_INTERVAL =
10
MONITOR_INTERVAL =
20
EXECUTOR_SHUTDOWN_TIMELIMIT =
300
OTHER_THREAD_SHUTDOWN_TIMELIMIT =
120

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#polling_threadsObject (readonly)

Returns the value of attribute polling_threads.



12
13
14
# File 'lib/crono_trigger/worker.rb', line 12

def polling_threads
  @polling_threads
end

Instance Method Details

#initializeObject



14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/crono_trigger/worker.rb', line 14

def initialize
  @crono_trigger_worker_id = CronoTrigger.config.worker_id
  @stop_flag = ServerEngine::BlockingFlag.new
  @heartbeat_stop_flag = ServerEngine::BlockingFlag.new
  @signal_fetch_stop_flag = ServerEngine::BlockingFlag.new
  @monitor_stop_flag = ServerEngine::BlockingFlag.new
  @model_queue = Queue.new
  @model_names = CronoTrigger.config.model_names || CronoTrigger::Schedulable.included_by
  @model_names.each do |model_name|
    @model_queue << model_name
  end
  @executor = Concurrent::ThreadPoolExecutor.new(
    min_threads: 1,
    max_threads: CronoTrigger.config.executor_thread,
    max_queue: CronoTrigger.config.executor_thread * 2,
    fallback_policy: :caller_runs,
  )
  @execution_counter = Concurrent::AtomicFixnum.new
  @logger = Logger.new(STDOUT) unless @logger
  ActiveRecord::Base.logger = @logger
end

#quiet?Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/crono_trigger/worker.rb', line 76

def quiet?
  @polling_threads&.all?(&:quiet?)
end

#runObject



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/crono_trigger/worker.rb', line 36

def run
  @heartbeat_thread = run_heartbeat_thread
  @signal_fetcn_thread = run_signal_fetch_thread
  @monitor_thread = run_monitor_thread

  polling_thread_count = CronoTrigger.config.polling_thread || [@model_names.size, Concurrent.processor_count].min
  # Assign local variable for Signal handling
  polling_threads = polling_thread_count.times.map { PollingThread.new(@model_queue, @stop_flag, @logger, @executor, @execution_counter) }
  @polling_threads = polling_threads
  @polling_threads.each(&:run)

  ServerEngine::SignalThread.new do |st|
    st.trap(:TSTP) do
      @logger.info("[worker_id:#{@crono_trigger_worker_id}] Transit to quiet mode")
      polling_threads.each(&:quiet)
      heartbeat
    end
  end

  @polling_threads.each(&:join)

  @executor.shutdown
  @executor.wait_for_termination(EXECUTOR_SHUTDOWN_TIMELIMIT)
  @heartbeat_thread.join(OTHER_THREAD_SHUTDOWN_TIMELIMIT)
  @signal_fetcn_thread.join(OTHER_THREAD_SHUTDOWN_TIMELIMIT)

  unregister
end

#stopObject



65
66
67
68
69
70
# File 'lib/crono_trigger/worker.rb', line 65

def stop
  @stop_flag.set!
  @heartbeat_stop_flag.set!
  @signal_fetch_stop_flag.set!
  @monitor_stop_flag.set!
end

#stopped?Boolean

Returns:

  • (Boolean)


72
73
74
# File 'lib/crono_trigger/worker.rb', line 72

def stopped?
  @stop_flag.set?
end