Class: CronoTrigger::PollingThread
- Inherits:
- Object
- Inheritance chain: Object → CronoTrigger::PollingThread
- Defined in:
- lib/crono_trigger/polling_thread.rb
Instance Method Summary
- #alive? ⇒ Boolean
-
#initialize(model_queue, stop_flag, logger, executor, execution_counter) ⇒ PollingThread
constructor
A new instance of PollingThread.
- #join ⇒ Object
- #poll(model) ⇒ Object
- #quiet ⇒ Object
- #quiet? ⇒ Boolean
- #run ⇒ Object
- #worker_count=(n) ⇒ Object
Constructor Details
#initialize(model_queue, stop_flag, logger, executor, execution_counter) ⇒ PollingThread
Returns a new instance of PollingThread.
3 4 5 6 7 8 9 10 11 12 13 14 |
# File 'lib/crono_trigger/polling_thread.rb', line 3
#
# Stores the collaborators used by the polling loop. No thread is started
# here; that happens in #run.
#
# @param model_queue [#pop, #<<] queue of model names; #run pops a name,
#   polls it and pushes it back
# @param stop_flag [#wait_for_set, #set?] flag checked between polls
# @param logger [#info, #error] destination for progress and error messages
# @param executor [#fallback_policy, #post] pool that receives one job per
#   fetched record; only the :caller_runs fallback policy is accepted
# @param execution_counter [#increment, #decrement] counter adjusted around
#   each posted job
# @raise [ArgumentError] if the executor's fallback policy is not :caller_runs
def initialize(model_queue, stop_flag, logger, executor, execution_counter)
  @model_queue, @stop_flag, @logger, @executor = model_queue, stop_flag, logger, executor

  # Reject unsupported executors before finishing the setup.
  unless @executor.fallback_policy == :caller_runs
    raise ArgumentError, "executor's fallback policies except for :caller_runs are not supported"
  end

  @execution_counter = execution_counter
  @quiet = Concurrent::AtomicBoolean.new(false)
  @worker_count = 1
end
Instance Method Details
#alive? ⇒ Boolean
52 53 54 |
# File 'lib/crono_trigger/polling_thread.rb', line 52
#
# Reports whether the thread stored in @thread is still running.
#
# NOTE(review): in the visible code @thread is only assigned by #run, so this
# presumably must not be called before #run — confirm with callers.
#
# @return [Boolean] the result of Thread#alive? on the polling thread
def alive?
  polling_thread = @thread
  polling_thread.alive?
end
#join ⇒ Object
40 41 42 |
# File 'lib/crono_trigger/polling_thread.rb', line 40
#
# Blocks the caller until the thread stored in @thread has finished.
#
# NOTE(review): in the visible code @thread is only assigned by #run, so this
# presumably must not be called before #run — confirm with callers.
#
# @return [Object] whatever Thread#join returns for the polling thread
def join
  polling_thread = @thread
  polling_thread.join
end
#poll(model) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/crono_trigger/polling_thread.rb', line 56
#
# Repeatedly fetches executable records for +model+ and posts one job per
# record to the executor. Fetching continues until the fetch reports that
# nothing more is pending or the stop flag is set.
#
# @param model [Object] must respond to #connection_pool and
#   #executables_with_lock
# @return [nil]
def poll(model)
  @logger.info "(polling-thread-#{Thread.current.object_id}) Poll #{model}"

  more_pending = true
  # Same short-circuit order as `while more_pending && !stop`: the stop flag
  # is only consulted while more records may be pending.
  until !more_pending || @stop_flag.set?
    # The wrapped call is destructured into the fetched batch and a
    # "may have more" indicator that drives the next iteration.
    batch, more_pending = CronoTrigger.retry_on_db_errors do
      model.connection_pool.with_connection do
        # An explicit fetch_records setting wins; otherwise fetch three
        # records per configured executor thread.
        batch_limit = CronoTrigger.config.fetch_records || CronoTrigger.config.executor_thread * 3
        model.executables_with_lock(limit: batch_limit, worker_count: @worker_count)
      end
    end

    batch.each do |record|
      @executor.post do
        @execution_counter.increment
        begin
          process_record(record)
        ensure
          # Always undo the increment, even when processing raises.
          @execution_counter.decrement
        end
      end
    end
  end
end
#quiet ⇒ Object
44 45 46 |
# File 'lib/crono_trigger/polling_thread.rb', line 44
#
# Switches the quiet flag on. While the flag is set, the loop in #run skips
# polling on each wake-up.
#
# @return [Object] whatever the flag's #make_true returns
def quiet
  quiet_flag = @quiet
  quiet_flag.make_true
end
#quiet? ⇒ Boolean
48 49 50 |
# File 'lib/crono_trigger/polling_thread.rb', line 48
#
# Tells whether the quiet flag is currently set.
#
# @return [Boolean] the result of the flag's #true?
def quiet?
  quiet_flag = @quiet
  quiet_flag.true?
end
#run ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/crono_trigger/polling_thread.rb', line 16
#
# Starts the polling thread and stores it in @thread.
#
# The thread wakes up every CronoTrigger.config.polling_interval until the
# stop flag is set. On each wake-up, unless quiet, it takes one model name
# from the queue, resolves it to a class, polls it, and finally puts the
# name back on the queue.
#
# @return [Thread] the started thread
def run
  @thread = Thread.start do
    @logger.info "(polling-thread-#{Thread.current.object_id}) Start polling thread"

    # wait_for_set doubles as the sleep between polls and the exit condition.
    until @stop_flag.wait_for_set(CronoTrigger.config.polling_interval)
      next if quiet?

      CronoTrigger.reloader.call do
        begin
          # Non-blocking pop: an empty queue raises ThreadError("queue empty")
          # (assuming a ::Queue-compatible object).
          model_name = @model_queue.pop(true)
          model = model_name.classify.constantize
          poll(model)
        rescue ThreadError => e
          # An empty queue is an expected condition; compare the exception's
          # message (not the exception object) so it is not logged as an error.
          @logger.error(e) unless e.message == "queue empty"
        rescue => ex
          @logger.error(ex)
          CronoTrigger::GlobalExceptionHandler.handle_global_exception(ex)
        ensure
          # Return the name to the queue so the model is polled again later.
          @model_queue << model_name if model_name
        end
      end
    end
  end
end
#worker_count=(n) ⇒ Object
80 81 82 83 |
# File 'lib/crono_trigger/polling_thread.rb', line 80
#
# Sets the worker count that #poll passes to executables_with_lock.
#
# @param n [Numeric] new worker count; must be greater than 0
# @raise [ArgumentError] if n is zero or negative
def worker_count=(n)
  if n <= 0
    raise ArgumentError, "worker_count must be greater than 0"
  end

  @worker_count = n
end