Class: FluidFeatures::AppReporter

Inherits:
Object
  • Object
show all
Defined in:
lib/fluidfeatures/app/reporter.rb

Constant Summary collapse

MAX_BUCKETS =

Throw the oldest buckets away or offload them to persistent storage when this limit is reached.

10
MAX_BUCKET_SIZE =

Max number of transactions we queue in a bucket.

100
WAIT_BETWEEN_QUEUE_EMTPY_CHECKS =

While queue is empty we will check size every 0.5 secs

0.5
WAIT_BETWEEN_SEND_SUCCESS_NONE_WAITING =

Soft max of 1 req/sec

1
WAIT_BETWEEN_SEND_SUCCESS_NEXT_WAITING =

Hard max of 10 req/sec

0.1
WAIT_BETWEEN_SEND_FAILURES =

If we are failing to communicate with the FluidFeatures API then wait for this long between requests.

5

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(app) ⇒ AppReporter

seconds



28
29
30
31
32
33
34
35
# File 'lib/fluidfeatures/app/reporter.rb', line 28

# Builds a reporter for the given application.
#
# Validates the argument, loads state via #configure, starts the
# background sending thread via #run_loop, and registers an at_exit hook
# that hands the in-memory buckets to the bucket storage.
#
# Raises RuntimeError when +app+ is not a ::FluidFeatures::App.
def initialize(app)
  unless app.is_a?(::FluidFeatures::App)
    raise "app invalid : #{app}"
  end

  configure(app)
  run_loop

  # On process exit, append whatever is still queued to the bucket storage.
  at_exit { buckets_storage.append(@buckets) }
end

Instance Attribute Details

#appObject

Returns the value of attribute app.



7
8
9
# File 'lib/fluidfeatures/app/reporter.rb', line 7

# Returns the value of the @app instance variable (assigned in #configure).
def app
  @app
end

Instance Method Details

#bucket_countObject



206
207
208
209
210
211
212
# File 'lib/fluidfeatures/app/reporter.rb', line 206

# Returns the number of buckets currently held in memory.
# The size is read while holding @buckets_lock.
def bucket_count
  @buckets_lock.synchronize { @buckets.size }
end

#buckets_storageObject



37
38
39
# File 'lib/fluidfeatures/app/reporter.rb', line 37

# Returns the bucket persistence object, creating it on first use from
# FluidFeatures.config["cache"] and memoizing it in @buckets_storage.
def buckets_storage
  return @buckets_storage if @buckets_storage

  @buckets_storage = FluidFeatures::Persistence::Buckets.create(FluidFeatures.config["cache"])
end

#configure(app) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/fluidfeatures/app/reporter.rb', line 45

# Initializes the reporter state for +app+.
#
# Stores the app, loads up to MAX_BUCKETS buckets from the bucket storage,
# picks the current bucket via #last_or_new_bucket, loads the unknown
# features from the features storage, and creates one mutex per piece of
# shared state.
def configure(app)
  @app = app

  # Both of these must exist before #last_or_new_bucket runs, because it
  # reads @buckets and may call #new_bucket, which locks @buckets_lock.
  @buckets_lock = ::Mutex.new
  @buckets      = buckets_storage.fetch(MAX_BUCKETS)

  # maybe could get rid of @current_bucket concept
  @current_bucket_lock = ::Mutex.new
  @current_bucket      = nil
  @current_bucket      = last_or_new_bucket

  @unknown_features      = features_storage.list_unknown
  @unknown_features_lock = ::Mutex.new
end

#features_storageObject



41
42
43
# File 'lib/fluidfeatures/app/reporter.rb', line 41

# Returns the features persistence object, creating it on first use from
# FluidFeatures.config["cache"] and memoizing it in @features_storage.
def features_storage
  return @features_storage if @features_storage

  @features_storage = FluidFeatures::Persistence::Features.create(FluidFeatures.config["cache"])
end

#last_or_new_bucketObject



61
62
63
# File 'lib/fluidfeatures/app/reporter.rb', line 61

# Returns the newest bucket in @buckets, or a fresh one from #new_bucket
# when there are no buckets or the newest one already holds
# MAX_BUCKET_SIZE (or more) entries.
def last_or_new_bucket
  return new_bucket if @buckets.empty?

  newest = @buckets.last
  if newest.size >= MAX_BUCKET_SIZE
    new_bucket
  else
    newest
  end
end

#new_bucketObject



215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/fluidfeatures/app/reporter.rb', line 215

