Class: Promiscuous::Subscriber::Worker::Runner::RunnerThread

Inherits:
Object
  • Object
show all
Defined in:
lib/promiscuous/subscriber/worker/runner.rb

Instance Method Summary collapse

Constructor Details

#initialize(message_queue) ⇒ RunnerThread

Returns a new instance of RunnerThread.



28
29
30
31
32
# File 'lib/promiscuous/subscriber/worker/runner.rb', line 28

# Builds a runner around +message_queue+ and immediately spawns the
# background thread that executes #main_loop.
#
# @param message_queue [#pop] source of messages; #main_loop calls +pop+ on it
def initialize(message_queue)
  @kill_lock = Mutex.new
  @message_queue = message_queue
  # Started last so both instance variables above are set before the loop runs.
  @thread = Thread.new do
    main_loop
  end
end

Instance Method Details

#main_loop ⇒ Object



34
35
36
37
38
39
40
41
42
43
# File 'lib/promiscuous/subscriber/worker/runner.rb', line 34

# Worker loop: repeatedly pops a message from @message_queue and processes it
# while holding @kill_lock.
#
# @current_message is set for the duration of the processing and is always
# cleared afterwards, including when the thread is killed mid-message or
# +process+ unexpectedly raises, so #show_stop_status never reports a message
# that is no longer being processed.
def main_loop
  loop do
    msg = @message_queue.pop
    @kill_lock.synchronize do
      begin
        @current_message = msg
        msg.process # msg.process is not expected to throw
      ensure
        # Thread#kill runs ensure clauses, so this also covers #stop killing
        # the thread while it is blocked inside msg.process.
        @current_message = nil
      end
    end
  end
end

#show_stop_status(num_requests) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/promiscuous/subscriber/worker/runner.rb', line 53

def show_stop_status(num_requests)
  msg = @current_message
  backtrace = @thread.backtrace

  if msg
    STDERR.puts "Still processing #{msg.payload}"

    if num_requests > 1 && backtrace
      STDERR.puts
      STDERR.puts backtrace.map { |line| "  \e[1;30m#{line}\e[0m\n" }
      STDERR.puts
      STDERR.puts "I'm a little busy, check out my stack trace."
      STDERR.puts "Be patient (or kill me with -9, but that wouldn't be very nice of you)."
    else
      STDERR.puts "Just a second..."
    end
  end
end

#stop ⇒ Object



45
46
47
48
49
50
51
# File 'lib/promiscuous/subscriber/worker/runner.rb', line 45

# Terminates the runner thread.
#
# When @kill_lock is currently held and the thread reports +stop?+ (it is
# sleeping or dead), the thread is killed right away without taking the lock.
# In every other case the kill happens inside @kill_lock, i.e. only once the
# lock can be acquired.
#
# @return [Object] whatever +@thread.kill+ returns
def stop
  blocked_while_locked = @kill_lock.locked? && @thread.stop?
  return @thread.kill if blocked_while_locked

  @kill_lock.synchronize do
    @thread.kill
  end
end