Class: Fluent::Plugin::ForwardOutput::AckHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/out_forward/ack_handler.rb

Defined Under Namespace

Modules: Result Classes: ACKWaitingSockInfo, Ack

Instance Method Summary collapse

Constructor Details

#initialize(timeout:, log:, read_length:) ⇒ AckHandler

Returns a new instance of AckHandler.



31
32
33
34
35
36
37
38
# File 'lib/fluent/plugin/out_forward/ack_handler.rb', line 31

def initialize(timeout:, log:, read_length:)
  @mutex = Mutex.new
  @ack_waitings = []
  @timeout = timeout
  @log = log
  @read_length = read_length
  @unpacker = Fluent::MessagePackFactory.msgpack_unpacker
end

Instance Method Details

#collect_response(select_interval) ⇒ Object



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/fluent/plugin/out_forward/ack_handler.rb', line 40

def collect_response(select_interval)
  now = Fluent::Clock.now
  sockets = []
  results = []
  begin
    new_list = []
    @mutex.synchronize do
      @ack_waitings.each do |info|
        if info.expired?(now)
          # There are 2 types of cases when no response has been received from socket:
          # (1) the node does not support sending responses
          # (2) the node does support sending response but responses have not arrived for some reasons.
          @log.warn 'no response from node. regard it as unavailable.', host: info.node.host, port: info.node.port
          results << [info, Result::FAILED]
        else
          sockets << info.sock
          new_list << info
        end
      end
      @ack_waitings = new_list
    end

    begin
      readable_sockets, _, _ = IO.select(sockets, nil, nil, select_interval)
    rescue IOError
      @log.info "connection closed while waiting for readable sockets"
      readable_sockets = nil
    end

    if readable_sockets
      readable_sockets.each do |sock|
        results << read_ack_from_sock(sock)
      end
    end

    results.each do |info, ret|
      if info.nil?
        yield nil, nil, nil, ret
      else
        yield info.chunk_id, info.node, info.sock, ret
      end
    end
  rescue => e
    @log.error 'unexpected error while receiving ack', error: e
    @log.error_backtrace
  end
end

#create_ack(chunk_id, node) ⇒ Object



100
101
102
# File 'lib/fluent/plugin/out_forward/ack_handler.rb', line 100

def create_ack(chunk_id, node)
  Ack.new(chunk_id, node, self)
end

#enqueue(node, sock, cid) ⇒ Object



104
105
106
107
108
109
# File 'lib/fluent/plugin/out_forward/ack_handler.rb', line 104

def enqueue(node, sock, cid)
  info = ACKWaitingSockInfo.new(sock, cid, Base64.encode64(cid), node, Fluent::Clock.now + @timeout)
  @mutex.synchronize do
    @ack_waitings << info
  end
end