Class: Bunny::ReaderLoop
- Inherits:
-
Object
- Object
- Bunny::ReaderLoop
- Defined in:
- lib/bunny/reader_loop.rb
Overview
Network activity loop that reads and passes incoming AMQP 0.9.1 methods for processing. They are dispatched further down the line in Bunny::Session and Bunny::Channel. This loop uses a separate thread internally.
This closely mimics the design of the RabbitMQ Java client.
Instance Method Summary collapse
-
#initialize(transport, session, session_error_handler) ⇒ ReaderLoop
constructor
A new instance of ReaderLoop.
- #join ⇒ Object
- #kill ⇒ Object
- #raise(e) ⇒ Object
- #resume ⇒ Object
- #run_loop ⇒ Object
- #run_once ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #stopped? ⇒ Boolean
- #stopping? ⇒ Boolean
- #terminate_with(e) ⇒ Object
Constructor Details
#initialize(transport, session, session_error_handler) ⇒ ReaderLoop
Returns a new instance of ReaderLoop.
12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/bunny/reader_loop.rb', line 12
#
# Stores the collaborators and resets the lifecycle flags. No thread is
# created here.
#
# @param transport kept in @transport
# @param session kept in @session; its #logger is kept in @logger
# @param session_error_handler kept in @session_error_handler
def initialize(transport, session, session_error_handler)
  @transport, @session = transport, session
  @session_error_handler = session_error_handler
  @logger = session.logger

  # Guards reads and writes of the lifecycle flags below.
  @mutex = Mutex.new
  @stopping = @stopped = @network_is_down = false
end
Instance Method Details
#join ⇒ Object
118 119 120 121 122 123 124 125 126 127 |
# File 'lib/bunny/reader_loop.rb', line 118
#
# Waits for the reader thread to finish.
#
# Does nothing (and returns nil) when there is no thread yet, or when the
# caller *is* the reader thread.
#
# @return [Thread, nil] the result of Thread#join, or nil when skipped
def join
  return unless @thread

  # Thread#join can re-raise an unhandled exception from the joined thread,
  # and Thread.handle_interrupt used by other libraries or application code
  # can make the join fail with an obscure exception. Refusing to join the
  # current thread saves some really unpleasant debugging time; the condition
  # typically would not be true anyway.
  #
  # See ruby-amqp/bunny#589 and ruby-amqp/bunny#590 for background.
  return if @thread == Thread.current

  @thread.join
end
#kill ⇒ Object
129 130 131 132 133 134 |
# File 'lib/bunny/reader_loop.rb', line 129
#
# Forcibly terminates the reader thread and waits until it has exited.
# Returns nil without doing anything when no thread has been started.
#
# @return [Thread, nil] the result of Thread#join, or nil when there is no thread
def kill
  return unless @thread

  @thread.kill
  @thread.join
end
#raise(e) ⇒ Object
114 115 116 |
# File 'lib/bunny/reader_loop.rb', line 114
#
# Raises the given exception inside the reader thread (via Thread#raise).
# Nothing happens when no thread has been started.
#
# @param e the exception to raise in the reader thread
# @return the result of Thread#raise, or nil when there is no thread
def raise(e)
  return unless @thread

  @thread.raise(e)
end
#resume ⇒ Object
30 31 32 |
# File 'lib/bunny/reader_loop.rb', line 30
#
# Alias-like entry point: simply delegates to #start.
#
# @return whatever #start returns
def resume
  start
