Module: SayWhen::Poller::BasePoller
- Included in:
- CelluloidPoller, ConcurrentPoller, SimplePoller
- Defined in:
- lib/say_when/poller/base_poller.rb
Class Method Summary
Instance Method Summary
- #acquire(time_now) ⇒ Object
- #error_tick_length ⇒ Object
- #job_error(msg, job, ex) ⇒ Object
- #logger ⇒ Object
- #process(job, time_now) ⇒ Object
- #process_jobs ⇒ Object
- #processor ⇒ Object
- #release(job) ⇒ Object
- #reset_acquired ⇒ Object
- #reset_acquired_length ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #storage ⇒ Object
- #tick(t = tick_length) ⇒ Object
- #tick_length ⇒ Object
Class Method Details
Instance Method Details
#acquire(time_now) ⇒ Object
54 55 56 57 58 59 60 61 62 |
# File 'lib/say_when/poller/base_poller.rb', line 54
#
# Asks storage for the next job to acquire for +time_now+ and logs the outcome.
#
# @param time_now [Object] cutoff handed unchanged to +storage.acquire_next+
#   (a Time when called from #process_jobs)
# @return [Object, nil] whatever +storage.acquire_next+ returned
def acquire(time_now)
  logger.debug "SayWhen:: Looking for job that should be ready to fire before #{time_now}"

  job = storage.acquire_next(time_now)
  outcome = job ? "SayWhen:: got a job: #{job.inspect}" : "SayWhen:: no jobs to acquire"
  logger.debug outcome

  job
end
#error_tick_length ⇒ Object
87 88 89 |
# File 'lib/say_when/poller/base_poller.rb', line 87
#
# Seconds to sleep after a polling pass fails (see the StandardError branch
# of #process_jobs). Memoized after the first call.
#
# Falls back to #tick_length when :error_tick_length is not configured. The
# fallback must be applied before +to_f+: +nil.to_f+ is 0.0, which is truthy,
# so a trailing +|| tick_length+ would never be evaluated.
#
# NOTE(review): the extracted source read +SayWhen.[:error_tick_length]+; the
# accessor name was lost in extraction and is restored here as +options+ —
# confirm against the library.
#
# @return [Float]
def error_tick_length
  @error_tick_length ||= (SayWhen.options[:error_tick_length] || tick_length).to_f
end
#job_error(msg, job, ex) ⇒ Object
30 31 32 33 34 |
# File 'lib/say_when/poller/base_poller.rb', line 30
#
# Logs an error raised while polling and releases the job involved.
#
# NOTE(review): the extracted source read +ex.+ with the method name lost;
# restored here as +ex.message+ — confirm against the library.
#
# @param msg [String] short label for the failure (e.g. "Error!")
# @param job [Object, nil] job being handled when the error occurred, if any
# @param ex [Exception] the rescued exception
# @return [Object, nil] result of #release
def job_error(msg, job, ex)
  job_msg = job && " job:'#{job.inspect}'"

  # Exception#backtrace is nil for an exception that was never raised; Array()
  # keeps the error handler itself from failing. Frames are joined with
  # "\n\t" so each one starts on its own tab-indented line, matching the
  # "\n\t" that precedes the first frame.
  trace = Array(ex.backtrace).join("\n\t")

  logger.error "#{self.class.name} #{msg}#{job_msg}: #{ex.message}\n\t#{trace}"
  release(job)
end
#logger ⇒ Object
103 104 105 |
# File 'lib/say_when/poller/base_poller.rb', line 103
#
# Logger used by the poller; delegates to +SayWhen.logger+.
#
# @return [Object] whatever +SayWhen.logger+ returns
def logger
  SayWhen.logger
end
#process(job, time_now) ⇒ Object
64 65 66 67 68 69 70 71 72 |
# File 'lib/say_when/poller/base_poller.rb', line 64
#
# Runs one acquired job through the processor, then reports it to storage as
# fired, logging after each step.
#
# @param job [Object] job previously returned by #acquire
# @param time_now [Object] timestamp handed to +storage.fired+
# @return [Object] result of the final +logger.debug+ call
def process(job, time_now)
  # The processor strategy does the actual work for this job.
  processor.process(job)
  logger.debug "SayWhen:: job processed: #{job.inspect}"

  # Tell storage the job fired; this should update its next fire time and
  # put it back in the list of scheduled jobs.
  storage.fired(job, time_now)
  logger.debug "SayWhen:: job fired: #{job.inspect}"
end
#process_jobs ⇒ Object
36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/say_when/poller/base_poller.rb', line 36
#
# One polling pass: resets stale acquisitions, then acquires and processes
# jobs until #acquire returns nothing.
#
# Error handling:
# * StandardError - logged via #job_error, then sleeps for #error_tick_length.
# * Interrupt and any other Exception - logged via #job_error and re-raised.
#
# @return [Object, nil] nil after a clean pass; the result of #tick after a
#   StandardError
def process_jobs
  job = nil
  reset_acquired
  time_now = Time.now
  while (job = acquire(time_now))
    process(job, time_now)
    # This job is done. Clear the reference so that a failure in the next
    # #acquire call is not attributed to it (and it is not released) by the
    # rescue clauses below.
    job = nil
    time_now = Time.now
  end
