Class: FFWD::TCP::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/ffwd/protocol/tcp/connection.rb

Constant Summary collapse

INITIAL_TIMEOUT =
2
DEFAULT_FLUSH_PERIOD =

default flush period, if non-zero will cause the connection to be buffered.

10
DEFAULT_TCP_OUTBOUND_LIMIT =

default amount of bytes that the outbound connection will allow in its application-level buffer.

2 ** 20

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(log, host, port, handler, config) ⇒ Connection

Returns a new instance of Connection.



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/ffwd/protocol/tcp/connection.rb', line 37

def initialize log, host, port, handler, config
  @log = log
  @host = host
  @port = port
  @handler = handler
  @config = config

  @tcp_outbound_limit = config[:tcp_outbound_limit]

  @peer = "#{host}:#{port}"
  @closing = false
  @reconnect_timeout = INITIAL_TIMEOUT
  @reporter_meta = {:component => @handler.plugin_type, :peer => peer}

  @timer = nil
  @c = nil
  @open = false
end

Instance Attribute Details

#logObject (readonly)

Returns the value of attribute log.



28
29
30
# File 'lib/ffwd/protocol/tcp/connection.rb', line 28

def log
  @log
end

#peerObject (readonly)

Returns the value of attribute peer.



28
29
30
# File 'lib/ffwd/protocol/tcp/connection.rb', line 28

def peer
  @peer
end

#reporter_metaObject (readonly)

Returns the value of attribute reporter_meta.



28
29
30
# File 'lib/ffwd/protocol/tcp/connection.rb', line 28

def reporter_meta
  @reporter_meta
end

Class Method Details

.prepare(opts) ⇒ Object



30
31
32
33
34
35
# File 'lib/ffwd/protocol/tcp/connection.rb', line 30

def self.prepare opts
  opts[:flush_period] ||= DEFAULT_FLUSH_PERIOD
  opts[:tcp_outbound_limit] ||= DEFAULT_TCP_OUTBOUND_LIMIT
  opts[:ignored] = (opts[:ignored] || []).map{|v| Utils.check_ignored v}
  opts
end

Instance Method Details

#connectObject

Start attempting to connect.



57
58
59
60
61
# File 'lib/ffwd/protocol/tcp/connection.rb', line 57

def connect
  @c = EM.connect @host, @port, @handler, self, @config
  log.info "Connect to tcp://#{@host}:#{@port}"
  log.info "  config: #{@config.inspect}"
end

#connection_completedObject



86
87
88
89
90
91
92
93
94
95
# File 'lib/ffwd/protocol/tcp/connection.rb', line 86

def connection_completed
  @open = true
  @log.info "Connected tcp://#{peer}"
  @reconnect_timeout = INITIAL_TIMEOUT

  unless @timer.nil?
    @timer.cancel
    @timer = nil
  end
end

#disconnectObject

Explicitly disconnect and discard any reconnect attempts..



64
65
66
67
68
69
70
71
72
# File 'lib/ffwd/protocol/tcp/connection.rb', line 64

def disconnect
  log.info "Disconnecting from tcp://#{@host}:#{@port}"
  @closing = true

  @c.close_connection if @c
  @timer.cancel if @timer
  @c = nil
  @timer = nil
end

#send_all(events, metrics) ⇒ Object



82
83
84
# File 'lib/ffwd/protocol/tcp/connection.rb', line 82

def send_all events, metrics
  @c.send_all events, metrics
end

#send_event(event) ⇒ Object



74
75
76
# File 'lib/ffwd/protocol/tcp/connection.rb', line 74

def send_event event
  @c.send_event event
end

#send_metric(metric) ⇒ Object



78
79
80
# File 'lib/ffwd/protocol/tcp/connection.rb', line 78

def send_metric metric
  @c.send_metric metric
end

#unbindObject



97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/ffwd/protocol/tcp/connection.rb', line 97

def unbind
  @open = false
  @c = nil

  if @closing
    return
  end

  @log.info "Disconnected from tcp://#{peer}, reconnecting in #{@reconnect_timeout}s"

  unless @timer.nil?
    @timer.cancel
    @timer = nil
  end

  @timer = EM::Timer.new(@reconnect_timeout) do
    @reconnect_timeout *= 2
    @timer = nil
    @c = EM.connect @host, @port, @handler, self, *@args
  end
end

#writable?Boolean

Check if a connection is writable or not.

Returns:

  • (Boolean)


120
121
122
# File 'lib/ffwd/protocol/tcp/connection.rb', line 120

def writable?
  not @c.nil? and @open and @c.get_outbound_data_size < @tcp_outbound_limit
end