end
#run_loop ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/bunny/reader_loop.rb', line 35
#
# Main body of the reader: calls #run_once repeatedly until the loop is asked
# to stop, has already stopped, or the network has been marked as down.
# When the loop exits, @stopped is set to true under the mutex.
#
# Error handling per iteration:
# * I/O-level errors (empty response, IOError, SystemCallError, timeout,
#   OpenSSL errors) mark the network as down and either trigger
#   @session.handle_network_failure (when the session recovers automatically)
#   or raise Bunny::NetworkFailure on the session error handler.
# * ShutdownSignal sets @stopping and leaves the loop.
# * Any other exception marks the network as down and raises
#   Bunny::NetworkFailure on the session error handler, unless the session is
#   closing or closed.
def run_loop
  loop do
    begin
      break if @mutex.synchronize { @stopping || @stopped || @network_is_down }
      run_once
    rescue AMQ::Protocol::EmptyResponseError, IOError, SystemCallError, Timeout::Error,
           OpenSSL::OpenSSLError => e
      break if terminate? || @session.closing? || @session.closed?

      @network_is_down = true
      if @session.automatically_recover?
        log_exception(e, level: :warn)
        @session.handle_network_failure(e)
      else
        log_exception(e)
        @session_error_handler.raise(Bunny::NetworkFailure.new("detected a network failure: #{e.message}", e))
      end
    rescue ShutdownSignal => _
      @mutex.synchronize { @stopping = true }
      break
    rescue Exception => e
      # Deliberately broad: anything unexpected in the reader thread must be
      # surfaced to the session error handler rather than silently killing
      # the thread.
      break if terminate?
      if !(@session.closing? || @session.closed?)
        log_exception(e)

        @network_is_down = true
        @session_error_handler.raise(Bunny::NetworkFailure.new("caught an unexpected exception in the network loop: #{e.message}", e))
      end
    rescue Errno::EBADF => _ebadf
      # NOTE(review): this clause is unreachable. Errno::EBADF is a
      # SystemCallError (handled by the first rescue) and an Exception
      # (handled by the clause above). Kept as-is to avoid changing behavior.
      break if terminate?
      # ignored, happens when we loop after the transport has already been closed
      @mutex.synchronize { @stopping = true }
    end
  end

  @mutex.synchronize { @stopped = true }
end
#run_once ⇒ Object
73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/bunny/reader_loop.rb', line 73
#
# Reads one unit of work from the transport and hands it to the session.
#
# * A heartbeat frame is consumed and ignored.
# * A frame that is not final, or whose method class carries content, is
#   followed by a header frame and as many body frames as needed to reach
#   header.body_size bytes; the three parts are passed to
#   @session.handle_frameset.
# * Any other frame is passed to @session.handle_frame on its own.
def run_once
  frame = @transport.read_next_frame
  return if frame.is_a?(AMQ::Protocol::HeartbeatFrame)

  if !frame.final? || frame.method_class.has_content?
    header = @transport.read_next_frame
    # The buffer is appended to below, so it must be mutable: a bare ''
    # literal raises FrozenError under `# frozen_string_literal: true` and is
    # a deprecated "chilled" string on Ruby 3.4+. `dup` keeps the encoding.
    content = ''.dup

    if header.body_size > 0
      loop do
        body_frame = @transport.read_next_frame
        content << body_frame.decode_payload

        # Stop once the accumulated payload covers the announced body size.
        break if content.bytesize >= header.body_size
      end
    end

    @session.handle_frameset(frame.channel, [frame.decode_payload, header.decode_payload, content])
  else
    @session.handle_frame(frame.channel, frame.decode_payload)
  end
end
#start ⇒ Object
26 27 28 |
# File 'lib/bunny/reader_loop.rb', line 26
#
# Spawns a new thread that executes #run_loop and remembers it in @thread.
#
# @return [Thread] the newly created thread
def start
  @thread = Thread.new { run_loop }
end
#stop ⇒ Object
96 97 98 |
# File 'lib/bunny/reader_loop.rb', line 96
#
# Flags the loop as stopping. The flag is written while holding the mutex;
# the thread itself is not touched here.
#
# @return [true]
def stop
  @mutex.synchronize do
    @stopping = true
  end
end
#stopped? ⇒ Boolean
100 101 102 |
# File 'lib/bunny/reader_loop.rb', line 100
#
# Reads the @stopped flag while holding the mutex.
#
# @return [Boolean] the current value of @stopped
def stopped?
  @mutex.synchronize do
    @stopped
  end
end
#stopping? ⇒ Boolean
104 105 106 |
# File 'lib/bunny/reader_loop.rb', line 104
#
# Reads the @stopping flag while holding the mutex.
#
# @return [Boolean] the current value of @stopping
def stopping?
  @mutex.synchronize do
    @stopping
  end
end
#terminate_with(e) ⇒ Object
108 109 110 111 112 |
# File 'lib/bunny/reader_loop.rb', line 108
#
# Flags the loop as stopping (under the mutex) and then delivers the given
# exception through this object's own #raise.
#
# @param e the exception to hand to #raise
# @return whatever #raise returns
def terminate_with(e)
  @mutex.synchronize do
    @stopping = true
  end

  # Explicit receiver: this is the instance-level #raise, not Kernel#raise.
  self.raise(e)
end