Class: FFWD::ProducingClient
- Inherits: Object
  - Object
    - FFWD::ProducingClient
- Defined in: lib/ffwd/producing_client.rb
Overview
A client implementation that delegates all work to other threads.
Defined Under Namespace
Classes: Producer
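Judging only from the calls made in the constructor and #flush! listings on this page, a producer has to respond to setup, teardown and produce(events, metrics), and produce should return either a request object that accepts callback/errback blocks or a falsy value. The following is a minimal sketch under that assumption. SketchRequest and CollectingProducer are hypothetical names, not FFWD classes, and the sketch does not load FFWD or EventMachine.

```ruby
# Hypothetical request object exposing the callback/errback hooks that
# #flush! registers. Not an FFWD class.
class SketchRequest
  def initialize
    @on_success = []
    @on_failure = []
  end

  def callback(&block)
    @on_success << block
    self
  end

  def errback(&block)
    @on_failure << block
    self
  end

  def succeed!
    @on_success.each(&:call)
  end

  def fail!(error)
    @on_failure.each { |b| b.call(error) }
  end
end

# Hypothetical producer, duck-typed against the calls made by ProducingClient.
class CollectingProducer
  attr_reader :batches

  def setup
    @batches = []
  end

  def teardown
    @batches = nil
  end

  # Returns a request, or nil when there is nothing to send.
  def produce(events, metrics)
    return nil if events.empty? && metrics.empty?
    # Copy the arrays: the caller clears its buffers right after this returns.
    @batches << [events.dup, metrics.dup]
    SketchRequest.new
  end
end

producer = CollectingProducer.new
producer.setup
events = [:e1, :e2]
request = producer.produce(events, [])
events.clear                    # what the client does once produce returns
p producer.batches.first.first  # [:e1, :e2]
p request.nil?                  # false
```

The dup calls matter because #flush! clears @events and @metrics in its ensure clause, so a producer that hands the batch to another thread cannot keep a reference to the original arrays.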
Instance Method Summary
- #flush! ⇒ Object
- #initialize(channel, producer, flush_period, event_limit, metric_limit) ⇒ ProducingClient (constructor)
  A new instance of ProducingClient.
- #report!(diff) ⇒ Object
- #reporter_meta ⇒ Object
- #safer_flush! ⇒ Object
  Apply some heuristics to determine if we can ‘ignore’ the current flush to prevent loss of data.
Methods included from Logging
Methods included from Reporter
build_meta, included, #increment, map_meta, #reporter_data
Constructor Details
#initialize(channel, producer, flush_period, event_limit, metric_limit) ⇒ ProducingClient
Returns a new instance of ProducingClient.
# File 'lib/ffwd/producing_client.rb', line 67

def initialize channel, producer, flush_period, event_limit, metric_limit
  @flush_period = flush_period
  @event_limit = event_limit
  @metric_limit = metric_limit

  if @flush_period <= 0
    raise "Invalid flush period: #{flush_period}"
  end

  @producer = producer
  @producer_is_reporter = FFWD.is_reporter? producer

  @events = []
  @metrics = []

  # Pending request.
  @request = nil
  @timer = nil

  @subs = []

  channel.starting do
    @timer = EM::PeriodicTimer.new(@flush_period){safer_flush!}

    @subs << channel.event_subscribe do |e|
      if @events.size >= @event_limit
        increment :dropped_events
        next
      end

      @events << e
    end

    @subs << channel.metric_subscribe do |m|
      if @metrics.size >= @metric_limit
        increment :dropped_metrics
        next
      end

      @metrics << m
    end

    @producer.setup
  end

  channel.stopping do
    if @timer
      @timer.cancel
      @timer = nil
    end

    flush!

    @subs.each(&:unsubscribe).clear

    @metrics.clear
    @events.clear

    @producer.teardown
  end
end
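The two subscription blocks in the constructor apply the same rule: once a buffer holds as many items as its limit, further items are counted as dropped instead of being appended. A minimal sketch of that rule in isolation follows. BoundedBuffer is a hypothetical class written for illustration, and its plain integer counter stands in for the increment calls provided by the Reporter module.

```ruby
# Hypothetical stand-in for the buffering done inside the subscription blocks.
class BoundedBuffer
  attr_reader :items, :dropped

  def initialize(limit)
    @limit = limit
    @items = []
    @dropped = 0
  end

  # Append an item, or count it as dropped when the buffer is full.
  def push(item)
    if @items.size >= @limit
      @dropped += 1
      return false
    end

    @items << item
    true
  end
end

buffer = BoundedBuffer.new(2)
%w[a b c].each { |e| buffer.push(e) }
p buffer.items    # ["a", "b"]
p buffer.dropped  # 1
```

The buffer never grows past its limit between flushes, so memory use stays bounded even when the producer is slow.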
Instance Method Details
#flush! ⇒ Object
# File 'lib/ffwd/producing_client.rb', line 148

def flush!
  if @request or not @request = @producer.produce(@events, @metrics)
    increment :dropped_events, @events.size
    increment :dropped_metrics, @metrics.size
    return
  end

  # store buffer sizes for use in callbacks.
  events_size = @events.size
  metrics_size = @metrics.size

  @request.callback do
    increment :sent_events, events_size
    increment :sent_metrics, metrics_size
    @request = nil
  end

  @request.errback do |e|
    log.error "Failed to produce", e
    increment :failed_events, events_size
    increment :failed_metrics, metrics_size
    @request = nil
  end
rescue => e
  increment :failed_events, @events.size
  increment :failed_metrics, @metrics.size
  log.error "Failed to produce", e
ensure
  @events.clear
  @metrics.clear
end
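The listing above allows at most one request in flight: a flush that arrives while a request is pending drops its batch, and the buffers are cleared in every case. The sketch below reproduces that control flow without FFWD or EventMachine. FakeRequest and SingleFlight are hypothetical names, the counters hash stands in for increment, and events and metrics are collapsed into a single buffer for brevity.

```ruby
# Hypothetical deferrable-like request; completion is triggered by hand.
class FakeRequest
  def initialize
    @callbacks = []
    @errbacks = []
  end

  def callback(&block)
    @callbacks << block
    self
  end

  def errback(&block)
    @errbacks << block
    self
  end

  def succeed!
    @callbacks.each(&:call)
  end

  def fail!(error)
    @errbacks.each { |b| b.call(error) }
  end
end

# Hypothetical client keeping at most one request in flight.
class SingleFlight
  attr_reader :counters, :last_request

  def initialize
    @buffer = []
    @request = nil
    @counters = Hash.new(0)
  end

  def <<(item)
    @buffer << item
    self
  end

  def flush!
    if @request
      # A request is already pending: this batch is dropped.
      @counters[:dropped] += @buffer.size
      return
    end

    size = @buffer.size # capture the size before the buffer is cleared
    @request = @last_request = FakeRequest.new

    @request.callback do
      @counters[:sent] += size
      @request = nil
    end

    @request.errback do |_error|
      @counters[:failed] += size
      @request = nil
    end
  ensure
    @buffer.clear
  end
end

client = SingleFlight.new
client << :a << :b
client.flush!                 # starts a request for two items
client << :c
client.flush!                 # request still pending: one item dropped
client.last_request.succeed!  # first request completes
p client.counters[:sent]      # 2
p client.counters[:dropped]   # 1
```

Capturing the size in a local variable before registering the callbacks is what keeps the sent and failed counts accurate, since the buffer is already empty by the time a callback runs.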
#report!(diff) ⇒ Object
# File 'lib/ffwd/producing_client.rb', line 55

def report!(diff)
  super(diff) do |m|
    yield m
  end

  return unless @producer_is_reporter

  @producer.report!(diff) do |m|
    yield m
  end
end
#reporter_meta ⇒ Object
# File 'lib/ffwd/producing_client.rb', line 50

def reporter_meta
  return {} if @producer_is_reporter
  @producer.class..merge(@producer.)
end
#safer_flush! ⇒ Object
Apply some heuristics to determine if we can ‘ignore’ the current flush to prevent loss of data.
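If a request is still pending, the slow_requests counter is incremented and the flush is skipped as long as either the event buffer or the metric buffer is below its limit. Otherwise the flush proceeds.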
# File 'lib/ffwd/producing_client.rb', line 134

def safer_flush!
  if @request
    increment :slow_requests

    ignore_flush = (
      @events.size < @event_limit or
      @metrics.size < @metric_limit)

    return if ignore_flush
  end

  flush!
end
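The skip condition can be examined on its own. The following sketch is not FFWD code: ignore_flush? is a hypothetical helper that reproduces the boolean expression from the listing above, taking the buffer sizes and limits as plain arguments.

```ruby
# Hypothetical helper mirroring the condition in safer_flush!.
# Returns true when a pending request should cause the flush to be skipped.
def ignore_flush?(pending, events_size, metrics_size, event_limit, metric_limit)
  return false unless pending

  events_size < event_limit || metrics_size < metric_limit
end

# No request pending: the flush always goes ahead.
p ignore_flush?(false, 0, 0, 10, 10)   # false
# Pending, events full but metrics still have room: the flush is skipped.
p ignore_flush?(true, 10, 3, 10, 10)   # true
# Pending and both buffers full: the flush goes ahead.
p ignore_flush?(true, 10, 10, 10, 10)  # false
```

Because the two comparisons are joined with a logical or, a single buffer with spare capacity is enough to postpone the flush. A full buffer keeps dropping new items in the meantime, as the subscription blocks in the constructor show.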