Module: Resque::Plugins::ResqueKalashnikov
- Defined in:
- lib/resque/plugins/resque_kalashnikov/resque_kalashnikov.rb
Class Method Summary collapse
Instance Method Summary collapse
-
#can_do_job_async?(job) ⇒ Boolean
test whenether we can do job async based on its name.
- #inspect_with_kalashnikov ⇒ Object
-
#monitor(interval) ⇒ Object
if resque worker gonna to stop - stop EM essentially, fiber-singleton.
- #reserve_with_kalashnikov ⇒ Object
- #work_async_on(job, &block) ⇒ Object
- #work_sync_on(job, &block) ⇒ Object
- #work_with_kalashnikov(interval = 5.0, &block) ⇒ Object
Class Method Details
.included(receiver) ⇒ Object
117 118 119 120 121 122 123 124 125 126 127 128 |
# File 'lib/resque/plugins/resque_kalashnikov/resque_kalashnikov.rb', line 117 def self.included(receiver) receiver.class_eval do alias work_without_kalashnikov work alias work work_with_kalashnikov alias inspect_without_kalashnikov inspect alias inspect inspect_with_kalashnikov alias reserve_without_kalashnikov reserve alias reserve reserve_with_kalashnikov end end |
Instance Method Details
#can_do_job_async?(job) ⇒ Boolean
test whenether we can do job async based on its name
96 97 98 |
# File 'lib/resque/plugins/resque_kalashnikov/resque_kalashnikov.rb', line 96 def can_do_job_async?(job) !! job.queue['async'] end |
#inspect_with_kalashnikov ⇒ Object
47 48 49 |
# File 'lib/resque/plugins/resque_kalashnikov/resque_kalashnikov.rb', line 47 def inspect_with_kalashnikov "#<KalashnikovWorker #{to_s}>" end |
#monitor(interval) ⇒ Object
if resque worker gonna to stop - stop EM essentially, fiber-singleton
80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/resque/plugins/resque_kalashnikov/resque_kalashnikov.rb', line 80 def monitor(interval) @monitor ||= Fiber.new do EM.add_periodic_timer(interval) do # monitor itself doesnt count in @fibers if (@fibers = @fibers.select(&:alive?)).empty? EM.stop if shutdown? else log! "Big brother says: #{@fibers.size} fibers alive" log! ObjectSpace.count_objects.to_s end end end.tap &:resume end |
#reserve_with_kalashnikov ⇒ Object
100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/resque/plugins/resque_kalashnikov/resque_kalashnikov.rb', line 100 def reserve_with_kalashnikov queues = Resque.queues.map { |q| "queue:#{q}" } # NO block for EM since using hiredis + em-synchrony GC.enable queue, value = redis.blpop(*queues, 0) GC.disable # shit happens if monitor fiber stops EM # it should happen only in tests raise EM::ForcedStop.new(queue) if queue['Redis disconnected'] log "popped: q=#{queue} v=#{value}" payload = decode value Resque::Job.new queue, payload end |
#work_async_on(job, &block) ⇒ Object
71 72 73 74 75 76 |
# File 'lib/resque/plugins/resque_kalashnikov/resque_kalashnikov.rb', line 71 def work_async_on(job, &block) log "work async" Fiber.new do perform(job, &block) end.tap &:resume end |
#work_sync_on(job, &block) ⇒ Object
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/resque/plugins/resque_kalashnikov/resque_kalashnikov.rb', line 51 def work_sync_on(job, &block) log 'work sync' if @child = fork(job) srand # Reseeding procline "Forked #{@child} at #{Time.now.to_i}" begin Process.waitpid(@child) rescue SystemCallError nil end job.fail(DirtyExit.new($?.to_s)) if $?.signaled? else unregister_signal_handlers if will_fork? && term_child procline "Processing #{job.queue} since #{Time.now.to_i}" #reconnect # cannot do it with hiredis perform(job, &block) exit!(true) if will_fork? end end |
#work_with_kalashnikov(interval = 5.0, &block) ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/resque/plugins/resque_kalashnikov/resque_kalashnikov.rb', line 4 def work_with_kalashnikov(interval=5.0, &block) interval = Float(interval) @fibers = [] startup loop do break if shutdown? job = reserve log "got job in worker fiber: #{job.inspect}" job.worker = self working_on job if can_do_job_async? job @fibers << work_async_on(job, &block) else work_sync_on(job, &block) @child = nil end monitor(interval) done_working end unregister_worker rescue EM::ForcedStop => e # happens in fiber-mode # EM has stopped but we need # to reconnect to report it Resque.redis = Redis.connect unregister_worker rescue Resque::Helpers::DecodeException => e # agian, happens in fork-mode raise e unless e.to_s['Redis disconnected'] Resque.redis = Redis.connect unregister_worker rescue Exception => exception log exception.to_s log exception.backtrace.to_s unregister_worker(exception) end |