Class: Bushpig::Server
- Inherits:
-
Object
- Object
- Bushpig::Server
- Defined in:
- lib/bushpig/server.rb
Instance Method Summary collapse
- #complete(job) ⇒ Object
- #fetch(queue) ⇒ Object
- #handle(job) ⇒ Object
- #honeybadger ⇒ Object
-
#initialize(pool, timeout: 2, &handler) ⇒ Server
constructor
A new instance of Server.
- #monotonic_time ⇒ Object
- #notify_exception(job, exception) ⇒ Object
- #redis_pool ⇒ Object
- #reset_signals ⇒ Object
- #serve(queue) ⇒ Object
- #trap_signals ⇒ Object
Constructor Details
#initialize(pool, timeout: 2, &handler) ⇒ Server
Returns a new instance of Server.
5 6 7 8 9 10 |
# File 'lib/bushpig/server.rb', line 5 def initialize(pool, timeout: 2, &handler) @pool = pool @timeout = timeout.to_i @handler = handler @done = false end |
Instance Method Details
#complete(job) ⇒ Object
105 106 107 108 109 110 |
# File 'lib/bushpig/server.rb', line 105 def complete(job) redis_pool.with do |conn| # conn.srem('running', job.job_key) conn.del(Bushpig.job_key(job.job_key)) end end |
#fetch(queue) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/bushpig/server.rb', line 84 def fetch(queue) redis_pool.with do |conn| begin res = conn.bzpopmin(Bushpig.queue_key(queue), @timeout) rescue Redis::TimeoutError # TODO: warn user (once) that redis timeout set lower than pop timeout conn.close res = nil end return nil if res.nil? (_set, key, _score) = res # conn.sadd('running', key) payload = conn.get(Bushpig.job_key(key)) return nil if payload.nil? # most likely job expired Bushpig::Job.hydrate(payload) end end |
#handle(job) ⇒ Object
53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/bushpig/server.rb', line 53 def handle(job) puts "Job starting: jid-#{job.job_id} jkey-#{job.job_key} #{job}" started = monotonic_time @handler.call(job) finished = monotonic_time elapsed = finished - started puts "Job completed: jid-#{job.job_id} #{elapsed} seconds" rescue StandardError => e finished = monotonic_time elapsed = finished - started puts "Job raised exception: jid-#{job.job_id} #{elapsed} seconds: #{e}" notify_exception(job, e) ActiveRecord::Base.clear_active_connections! end |
#honeybadger ⇒ Object
68 69 70 71 72 73 74 |
# File 'lib/bushpig/server.rb', line 68 def honeybadger @honeybadger ||= begin Honeybadger rescue NameError nil end end |
#monotonic_time ⇒ Object
80 81 82 |
# File 'lib/bushpig/server.rb', line 80 def monotonic_time Process.clock_gettime(Process::CLOCK_MONOTONIC) end |
#notify_exception(job, exception) ⇒ Object
76 77 78 |
# File 'lib/bushpig/server.rb', line 76 def notify_exception(job, exception) honeybadger&.notify(exception, context: { job_id: job.job_id, job_key: job.job_key, job: job.to_s }) end |
#redis_pool ⇒ Object
12 13 14 |
# File 'lib/bushpig/server.rb', line 12 def redis_pool @pool end |
#reset_signals ⇒ Object
29 30 31 32 |
# File 'lib/bushpig/server.rb', line 29 def reset_signals Signal.trap('INT', 'DEFAULT') Signal.trap('TERM', 'DEFAULT') end |
#serve(queue) ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
# File 'lib/bushpig/server.rb', line 34 def serve(queue) puts "Serving queue #{queue}" trap_signals until @done job = fetch(queue) if job.nil? ActiveRecord::Base.clear_active_connections! next end handle(job) complete(job) end reset_signals puts "Stop serving queue #{queue}" end |
#trap_signals ⇒ Object
16 17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/bushpig/server.rb', line 16 def trap_signals Signal.trap('INT') do reset_signals puts 'INT received, shutdown flagged' @done = true end Signal.trap('TERM') do reset_signals puts 'TERM received, shutdown flagged' @done = true end end |