rescue StandardError => ex
  job_error("Error!", job, ex)
  tick(error_tick_length)
rescue Interrupt => ex
  job_error("Interrupt!", job, ex)
  raise ex
rescue Exception => ex
  # Deliberately broad: log and release, then let the exception propagate.
  job_error("Exception!", job, ex)
  raise ex
end
#processor ⇒ Object
95 96 97 |
# File 'lib/say_when/poller/base_poller.rb', line 95
#
# Processor strategy instance, loaded once via +load_strategy+ and memoized.
#
# NOTE(review): the extracted source read +SayWhen.[:processor_strategy]+,
# which is not valid Ruby; the accessor name was lost in extraction and is
# restored here as +options+ — confirm against the library.
#
# @return [Object] whatever +load_strategy+ returns for :processor
def processor
  @processor ||= load_strategy(:processor, SayWhen.options[:processor_strategy])
end
#release(job) ⇒ Object
74 75 76 77 |
# File 'lib/say_when/poller/base_poller.rb', line 74
#
# Logs the release and, when a job is given, calls +release+ on it.
#
# @param job [Object, nil] job to release; nothing is released when falsy
# @return [Object, nil] result of +job.release+, or nil when there is no job
def release(job)
  logger.info "SayWhen::Scheduler release: #{job.inspect}"
  return unless job

  job.release
end
#reset_acquired ⇒ Object
19 20 21 22 23 24 25 26 27 28 |
# File 'lib/say_when/poller/base_poller.rb', line 19
#
# Periodically asks storage to reset acquired jobs.
#
# Does nothing unless #reset_acquired_length is positive and the next
# scheduled reset time (+reset_next_at+, initialised to now on first use)
# has been reached. When it runs, the next reset is scheduled
# #reset_acquired_length seconds ahead.
#
# @return [Object, nil] result of +storage.reset_acquired+, or nil when skipped
def reset_acquired
  now = Time.now
  self.reset_next_at ||= now

  return unless reset_acquired_length > 0
  return unless reset_next_at <= now

  self.reset_next_at = now + reset_acquired_length
  logger.debug "SayWhen:: reset acquired at #{now}, try again at #{reset_next_at}"
  storage.reset_acquired(reset_acquired_length)
end
#reset_acquired_length ⇒ Object
91 92 93 |
# File 'lib/say_when/poller/base_poller.rb', line 91
#
# Configured :reset_acquired_length as a Float, memoized. An unset option
# yields 0.0 (+nil.to_f+), which #reset_acquired treats as "never reset"
# because it only acts on values greater than zero.
#
# NOTE(review): the extracted source read +SayWhen.[:reset_acquired_length]+,
# which is not valid Ruby; the accessor name was lost in extraction and is
# restored here as +options+ — confirm against the library.
#
# @return [Float]
def reset_acquired_length
  @reset_acquired_length ||= SayWhen.options[:reset_acquired_length].to_f
end
#start ⇒ Object
16 17 |
# File 'lib/say_when/poller/base_poller.rb', line 16
#
# No-op in this module; the body is intentionally empty.
#
# @return [nil]
def start
end
#stop ⇒ Object
13 14 |
# File 'lib/say_when/poller/base_poller.rb', line 13
#
# No-op in this module; the body is intentionally empty.
#
# @return [nil]
def stop
end
#storage ⇒ Object
99 100 101 |
# File 'lib/say_when/poller/base_poller.rb', line 99
#
# Storage strategy instance, loaded once via +load_strategy+ and memoized.
#
# NOTE(review): the extracted source read +SayWhen.[:storage_strategy]+,
# which is not valid Ruby; the accessor name was lost in extraction and is
# restored here as +options+ — confirm against the library.
#
# @return [Object] whatever +load_strategy+ returns for :storage
def storage
  @storage ||= load_strategy(:storage, SayWhen.options[:storage_strategy])
end
#tick(t = tick_length) ⇒ Object
79 80 81 |
# File 'lib/say_when/poller/base_poller.rb', line 79
#
# Sleeps for +t+ seconds.
#
# @param t [#to_f] duration in seconds; defaults to #tick_length
# @return [Integer] value returned by +sleep+
def tick(t = tick_length)
  seconds = t.to_f
  sleep(seconds)
end
#tick_length ⇒ Object
83 84 85 |
# File 'lib/say_when/poller/base_poller.rb', line 83
#
# Default sleep duration used by #tick, in seconds: the configured
# :tick_length as a Float, memoized. An unset option yields 0.0 (+nil.to_f+).
#
# NOTE(review): the extracted source read +SayWhen.[:tick_length]+, which is
# not valid Ruby; the accessor name was lost in extraction and is restored
# here as +options+ — confirm against the library.
#
# @return [Float]
def tick_length
  @tick_length ||= SayWhen.options[:tick_length].to_f
end