Class: Fluent::Plugin::ForwardOutput::SocketCache

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/out_forward/socket_cache.rb

Defined Under Namespace

Classes: TimedSocket

Instance Method Summary collapse

Constructor Details

#initialize(timeout, log) ⇒ SocketCache

Returns a new instance of SocketCache.



24
25
26
27
28
29
30
31
# File 'lib/fluent/plugin/out_forward/socket_cache.rb', line 24

# Builds an empty socket cache.
#
# @param timeout [Object] kept in @timeout as given; this method does not
#   interpret it (NOTE(review): presumably a duration in seconds — confirm
#   against the caller)
# @param log [#debug] logger kept in @log; the cache reports through +debug+
def initialize(timeout, log)
  @mutex = Mutex.new
  @timeout = timeout
  @log = log

  # Idle entries grouped per key; an unknown key lazily gets its own Array.
  @available_sockets = Hash.new { |by_key, key| by_key[key] = [] }
  # Entries currently handed out to a caller.
  @inflight_sockets = {}
  # Entries taken out of circulation that still have to be closed.
  @inactive_sockets = []
end

Instance Method Details

#checkin(sock) ⇒ Object



50
51
52
53
54
55
56
57
58
59
# File 'lib/fluent/plugin/out_forward/socket_cache.rb', line 50

# Hands a checked-out socket back to the cache so it can be reused.
#
# The entry registered for +sock+ is removed from the in-flight table, gets a
# new timeout (value of the +timeout+ helper) and is appended to the idle list
# of its key. When +sock+ is not in flight, only a debug message is logged.
#
# @param sock [Object] the socket object that was handed out earlier
# @return [Object] the idle list the entry was appended to, or whatever
#   +@log.debug+ returns when the socket is unknown
def checkin(sock)
  @mutex.synchronize do
    entry = @inflight_sockets.delete(sock)
    next @log.debug("there is no socket #{sock}") unless entry

    entry.timeout = timeout
    @available_sockets[entry.key] << entry
  end
end

#checkout_or(key) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/fluent/plugin/out_forward/socket_cache.rb', line 33

# Returns a socket for +key+, reusing a cached one when +pick_socket+ finds
# one and otherwise obtaining a new socket from the given block.
#
# A socket produced by the block is wrapped in a TimedSocket, logged at debug
# level and registered as in flight under the socket object itself. The block
# runs while @mutex is held.
#
# @param key [Object] identifies the group of sockets to look in
# @yieldreturn [Object] a newly created socket
# @return [Object] the +sock+ of the entry that was picked or created
def checkout_or(key)
  @mutex.synchronize do
    cached = pick_socket(key)
    next cached.sock if cached

    raw = yield
    fresh = TimedSocket.new(timeout, key, raw)
    @log.debug("connect new socket #{fresh}")

    @inflight_sockets[raw] = fresh
    fresh.sock
  end
end

#clear ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/fluent/plugin/out_forward/socket_cache.rb', line 98

# Empties the cache and closes every socket it knows about: idle, in-flight
# and inactive ones alike.
#
# The three collections are drained while @mutex is held; the sockets are
# closed after the lock is released, and errors raised while closing are
# ignored.
#
# @return [Array] the entries that were removed from the cache
def clear
  drained = @mutex.synchronize do
    entries = @available_sockets.values.flatten(1) + @inflight_sockets.values + @inactive_sockets

    @available_sockets.clear
    @inflight_sockets.clear
    @inactive_sockets.clear

    entries
  end

  drained.each do |entry|
    begin
      entry.sock.close
    rescue StandardError
      nil # best effort: a socket that fails to close is simply dropped
    end
  end
end

#purge_obsolete_socks ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
# File 'lib/fluent/plugin/out_forward/socket_cache.rb', line 71

# Closes pooled sockets that should no longer be used: idle entries for which
# +expired_socket?+ is true, plus everything parked in @inactive_sockets.
# In-flight sockets are left alone.
#
# Bookkeeping happens while @mutex is held; the sockets are closed after the
# lock is released, and errors raised by +close+ are ignored.
#
# @return [Array] the entries whose sockets were closed
def purge_obsolete_socks
  obsolete = []

  @mutex.synchronize do
    # don't touch @inflight_sockets

    @available_sockets.each_value do |socks|
      # Filter in place with delete_if. Removing elements with Array#delete
      # from inside Array#each shifts the remaining elements left, so the
      # entry right after each removed one would be skipped and survive.
      socks.delete_if do |tsock|
        expired = expired_socket?(tsock)
        obsolete << tsock if expired
        expired
      end
    end

    # reuse same object (@available_sockets)
    @available_sockets.reject! { |_, socks| socks.empty? }

    obsolete.concat(@inactive_sockets)
    @inactive_sockets.clear
  end

  obsolete.each do |tsock|
    tsock.sock.close rescue nil
  end
end

#revoke(sock) ⇒ Object



61
62
63
64
65
66
67
68
69
# File 'lib/fluent/plugin/out_forward/socket_cache.rb', line 61

# Takes a checked-out socket out of circulation instead of returning it for
# reuse.
#
# The entry registered for +sock+ is moved from the in-flight table to
# @inactive_sockets; this method does not close it. When +sock+ is not in
# flight, only a debug message is logged.
#
# @param sock [Object] the socket object that was handed out earlier
# @return [Object] @inactive_sockets after the append, or whatever
#   +@log.debug+ returns when the socket is unknown
def revoke(sock)
  @mutex.synchronize do
    entry = @inflight_sockets.delete(sock)
    next @log.debug("there is no socket #{sock}") unless entry

    @inactive_sockets << entry
  end
end