Class: FFWD::TCP::Connection
- Inherits:
-
Object
- Object
- FFWD::TCP::Connection
- Defined in:
- lib/ffwd/protocol/tcp/connection.rb
Constant Summary collapse
- INITIAL_TIMEOUT =
2
- DEFAULT_FLUSH_PERIOD =
Default flush period; if non-zero, the connection will be buffered.
10
- DEFAULT_TCP_OUTBOUND_LIMIT =
Default number of bytes that the outbound connection will allow in its application-level buffer.
2 ** 20
Instance Attribute Summary collapse
-
#log ⇒ Object
readonly
Returns the value of attribute log.
-
#peer ⇒ Object
readonly
Returns the value of attribute peer.
-
#reporter_meta ⇒ Object
readonly
Returns the value of attribute reporter_meta.
Class Method Summary collapse
Instance Method Summary collapse
-
#connect ⇒ Object
Start attempting to connect.
- #connection_completed ⇒ Object
-
#disconnect ⇒ Object
Explicitly disconnect and discard any reconnect attempts.
-
#initialize(log, host, port, handler, config) ⇒ Connection
constructor
A new instance of Connection.
- #send_all(events, metrics) ⇒ Object
- #send_event(event) ⇒ Object
- #send_metric(metric) ⇒ Object
- #unbind ⇒ Object
-
#writable? ⇒ Boolean
Check if a connection is writable or not.
Constructor Details
#initialize(log, host, port, handler, config) ⇒ Connection
Returns a new instance of Connection.
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
# File 'lib/ffwd/protocol/tcp/connection.rb', line 37 def initialize log, host, port, handler, config @log = log @host = host @port = port @handler = handler @config = config @tcp_outbound_limit = config[:tcp_outbound_limit] @peer = "#{host}:#{port}" @closing = false @reconnect_timeout = INITIAL_TIMEOUT @reporter_meta = {:component => @handler.plugin_type, :peer => peer} @timer = nil @c = nil @open = false end |
Instance Attribute Details
#log ⇒ Object (readonly)
Returns the value of attribute log.
28 29 30 |
# File 'lib/ffwd/protocol/tcp/connection.rb', line 28 def log @log end |
#peer ⇒ Object (readonly)
Returns the value of attribute peer.
28 29 30 |
# File 'lib/ffwd/protocol/tcp/connection.rb', line 28 def peer @peer end |
#reporter_meta ⇒ Object (readonly)
Returns the value of attribute reporter_meta.
28 29 30 |
# File 'lib/ffwd/protocol/tcp/connection.rb', line 28 def reporter_meta @reporter_meta end |
Class Method Details
.prepare(opts) ⇒ Object
30 31 32 33 34 35 |
# File 'lib/ffwd/protocol/tcp/connection.rb', line 30 def self.prepare opts opts[:flush_period] ||= DEFAULT_FLUSH_PERIOD opts[:tcp_outbound_limit] ||= DEFAULT_TCP_OUTBOUND_LIMIT opts[:ignored] = (opts[:ignored] || []).map{|v| Utils.check_ignored v} opts end |
Instance Method Details
#connect ⇒ Object
Start attempting to connect.
57 58 59 60 61 |
# File 'lib/ffwd/protocol/tcp/connection.rb', line 57 def connect @c = EM.connect @host, @port, @handler, self, @config log.info "Connect to tcp://#{@host}:#{@port}" log.info " config: #{@config.inspect}" end |
#connection_completed ⇒ Object
86 87 88 89 90 91 92 93 94 95 |
# File 'lib/ffwd/protocol/tcp/connection.rb', line 86 def connection_completed @open = true @log.info "Connected tcp://#{peer}" @reconnect_timeout = INITIAL_TIMEOUT unless @timer.nil? @timer.cancel @timer = nil end end |
#disconnect ⇒ Object
Explicitly disconnect and discard any reconnect attempts.
64 65 66 67 68 69 70 71 72 |
# File 'lib/ffwd/protocol/tcp/connection.rb', line 64 def disconnect log.info "Disconnecting from tcp://#{@host}:#{@port}" @closing = true @c.close_connection if @c @timer.cancel if @timer @c = nil @timer = nil end |
#send_all(events, metrics) ⇒ Object
82 83 84 |
# File 'lib/ffwd/protocol/tcp/connection.rb', line 82 def send_all events, metrics @c.send_all events, metrics end |
#send_event(event) ⇒ Object
74 75 76 |
# File 'lib/ffwd/protocol/tcp/connection.rb', line 74 def send_event event @c.send_event event end |
#send_metric(metric) ⇒ Object
78 79 80 |
# File 'lib/ffwd/protocol/tcp/connection.rb', line 78 def send_metric metric @c.send_metric metric end |
#unbind ⇒ Object
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 |
# File 'lib/ffwd/protocol/tcp/connection.rb', line 97 def unbind @open = false @c = nil if @closing return end @log.info "Disconnected from tcp://#{peer}, reconnecting in #{@reconnect_timeout}s" unless @timer.nil? @timer.cancel @timer = nil end @timer = EM::Timer.new(@reconnect_timeout) do @reconnect_timeout *= 2 @timer = nil @c = EM.connect @host, @port, @handler, self, *@args end end |
#writable? ⇒ Boolean
Check if a connection is writable or not.
120 121 122 |
# File 'lib/ffwd/protocol/tcp/connection.rb', line 120 def writable? not @c.nil? and @open and @c.get_outbound_data_size < @tcp_outbound_limit end |