# Appends a new empty bucket to @buckets (under @buckets_lock) and returns it.
#
# When the queue then exceeds MAX_BUCKETS, the oldest bucket is shifted off
# and handed to the bucket storage; if the storage reports failure a
# warning is logged.
def new_bucket
  fresh = []
  @buckets_lock.synchronize do
    @buckets.push(fresh)
    # offload to storage
    if @buckets.size > MAX_BUCKETS && !buckets_storage.append_one(@buckets.shift)
      app.logger.warn "[FF] Discarded transactions due to reporter backlog. These will not be reported to FluidFeatures."
    end
  end
  fresh
end

#queue_transaction_payload(transaction_payload) ⇒ Object



266
267
268
269
270
271
272
273
# File 'lib/fluidfeatures/app/reporter.rb', line 266

# Appends +transaction_payload+ to the current bucket while holding
# @current_bucket_lock. When the current bucket already holds
# MAX_BUCKET_SIZE (or more) entries, a new bucket from #new_bucket becomes
# the current bucket first.
def queue_transaction_payload(transaction_payload)
  @current_bucket_lock.synchronize do
    @current_bucket = new_bucket unless @current_bucket.size < MAX_BUCKET_SIZE
    @current_bucket.push(transaction_payload)
  end
end

#queue_unknown_features(unknown_features) ⇒ Object



276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/fluidfeatures/app/reporter.rb', line 276

# Merges +unknown_features+ (feature name => { version name => default
# enabled }) into @unknown_features while holding @unknown_features_lock.
# Versions already recorded keep their existing value.
#
# Raises RuntimeError when the argument, or any of its values, is not a Hash.
def queue_unknown_features(unknown_features)
  raise "unknown_features should be a Hash" unless unknown_features.is_a? Hash
  unknown_features.each_value do |versions|
    next if versions.is_a? Hash
    raise "unknown_features values should be a Hash. versions=#{versions}"
  end

  @unknown_features_lock.synchronize do
    unknown_features.each_pair do |feature_name, versions|
      @unknown_features[feature_name] = {} unless @unknown_features.key?(feature_name)
      # On a version name collision keep what is already queued.
      @unknown_features[feature_name].merge!(versions) { |_version, existing, _incoming| existing }
    end
  end
end

#remove_bucketObject



230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
# File 'lib/fluidfeatures/app/reporter.rb', line 230

# Removes and returns the oldest bucket, or nil when there is none.
#
# When the in-memory queue is empty and the bucket storage is not, up to
# MAX_BUCKETS buckets are fetched from storage first. If the queue is empty
# after the removal, a new empty bucket is created and becomes both the
# only queued bucket and @current_bucket.
#
# Lock order: @current_bucket_lock is taken BEFORE @buckets_lock. This
# matches #queue_transaction_payload, which holds @current_bucket_lock
# while #new_bucket takes @buckets_lock. Taking them in the opposite order
# here could deadlock against a thread that is queueing into a full
# current bucket.
def remove_bucket
  @current_bucket_lock.synchronize do
    @buckets_lock.synchronize do
      # try to get buckets from storage first
      if @buckets.empty? && !buckets_storage.empty?
        @buckets = buckets_storage.fetch(MAX_BUCKETS)
      end

      # Array#shift returns nil on an empty array.
      removed_bucket = @buckets.shift

      if @buckets.empty?
        @current_bucket = []
        @buckets << @current_bucket
      end

      removed_bucket
    end
  end
end

#report_transaction(transaction) ⇒ Object

Pass FluidFeatures::AppUserTransaction for reporting back to the FluidFeatures service.



67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/fluidfeatures/app/reporter.rb', line 67

# Queues +transaction+ for reporting.
#
# Builds a payload from the transaction (url, user, feature/goal hits,
# duration) and passes it to #queue_transaction_payload. Optional user
# fields are included only when the user returns a truthy value for them.
# When the transaction carries unknown features they are queued via
# #queue_unknown_features and the features storage is updated with
# @unknown_features.
def report_transaction(transaction)
  user = transaction.user

  user_info = { :id => user.unique_id }
  # payload key => reader on the user object
  [
    [:name,      :display_name],
    [:anonymous, :anonymous],
    [:unique,    :unique_attrs],
    [:cohorts,   :cohort_attrs]
  ].each do |key, reader|
    value = user.public_send(reader)
    user_info[key] = value if value
  end

  payload = {
    :url   => transaction.url,
    :user  => user_info,
    :hits  => {
      :feature => transaction.features_hit,
      :goal    => transaction.goals_hit
    },
    # stats
    :stats => {
      :duration => transaction.duration
    }
  }

  queue_transaction_payload(payload)

  if transaction.unknown_features.size > 0
    queue_unknown_features(transaction.unknown_features)
    features_storage.replace_unknown(@unknown_features)
  end
end

#run_loopObject



101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/fluidfeatures/app/reporter.rb', line 101

