Class: Fluent::Plugin::ForwardOutput::SocketCache
- Inherits: Object
- Inheritance hierarchy: Object → Fluent::Plugin::ForwardOutput::SocketCache
- Defined in:
- lib/fluent/plugin/out_forward/socket_cache.rb
Defined Under Namespace
Classes: TimedSocket
Instance Method Summary
- #checkin(sock) ⇒ Object
- #checkout_or(key) ⇒ Object
- #clear ⇒ Object
- #initialize(timeout, log) ⇒ SocketCache (constructor): A new instance of SocketCache.
- #purge_obsolete_socks ⇒ Object
- #revoke(sock) ⇒ Object
Constructor Details
#initialize(timeout, log) ⇒ SocketCache
Returns a new instance of SocketCache.
24 25 26 27 28 29 30 31 |
# File 'lib/fluent/plugin/out_forward/socket_cache.rb', line 24
#
# Builds an empty cache: no available, in-flight or inactive sockets.
#
# @param timeout [Object] kept unchanged in @timeout; not interpreted here
# @param log [Object] kept unchanged in @log; other methods call #debug on it
def initialize(timeout, log)
  @timeout = timeout
  @log = log

  @inactive_sockets = []
  @inflight_sockets = {}
  # Reading an unknown key stores (and returns) a fresh empty Array for it.
  @available_sockets = Hash.new { |pool, key| pool[key] = [] }

  @mutex = Mutex.new
end
Instance Method Details
#checkin(sock) ⇒ Object
50 51 52 53 54 55 56 57 58 59 |
# File 'lib/fluent/plugin/out_forward/socket_cache.rb', line 50
#
# Moves the entry registered for +sock+ from the in-flight table to the
# list of available sockets stored under the entry's key.
#
# If +sock+ is not in the in-flight table, only a debug message is logged.
#
# @param sock [Object] the key used in @inflight_sockets
def checkin(sock)
  @mutex.synchronize do
    entry = @inflight_sockets.delete(sock)
    next @log.debug("there is no socket #{sock}") unless entry

    # `timeout` is a helper defined outside this excerpt; its value is
    # assigned to the entry before it becomes available again.
    entry.timeout = timeout
    @available_sockets[entry.key] << entry
  end
end
#checkout_or(key) ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
# File 'lib/fluent/plugin/out_forward/socket_cache.rb', line 33
#
# Returns a socket for +key+: the one handed back by `pick_socket` when it
# yields an entry, otherwise the result of the given block.
#
# A socket produced by the block is wrapped in a TimedSocket and recorded in
# the in-flight table, keyed by the socket itself. The block is invoked
# while the mutex is held.
#
# `pick_socket`, `timeout` and `TimedSocket` are defined outside this excerpt.
#
# @param key [Object] passed to `pick_socket` and to the new TimedSocket
# @yieldreturn [Object] a new socket, requested only when nothing is picked
# @return [Object] the picked or newly created socket
def checkout_or(key)
  @mutex.synchronize do
    cached = pick_socket(key)
    next cached.sock if cached

    sock = yield
    fresh = TimedSocket.new(timeout, key, sock)
    @log.debug("connect new socket #{fresh}")

    @inflight_sockets[sock] = fresh
    fresh.sock
  end
end
#clear ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/fluent/plugin/out_forward/socket_cache.rb', line 98
#
# Empties all three collections (available, in-flight, inactive) and closes
# every socket they held.
#
# The collections are emptied under the mutex; the sockets are closed after
# it is released. Errors raised while closing are ignored.
#
# @return [Array] the entries that were removed
def clear
  drained = @mutex.synchronize do
    pooled = @available_sockets.values.flatten(1)
    removed = pooled + @inflight_sockets.values + @inactive_sockets

    [@available_sockets, @inflight_sockets, @inactive_sockets].each(&:clear)
    removed
  end

  drained.each do |entry|
    begin
      entry.sock.close
    rescue StandardError
      nil # best effort: a socket that fails to close is skipped
    end
  end
end
#purge_obsolete_socks ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
# File 'lib/fluent/plugin/out_forward/socket_cache.rb', line 71
#
# Removes every available socket for which `expired_socket?` (defined
# outside this excerpt) is true, together with all inactive sockets, and
# closes them.
#
# Bookkeeping happens under the mutex; the sockets are closed after it is
# released. In-flight sockets are never touched. Errors raised while
# closing are ignored.
#
# @return [Array] the entries that were removed
def purge_obsolete_socks
  sockets = []

  @mutex.synchronize do
    # don't touch @inflight_sockets

    @available_sockets.each_value do |socks|
      # Split first, then rewrite the list in one step. Deleting from
      # `socks` while iterating over it skips the element that follows each
      # deletion, which left adjacent expired sockets in the cache.
      expired, alive = socks.partition { |sock| expired_socket?(sock) }
      next if expired.empty?

      sockets.concat(expired)
      socks.replace(alive) # keep the same Array object for this key
    end
    # reuse same object (@available_sockets)
    @available_sockets.reject! { |_, v| v.empty? }

    sockets.concat(@inactive_sockets)
    @inactive_sockets.clear
  end

  sockets.each do |s|
    begin
      s.sock.close
    rescue StandardError
      nil # best effort: a socket that fails to close is skipped
    end
  end
end
#revoke(sock) ⇒ Object
61 62 63 64 65 66 67 68 69 |
# File 'lib/fluent/plugin/out_forward/socket_cache.rb', line 61
#
# Moves the entry registered for +sock+ from the in-flight table to the
# inactive list instead of making it available again.
#
# If +sock+ is not in the in-flight table, only a debug message is logged.
#
# @param sock [Object] the key used in @inflight_sockets
def revoke(sock)
  @mutex.synchronize do
    entry = @inflight_sockets.delete(sock)
    next @log.debug("there is no socket #{sock}") unless entry

    @inactive_sockets << entry
  end
end