Class: Fluent::Plugin::ForwardOutput::AckHandler
- Inherits:
-
Object
- Object
- Fluent::Plugin::ForwardOutput::AckHandler
- Defined in:
- lib/fluent/plugin/out_forward/ack_handler.rb
Defined Under Namespace
Modules: Result Classes: ACKWaitingSockInfo, Ack
Instance Method Summary collapse
- #collect_response(select_interval) ⇒ Object
- #create_ack(chunk_id, node) ⇒ Object
- #enqueue(node, sock, cid) ⇒ Object
-
#initialize(timeout:, log:, read_length:) ⇒ AckHandler
constructor
A new instance of AckHandler.
Constructor Details
#initialize(timeout:, log:, read_length:) ⇒ AckHandler
Returns a new instance of AckHandler.
31 32 33 34 35 36 37 38 |
# File 'lib/fluent/plugin/out_forward/ack_handler.rb', line 31 def initialize(timeout:, log:, read_length:) @mutex = Mutex.new @ack_waitings = [] @timeout = timeout @log = log @read_length = read_length @unpacker = Fluent::MessagePackFactory.msgpack_unpacker end |
Instance Method Details
#collect_response(select_interval) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/fluent/plugin/out_forward/ack_handler.rb', line 40 def collect_response(select_interval) now = Fluent::Clock.now sockets = [] results = [] begin new_list = [] @mutex.synchronize do @ack_waitings.each do |info| if info.expired?(now) # There are 2 types of cases when no response has been received from socket: # (1) the node does not support sending responses # (2) the node does support sending response but responses have not arrived for some reasons. @log.warn 'no response from node. regard it as unavailable.', host: info.node.host, port: info.node.port results << [info, Result::FAILED] else sockets << info.sock new_list << info end end @ack_waitings = new_list end begin readable_sockets, _, _ = IO.select(sockets, nil, nil, select_interval) rescue IOError @log.info "connection closed while waiting for readable sockets" readable_sockets = nil end if readable_sockets readable_sockets.each do |sock| results << read_ack_from_sock(sock) end end results.each do |info, ret| if info.nil? yield nil, nil, nil, ret else yield info.chunk_id, info.node, info.sock, ret end end rescue => e @log.error 'unexpected error while receiving ack', error: e @log.error_backtrace end end |
#create_ack(chunk_id, node) ⇒ Object
100 101 102 |
# File 'lib/fluent/plugin/out_forward/ack_handler.rb', line 100 def create_ack(chunk_id, node) Ack.new(chunk_id, node, self) end |
#enqueue(node, sock, cid) ⇒ Object
104 105 106 107 108 109 |
# File 'lib/fluent/plugin/out_forward/ack_handler.rb', line 104 def enqueue(node, sock, cid) info = ACKWaitingSockInfo.new(sock, cid, Base64.encode64(cid), node, Fluent::Clock.now + @timeout) @mutex.synchronize do @ack_waitings << info end end |