Class: Fluent::Plugin::ForwardOutput::AckHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/out_forward/ack_handler.rb

Defined Under Namespace

Modules: Result
Classes: ACKWaitingSockInfo, Ack

Instance Method Summary collapse

Constructor Details

#initialize(timeout:, log:, read_length:) ⇒ AckHandler

Returns a new instance of AckHandler.



31
32
33
34
35
36
37
38
# File 'lib/fluent/plugin/out_forward/ack_handler.rb', line 31

# Builds a handler that tracks sockets awaiting ACK responses.
#
# @param timeout [Numeric] added to the current clock value in #enqueue to compute each entry's deadline
# @param log [Object] logger used for warnings and errors
# @param read_length [Integer] stored for later reads (not used within this method)
def initialize(timeout:, log:, read_length:)
  # Caller-supplied configuration.
  @timeout     = timeout
  @log         = log
  @read_length = read_length

  # Internal state: the waiting list and the mutex that guards it.
  @ack_waitings = []
  @mutex        = Mutex.new
  @unpacker     = Fluent::MessagePackFactory.msgpack_unpacker
end

Instance Method Details

#collect_response(select_interval) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/fluent/plugin/out_forward/ack_handler.rb', line 40

# Collects ACK results for the entries currently waiting for a response.
#
# Entries for which +info.expired?(now)+ is true are dropped from the waiting
# list and reported as Result::FAILED. The sockets of the remaining entries are
# polled with IO.select, and every readable socket is passed to
# +read_ack_from_sock+, whose return value is appended to the results.
#
# Any StandardError raised while collecting is logged and swallowed.
#
# @param select_interval [Numeric, nil] timeout handed to IO.select
# @yield [chunk_id, node, sock, result] once per collected result; the first
#   three values are nil when the result carries no info object
def collect_response(select_interval)
  now = Fluent::Clock.now
  sockets = []
  results = []
  begin
    new_list = []
    @mutex.synchronize do
      @ack_waitings.each do |info|
        if info.expired?(now)
          # There are 2 types of cases when no response has been received from socket:
          # (1) the node does not support sending responses
          # (2) the node does support sending response but responses have not arrived for some reasons.
          @log.warn 'no response from node. regard it as unavailable.', host: info.node.host, port: info.node.port
          results << [info, Result::FAILED]
        else
          sockets << info.sock
          new_list << info
        end
      end
      @ack_waitings = new_list
    end

    begin
      readable_sockets, _, _ = IO.select(sockets, nil, nil, select_interval)
    rescue IOError
      # IO.select raises IOError when one of the sockets has been closed.
      # Handle it here rather than in the outer rescue: the expired entries
      # were already removed from @ack_waitings above, so skipping the yield
      # loop below would lose their FAILED results for good.
      @log.info 'connection closed while waiting for readable sockets'
      readable_sockets = nil
    end

    readable_sockets&.each do |sock|
      results << read_ack_from_sock(sock)
    end

    results.each do |info, ret|
      if info.nil?
        yield nil, nil, nil, ret
      else
        yield info.chunk_id, info.node, info.sock, ret
      end
    end
  rescue => e
    @log.error 'unexpected error while receiving ack', error: e
    @log.error_backtrace
  end
end

#create_ack(chunk_id, node) ⇒ Object



94
95
96
# File 'lib/fluent/plugin/out_forward/ack_handler.rb', line 94

# Creates an Ack for the given chunk and node, bound to this handler.
#
# @param chunk_id [Object] identifier of the chunk awaiting acknowledgement
# @param node [Object] node the chunk was sent to
# @return [Ack] the newly constructed Ack
def create_ack(chunk_id, node)
  handler = self
  Ack.new(chunk_id, node, handler)
end

#enqueue(node, sock, cid) ⇒ Object



98
99
100
101
102
103
# File 'lib/fluent/plugin/out_forward/ack_handler.rb', line 98

# Registers a socket as waiting for the ACK of the given chunk.
#
# The entry stores the raw chunk id, its Base64 encoding, and a deadline of
# "current clock value + configured timeout". It is appended to the waiting
# list while holding the mutex.
#
# @param node [Object] node the chunk was sent to
# @param sock [Object] socket on which the ACK is expected
# @param cid [String] chunk id
def enqueue(node, sock, cid)
  encoded_cid = Base64.encode64(cid)
  deadline = Fluent::Clock.now + @timeout
  entry = ACKWaitingSockInfo.new(sock, cid, encoded_cid, node, deadline)
  @mutex.synchronize { @ack_waitings << entry }
end