Class: FFWD::TCP::FlushingConnect
- Inherits:
-
Object
- Object
- FFWD::TCP::FlushingConnect
- Includes:
- Reporter
- Defined in:
- lib/ffwd/protocol/tcp/flushing_connect.rb
Overview
A TCP connection implementation that buffers events and metrics in batches over a time window and calls ‘send_all’ on the connection.
Constant Summary collapse
- DEFAULT_FORCED_FLUSH_FACTOR =
percent of maximum events/metrics which will cause a flush.
0.8
- DEFAULT_EVENT_LIMIT =
defaults for buffered connections. maximum amount of events to buffer up.
1000
- DEFAULT_METRIC_LIMIT =
maximum amount of metrics to buffer up.
10000
Instance Attribute Summary collapse
-
#log ⇒ Object
readonly
Returns the value of attribute log.
Class Method Summary collapse
Instance Method Summary collapse
- #flush! ⇒ Object
-
#initialize(core, log, connection, config) ⇒ FlushingConnect
constructor
A new instance of FlushingConnect.
- #reporter_meta ⇒ Object
Methods included from Reporter
build_meta, included, #increment, map_meta, #report!, #reporter_data
Constructor Details
#initialize(core, log, connection, config) ⇒ FlushingConnect
Returns a new instance of FlushingConnect.
58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/ffwd/protocol/tcp/flushing_connect.rb', line 58 def initialize(core, log, connection, config) @log = log @c = connection flush_period = config[:flush_period] ignored = config[:ignored] forced_flush_factor = config[:forced_flush_factor] event_limit = config[:event_limit] metric_limit = config[:metric_limit] @event_buffer = [] @metric_buffer = [] @timer = nil @subs = [] core.starting do @c.connect @timer = EM::PeriodicTimer.new(flush_period){flush!} unless ignored.include? :events event_consumer = setup_consumer( @event_buffer, event_limit, forced_flush_factor, :dropped_events) @subs << core.output.event_subscribe(&event_consumer) end unless ignored.include? :metrics metric_consumer = setup_consumer( @metric_buffer, metric_limit, forced_flush_factor, :dropped_metrics) @subs << core.output.metric_subscribe(&metric_consumer) end end core.stopping do @c.disconnect if @timer @timer.cancel @timer = nil end @subs.each(&:unsubscribe).clear end end |
Instance Attribute Details
#log ⇒ Object (readonly)
Returns the value of attribute log.
45 46 47 |
# File 'lib/ffwd/protocol/tcp/flushing_connect.rb', line 45 def log @log end |
Class Method Details
.prepare(opts) ⇒ Object
51 52 53 54 55 56 |
# File 'lib/ffwd/protocol/tcp/flushing_connect.rb', line 51 def self.prepare opts opts[:forced_flush_factor] ||= DEFAULT_FORCED_FLUSH_FACTOR opts[:event_limit] ||= DEFAULT_EVENT_LIMIT opts[:metric_limit] ||= DEFAULT_METRIC_LIMIT opts end |
Instance Method Details
#flush! ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# File 'lib/ffwd/protocol/tcp/flushing_connect.rb', line 103 def flush! if @event_buffer.empty? and @metric_buffer.empty? return end unless @c.writable? increment :dropped_events, @event_buffer.size increment :dropped_metrics, @metric_buffer.size return end @c.send_all @event_buffer, @metric_buffer increment :sent_events, @event_buffer.size increment :sent_metrics, @metric_buffer.size rescue => e log.error "Failed to flush buffers", e log.error "The following data could not be flushed:" @event_buffer.each_with_index do |event, i| log.error "##{i}: #{event.to_h}" end @metric_buffer.each_with_index do |metric, i| log.error "##{i}: #{metric.to_h}" end increment :failed_events, @event_buffer.size increment :failed_metrics, @metric_buffer.size ensure @event_buffer.clear @metric_buffer.clear end |
#reporter_meta ⇒ Object
47 48 49 |
# File 'lib/ffwd/protocol/tcp/flushing_connect.rb', line 47 def @c. end |