Class: GorgonBunny::ReaderLoop
- Inherits: Object
- Inheritance chain: Object → GorgonBunny::ReaderLoop
- Defined in: lib/gorgon_bunny/lib/gorgon_bunny/reader_loop.rb
Overview
Network activity loop that reads and passes incoming AMQP 0.9.1 methods for processing. They are dispatched further down the line in GorgonBunny::Session and GorgonBunny::Channel. This loop uses a separate thread internally.
This closely mimics the design of the RabbitMQ Java client.
Instance Method Summary
- #initialize(transport, session, session_thread) ⇒ ReaderLoop (constructor): A new instance of ReaderLoop.
- #join ⇒ Object
- #kill ⇒ Object
- #log_exception(e) ⇒ Object
- #raise(e) ⇒ Object
- #resume ⇒ Object
- #run_loop ⇒ Object
- #run_once ⇒ Object
- #start ⇒ Object
- #stop ⇒ Object
- #stopped? ⇒ Boolean
Constructor Details
#initialize(transport, session, session_thread) ⇒ ReaderLoop
Returns a new instance of ReaderLoop.
12 13 14 15 16 17 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/reader_loop.rb', line 12 def initialize(transport, session, session_thread) @transport = transport @session = session @session_thread = session_thread @logger = @session.logger end |
Instance Method Details
#join ⇒ Object
97 98 99 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/reader_loop.rb', line 97 def join @thread.join if @thread end |
#kill ⇒ Object
101 102 103 104 105 106 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/reader_loop.rb', line 101 def kill if @thread @thread.kill @thread.join end end |
#log_exception(e) ⇒ Object
108 109 110 111 112 113 114 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/reader_loop.rb', line 108 def log_exception(e) @logger.error "Exception in the reader loop: #{e.class.name}: #{e.message}" @logger.error "Backtrace: " e.backtrace.each do |line| @logger.error "\t#{line}" end end |
#raise(e) ⇒ Object
93 94 95 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/reader_loop.rb', line 93 def raise(e) @thread.raise(e) if @thread end |
#resume ⇒ Object
24 25 26 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/reader_loop.rb', line 24 def resume start end |
#run_loop ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/reader_loop.rb', line 29 def run_loop loop do begin break if @stopping || @network_is_down run_once rescue Errno::EBADF => ebadf break if @stopping # ignored, happens when we loop after the transport has already been closed rescue GorgonAMQ::Protocol::EmptyResponseError, IOError, SystemCallError => e break if @stopping log_exception(e) @network_is_down = true if @session.automatically_recover? @session.handle_network_failure(e) else @session_thread.raise(GorgonBunny::NetworkFailure.new("detected a network failure: #{e.message}", e)) end rescue ShutdownSignal => _ break rescue Exception => e break if @stopping log_exception(e) @network_is_down = true @session_thread.raise(GorgonBunny::NetworkFailure.new("caught an unexpected exception in the network loop: #{e.message}", e)) end end @stopped = true end |
#run_once ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/reader_loop.rb', line 62 def run_once frame = @transport.read_next_frame return if frame.is_a?(GorgonAMQ::Protocol::HeartbeatFrame) if !frame.final? || frame.method_class.has_content? header = @transport.read_next_frame content = '' if header.body_size > 0 loop do body_frame = @transport.read_next_frame content << body_frame.decode_payload break if content.bytesize >= header.body_size end end @session.handle_frameset(frame.channel, [frame.decode_payload, header.decode_payload, content]) else @session.handle_frame(frame.channel, frame.decode_payload) end end |
#start ⇒ Object
20 21 22 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/reader_loop.rb', line 20 def start @thread = Thread.new(&method(:run_loop)) end |
#stop ⇒ Object
85 86 87 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/reader_loop.rb', line 85 def stop @stopping = true end |
#stopped? ⇒ Boolean
89 90 91 |
# File 'lib/gorgon_bunny/lib/gorgon_bunny/reader_loop.rb', line 89 def stopped? @stopped end |