Class: Bushpig::Server

Inherits:
Object
  • Object
show all
Defined in:
lib/bushpig/server.rb

Instance Method Summary collapse

Constructor Details

#initialize(pool, timeout: 2, &handler) ⇒ Server

Returns a new instance of Server.



5
6
7
8
9
10
# File 'lib/bushpig/server.rb', line 5

def initialize(pool, timeout: 2, &handler)
  @pool = pool
  @timeout = timeout.to_i
  @handler = handler
  @done = false
end

Instance Method Details

#complete(job) ⇒ Object



105
106
107
108
109
110
# File 'lib/bushpig/server.rb', line 105

def complete(job)
  redis_pool.with do |conn|
    # conn.srem('running', job.job_key)
    conn.del(Bushpig.job_key(job.job_key))
  end
end

#fetch(queue) ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/bushpig/server.rb', line 84

def fetch(queue)
  redis_pool.with do |conn|
    begin
      res = conn.bzpopmin(Bushpig.queue_key(queue), @timeout)
    rescue Redis::TimeoutError
      # TODO: warn user (once) that redis timeout set lower than pop timeout
      conn.close
      res = nil
    end
    return nil if res.nil?

    (_set, key, _score) = res
    # conn.sadd('running', key)

    payload = conn.get(Bushpig.job_key(key))
    return nil if payload.nil? # most likely job expired

    Bushpig::Job.hydrate(payload)
  end
end

#handle(job) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
# File 'lib/bushpig/server.rb', line 53

def handle(job)
  puts "Job starting: jid-#{job.job_id} jkey-#{job.job_key} #{job}"
  started = monotonic_time
  @handler.call(job)
  finished = monotonic_time
  elapsed = finished - started
  puts "Job completed: jid-#{job.job_id} #{elapsed} seconds"
rescue StandardError => e
  finished = monotonic_time
  elapsed = finished - started
  puts "Job raised exception: jid-#{job.job_id} #{elapsed} seconds: #{e}"
  notify_exception(job, e)
  ActiveRecord::Base.clear_active_connections!
end

#honeybadgerObject



68
69
70
71
72
73
74
# File 'lib/bushpig/server.rb', line 68

def honeybadger
  @honeybadger ||= begin
    Honeybadger
  rescue NameError
    nil
  end
end

#monotonic_timeObject



80
81
82
# File 'lib/bushpig/server.rb', line 80

def monotonic_time
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

#notify_exception(job, exception) ⇒ Object



76
77
78
# File 'lib/bushpig/server.rb', line 76

def notify_exception(job, exception)
  honeybadger&.notify(exception, context: { job_id: job.job_id, job_key: job.job_key, job: job.to_s })
end

#redis_poolObject



12
13
14
# File 'lib/bushpig/server.rb', line 12

def redis_pool
  @pool
end

#reset_signalsObject



29
30
31
32
# File 'lib/bushpig/server.rb', line 29

def reset_signals
  Signal.trap('INT', 'DEFAULT')
  Signal.trap('TERM', 'DEFAULT')
end

#serve(queue) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/bushpig/server.rb', line 34

def serve(queue)
  puts "Serving queue #{queue}"
  trap_signals

  until @done
    job = fetch(queue)
    if job.nil?
      ActiveRecord::Base.clear_active_connections!
      next
    end

    handle(job)
    complete(job)
  end

  reset_signals
  puts "Stop serving queue #{queue}"
end

#trap_signalsObject



16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/bushpig/server.rb', line 16

def trap_signals
  Signal.trap('INT') do
    reset_signals
    puts 'INT received, shutdown flagged'
    @done = true
  end
  Signal.trap('TERM') do
    reset_signals
    puts 'TERM received, shutdown flagged'
    @done = true
  end
end