Class: CronoTrigger::PollingThread

Inherits:
Object
  • Object
show all
Defined in:
lib/crono_trigger/polling_thread.rb

Instance Method Summary collapse

Constructor Details

#initialize(model_queue, stop_flag, logger, executor, execution_counter) ⇒ PollingThread

Returns a new instance of PollingThread.



3
4
5
6
7
8
9
10
11
12
13
14
# File 'lib/crono_trigger/polling_thread.rb', line 3

def initialize(model_queue, stop_flag, logger, executor, execution_counter)
  @model_queue = model_queue
  @stop_flag = stop_flag
  @logger = logger
  @executor = executor
  if @executor.fallback_policy != :caller_runs
    raise ArgumentError, "executor's fallback policies except for :caller_runs are not supported"
  end
  @execution_counter = execution_counter
  @quiet = Concurrent::AtomicBoolean.new(false)
  @worker_count = 1
end

Instance Method Details

#alive?Boolean

Returns:

  • (Boolean)


52
53
54
# File 'lib/crono_trigger/polling_thread.rb', line 52

def alive?
  @thread.alive?
end

#joinObject



40
41
42
# File 'lib/crono_trigger/polling_thread.rb', line 40

def join
  @thread.join
end

#poll(model) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/crono_trigger/polling_thread.rb', line 56

def poll(model)
  @logger.info "(polling-thread-#{Thread.current.object_id}) Poll #{model}"

  maybe_has_next = true
  while maybe_has_next && !@stop_flag.set?
    records, maybe_has_next = CronoTrigger.retry_on_db_errors do
      model.connection_pool.with_connection do
        model.executables_with_lock(limit: CronoTrigger.config.fetch_records || CronoTrigger.config.executor_thread * 3, worker_count: @worker_count)
      end
    end

    records.each do |record|
      @executor.post do
        @execution_counter.increment
        begin
          process_record(record)
        ensure
          @execution_counter.decrement
        end
      end
    end
  end
end

#quietObject



44
45
46
# File 'lib/crono_trigger/polling_thread.rb', line 44

def quiet
  @quiet.make_true
end

#quiet?Boolean

Returns:

  • (Boolean)


48
49
50
# File 'lib/crono_trigger/polling_thread.rb', line 48

def quiet?
  @quiet.true?
end

#runObject



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/crono_trigger/polling_thread.rb', line 16

def run
  @thread = Thread.start do
    @logger.info "(polling-thread-#{Thread.current.object_id}) Start polling thread"
    until @stop_flag.wait_for_set(CronoTrigger.config.polling_interval)
      next if quiet?

      CronoTrigger.reloader.call do
        begin
          model_name = @model_queue.pop(true)
          model = model_name.classify.constantize
          poll(model)
        rescue ThreadError => e
          @logger.error(e) unless e.message == "queue empty"
        rescue => ex
          @logger.error(ex)
          CronoTrigger::GlobalExceptionHandler.handle_global_exception(ex)
        ensure
          @model_queue << model_name if model_name
        end
      end
    end
  end
end

#worker_count=(n) ⇒ Object

Raises:

  • (ArgumentError)


80
81
82
83
# File 'lib/crono_trigger/polling_thread.rb', line 80

def worker_count=(n)
  raise ArgumentError, "worker_count must be greater than 0" if n <= 0
  @worker_count = n
end