# Starts the background thread that sends queued transactions, and returns
# that Thread.
#
# Each iteration either sleeps WAIT_BETWEEN_QUEUE_EMTPY_CHECKS when nothing
# is queued, or calls #send_transactions and then sleeps for a delay that
# depends on the outcome. Any exception raised inside an iteration is
# logged and followed by a WAIT_BETWEEN_SEND_FAILURES pause.
def run_loop
  Thread.new do
    while true
      begin
        if transactions_queued?
          pause =
            if !send_transactions
              # If service is down, then slow our requests
              # within this thread
              WAIT_BETWEEN_SEND_FAILURES
            elsif bucket_count <= 1
              # Unless we have a full bucket waiting do not make
              # more than N requests per second.
              WAIT_BETWEEN_SEND_SUCCESS_NONE_WAITING
            else
              WAIT_BETWEEN_SEND_SUCCESS_NEXT_WAITING
            end
          sleep pause
        else
          sleep WAIT_BETWEEN_QUEUE_EMTPY_CHECKS
        end
      rescue Exception => err
        # catch errors, so that we do not affect the rest of the application
        app.logger.error "[FF] send_transactions failed : #{err.message}\n#{err.backtrace.join("\n")}"
        # hold off for a little while and try again
        sleep WAIT_BETWEEN_SEND_FAILURES
      end
    end
  end
end

#send_transactionsObject



155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# File 'lib/fluidfeatures/app/reporter.rb', line 155

# Sends the oldest bucket of transactions, together with the queued unknown
# features and the client's API request log, to "/report/transactions".
#
# Returns whatever app.post returned (truthy on success).
#
# The bucket and the unknown features are taken off their queues before the
# request is made. If the request reports failure, or if anything raises
# while the payload is being built or posted, both are put back (inside
# +ensure+) so they are retried on the next attempt instead of being lost.
# An exception still propagates to the caller after the requeue.
def send_transactions
  bucket = remove_bucket

  # Take existing unknown features and reset
  unknown_features = nil
  @unknown_features_lock.synchronize do
    unknown_features = @unknown_features
    @unknown_features = {}
  end

  success = false
  begin
    remaining_buckets_stats = @buckets_lock.synchronize do
      @buckets.map { |b| b.size }
    end

    api_request_log = app.client.siphon_api_request_log

    payload = {
      :client_uuid => app.client.uuid,
      :transactions => bucket,
      :stats => {
        :waiting_buckets => remaining_buckets_stats
      },
      :unknown_features => unknown_features,
      :api_request_log => api_request_log
    }

    # attempt to send to fluidfeatures service
    success = app.post("/report/transactions", payload)
  ensure
    # handle failure to send data (falsy result or an exception)
    unless success
      # return bucket into bucket queue until the next attempt at sending;
      # a nil bucket means there was nothing to return
      if bucket && !unremove_bucket(bucket)
        app.logger.warn "[FF] Discarded transactions due to reporter backlog. These will not be reported to FluidFeatures."
      end
      # return unknown features to queue until the next attempt at sending
      queue_unknown_features(unknown_features)
    end
  end

  features_storage.replace_unknown({}) if success

  # return whether we were able to send or not
  success
end

#transactions_queued?Boolean

Returns:

  • (Boolean)


138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/fluidfeatures/app/reporter.rb', line 138

# Reports whether there is something to send.
#
# With exactly one bucket queued, the answer is whether @current_bucket
# has any entries; with more than one, whether the oldest bucket has any
# entries; with none, false.
#
# Lock order: @current_bucket_lock is taken BEFORE @buckets_lock. This
# matches #queue_transaction_payload, which holds @current_bucket_lock
# while #new_bucket takes @buckets_lock. Taking them in the opposite order
# here could deadlock against a thread that is queueing into a full
# current bucket.
def transactions_queued?
  @current_bucket_lock.synchronize do
    @buckets_lock.synchronize do
      if @buckets.size == 1
        @current_bucket.size > 0
      else
        @buckets.size > 1 && @buckets[0].size > 0
      end
    end
  end
end

#unremove_bucket(bucket) ⇒ Object



252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/fluidfeatures/app/reporter.rb', line 252

# Puts +bucket+ back at the front of the queue (used after a failed send).
#
# When the in-memory queue already holds MAX_BUCKETS buckets, the bucket is
# handed to the bucket storage instead, so the queue never exceeds
# MAX_BUCKETS - the same cap #new_bucket enforces.
#
# Returns true when the bucket was requeued in memory, otherwise the
# result of buckets_storage.append_one.
def unremove_bucket(bucket)
  @buckets_lock.synchronize do
    # Strictly less than: unshifting at size == MAX_BUCKETS would leave
    # MAX_BUCKETS + 1 buckets in memory.
    if @buckets.size < MAX_BUCKETS
      @buckets.unshift bucket
      true
    else
      buckets_storage.append_one(bucket)
    end
  end
end