Class: Fluent::Plugin::ForwardOutput::AckHandler
- Inherits: Object
- Inheritance chain: Object → Fluent::Plugin::ForwardOutput::AckHandler
- Defined in:
- lib/fluent/plugin/out_forward/ack_handler.rb
Defined Under Namespace
Modules: Result
Classes: ACKWaitingSockInfo, Ack
Instance Method Summary
- #collect_response(select_interval) ⇒ Object
- #create_ack(chunk_id, node) ⇒ Object
- #enqueue(node, sock, cid) ⇒ Object
- #initialize(timeout:, log:, read_length:) ⇒ AckHandler (constructor): returns a new instance of AckHandler.
Constructor Details
#initialize(timeout:, log:, read_length:) ⇒ AckHandler
Returns a new instance of AckHandler.
31 32 33 34 35 36 37 38 |
# File 'lib/fluent/plugin/out_forward/ack_handler.rb', line 31

# Builds a handler with an empty list of sockets waiting for an ack.
#
# @param timeout [Numeric] added to Fluent::Clock.now in #enqueue to compute
#   the deadline of each waiting entry
# @param log [Object] logger; #collect_response calls warn, error and
#   error_backtrace on it
# @param read_length [Object] stored as @read_length; presumably the size used
#   when reading from sockets -- TODO confirm against the reader code
def initialize(timeout:, log:, read_length:)
  @timeout = timeout
  @read_length = read_length
  @log = log

  # @mutex guards @ack_waitings: #enqueue appends to the list and
  # #collect_response replaces it, both inside @mutex.synchronize.
  @mutex = Mutex.new
  @ack_waitings = []

  @unpacker = Fluent::MessagePackFactory.msgpack_unpacker
end
Instance Method Details
#collect_response(select_interval) ⇒ Object
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
# File 'lib/fluent/plugin/out_forward/ack_handler.rb', line 40

# Runs one polling pass over the entries registered through #enqueue.
#
# Expired entries are dropped from the waiting list and reported as
# Result::FAILED. The sockets of the remaining entries are passed to IO.select,
# and every readable socket is handed to read_ack_from_sock. Each collected
# result is then yielded to the caller's block.
#
# Any StandardError raised during the pass (including one raised by the
# caller's block) is logged through @log and not re-raised.
#
# @param select_interval [Numeric, nil] timeout passed to IO.select
# @yieldparam chunk_id [Object, nil] info.chunk_id, or nil when the result
#   carries no info
# @yieldparam node [Object, nil] info.node, or nil when the result carries no info
# @yieldparam sock [Object, nil] info.sock, or nil when the result carries no info
# @yieldparam ret [Object] the result value (Result::FAILED for expired entries,
#   otherwise whatever read_ack_from_sock paired with the info)
def collect_response(select_interval)
  now = Fluent::Clock.now
  sockets = []
  results = []
  begin
    new_list = []
    @mutex.synchronize do
      @ack_waitings.each do |info|
        if info.expired?(now)
          # There are 2 types of cases when no response has been received from socket:
          # (1) the node does not support sending responses
          # (2) the node does support sending response but responses have not arrived for some reasons.
          @log.warn 'no response from node. regard it as unavailable.', host: info.node.host, port: info.node.port
          results << [info, Result::FAILED]
        else
          sockets << info.sock
          new_list << info
        end
      end
      @ack_waitings = new_list
    end

    begin
      readable_sockets, = IO.select(sockets, nil, nil, select_interval)
    rescue IOError, Errno::EBADF => e
      # A waiting socket was closed underneath us. The expired entries above
      # have already been removed from @ack_waitings, so bailing out here would
      # lose their FAILED results for good. Treat the pass as "nothing
      # readable" and still report what has been collected so far.
      @log.warn 'failed to wait for readable sockets while receiving ack', error: e
      readable_sockets = nil
    end

    if readable_sockets
      readable_sockets.each do |sock|
        results << read_ack_from_sock(sock)
      end
    end

    results.each do |info, ret|
      if info.nil?
        yield nil, nil, nil, ret
      else
        yield info.chunk_id, info.node, info.sock, ret
      end
    end
  rescue => e
    @log.error 'unexpected error while receiving ack', error: e
    @log.error_backtrace
  end
end
#create_ack(chunk_id, node) ⇒ Object
94 95 96 |
# File 'lib/fluent/plugin/out_forward/ack_handler.rb', line 94

# Builds an Ack for the given chunk and node, bound to this handler.
#
# @param chunk_id [Object] passed through to Ack.new unchanged
# @param node [Object] passed through to Ack.new unchanged
# @return [Ack] the object returned by Ack.new(chunk_id, node, self)
def create_ack(chunk_id, node)
  Ack.new(
    chunk_id,
    node,
    self
  )
end
#enqueue(node, sock, cid) ⇒ Object
98 99 100 101 102 103 |
# File 'lib/fluent/plugin/out_forward/ack_handler.rb', line 98

# Registers a socket as waiting for an ack so that a later #collect_response
# pass will poll it.
#
# @param node [Object] node the chunk was sent to
# @param sock [Object] socket the ack is expected on
# @param cid [String] chunk id; stored both as given and Base64-encoded
# @return [Array] the waiting list after the new entry has been appended
def enqueue(node, sock, cid)
  encoded_cid = Base64.encode64(cid)
  # The entry expires @timeout after the moment it is enqueued.
  deadline = Fluent::Clock.now + @timeout
  waiting = ACKWaitingSockInfo.new(sock, cid, encoded_cid, node, deadline)

  @mutex.synchronize { @ack_waitings << waiting }
end