Module: ThreeScale::Backend::Transactor::NotifyBatcher

Includes:
Resque::Helpers, Configurable
Included in:
ThreeScale::Backend::Transactor
Defined in:
lib/3scale/backend/transactor/notify_batcher.rb

Overview

This module is responsible for scheduling Notify jobs. These jobs are used to report the usage of some metrics specified in the master account.

Instance Method Summary collapse

Methods included from Configurable

#configuration, #configuration=, included

Instance Method Details

#do_batch(list) ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/3scale/backend/transactor/notify_batcher.rb', line 73

# Aggregates a list of encoded notifications and enqueues one Notify job
# per bucket, where a bucket groups the items sharing the same provider key
# and time.
#
# Every item is decoded with `decode`; the decoded object is read through
# the string keys 'provider_key', 'time' and 'usage'. The first item of a
# bucket becomes the bucket itself; the usage of later items is summed into
# it metric by metric (values are coerced with #to_i when summed).
#
# Items without usage are tolerated: they create an empty bucket, or leave
# an existing bucket untouched.
#
# @param list [Enumerable] encoded notifications
# @return [Hash] the aggregated buckets, keyed by "<provider_key>-<time>"
def do_batch(list)
  buckets = {}

  list.each do |item|
    obj = decode(item)

    provider_key = obj['provider_key'.freeze]
    time = obj['time'.freeze]
    usage = obj['usage'.freeze]

    # Keep the local and the stored usage in sync, so that an item without
    # usage can neither leave a nil usage in its bucket nor break the merge
    # below when its bucket already exists.
    if usage.nil?
      usage = {}
      obj['usage'.freeze] = usage
    end

    bucket_key = "#{provider_key}-" << time
    bucket = buckets[bucket_key]

    if bucket.nil?
      buckets[bucket_key] = obj
    else
      bucket_usage = bucket['usage'.freeze]

      usage.each do |metric_name, value|
        # The value already stored comes verbatim from the first item of the
        # bucket, so coerce both operands before adding them.
        bucket_usage[metric_name] =
          bucket_usage.fetch(metric_name, 0).to_i + value.to_i
      end
    end
  end

  # All the jobs of the same batch share a single enqueue timestamp.
  enqueue_ts = Time.now.utc.to_f

  buckets.each do |_, bucket|
    enqueue_notify_job(bucket['provider_key'.freeze],
                       bucket['usage'.freeze],
                       bucket['time'.freeze],
                       enqueue_ts)
  end
end

#get_batch(num_elements) ⇒ Object



62
63
64
65
66
67
# File 'lib/3scale/backend/transactor/notify_batcher.rb', line 62

# Pops the oldest entries from the notifications list.
#
# Issues an LRANGE for the first +num_elements+ entries followed by an LTRIM
# that drops them, both inside a single `storage.pipelined` block, and
# returns the first element of what `pipelined` gives back.
# NOTE(review): presumably `pipelined` returns the replies in call order, so
# the first one is the LRANGE result - confirm against the storage client.
#
# @param num_elements [Integer] number of entries to take from the list
# @return [Object] first reply of the pipeline
def get_batch(num_elements)
  batch_key = key_for_notifications_batch

  replies = storage.pipelined do
    storage.lrange(batch_key, 0, num_elements - 1)
    storage.ltrim(batch_key, num_elements, -1)
  end

  replies.first
end

#key_for_notifications_batch ⇒ Object



28
29
30
# File 'lib/3scale/backend/transactor/notify_batcher.rb', line 28

# Storage key of the list where the pending notifications are accumulated.
#
# @return [String]
def key_for_notifications_batch
  'notify/aggregator/batch'
end

#notify(provider_key, usage) ⇒ Object



32
33
34
35
36
37
38
39
40
# File 'lib/3scale/backend/transactor/notify_batcher.rb', line 32

# Queues a usage notification for the given provider, but only when a master
# service ID is configured.
#
# The master service ID is needed to report the master metrics, so without
# it there is nothing to notify. Notifications are not processed one by one:
# they are handed to #notify_batch so that a single job can take care of a
# group of them.
#
# @param provider_key [Object] provider the usage belongs to
# @param usage [Hash] metric => value pairs to report
# @return [Object, nil] result of #notify_batch, or nil when nothing is done
def notify(provider_key, usage)
  return if configuration.master_service_id.to_s.empty?

  notify_batch(provider_key, usage)
end

#notify_authorize(provider_key) ⇒ Object



15
16
17
# File 'lib/3scale/backend/transactor/notify_batcher.rb', line 15

# Notifies one unit of the configured master "transactions_authorize" metric
# for the given provider.
#
# @param provider_key [Object] provider the usage belongs to
# @return [Object, nil] whatever #notify returns
def notify_authorize(provider_key)
  authorize_metric = configuration.master.metrics.transactions_authorize
  usage = { authorize_metric => 1 }

  notify(provider_key, usage)
end

#notify_authrep(provider_key, transactions) ⇒ Object



19
20
21
22
# File 'lib/3scale/backend/transactor/notify_batcher.rb', line 19

# Notifies an authrep call for the given provider: one unit of the
# configured master "transactions_authorize" metric plus +transactions+
# units of the master "transactions" metric.
#
# @param provider_key [Object] provider the usage belongs to
# @param transactions [Object] value reported for the transactions metric
# @return [Object, nil] whatever #notify returns
def notify_authrep(provider_key, transactions)
  master_metrics = configuration.master.metrics
  usage = {
    master_metrics.transactions_authorize => 1,
    master_metrics.transactions => transactions
  }

  notify(provider_key, usage)
end

#notify_batch(provider_key, usage) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/3scale/backend/transactor/notify_batcher.rb', line 42

# Appends a notification to the pending list and processes the batch once
# the list length reaches a multiple of the configured batch size.
#
# @param provider_key [Object] provider the usage belongs to
# @param usage [Hash] metric => value pairs to report
# @return [Object, nil] result of #process_batch when a batch is processed,
#   nil otherwise
def notify_batch(provider_key, usage)
  # Aggregation is done at the minute level, so the seconds are discarded to
  # make every notification of the same minute land in the same bucket.
  now = Time.now.getutc
  minute = now - now.sec

  payload = { provider_key: provider_key, usage: usage, time: minute.to_s }
  encoded = Yajl::Encoder.encode(payload)

  # rpush's return value is used as the current length of the list.
  list_size = storage.rpush(key_for_notifications_batch, encoded)

  # A remainder of zero means the batch is full.
  process_batch(list_size) if (list_size % configuration.notification_batch).zero?
end

#notify_report(provider_key, transactions) ⇒ Object



24
25
26
# File 'lib/3scale/backend/transactor/notify_batcher.rb', line 24

# Notifies +transactions+ units of the configured master "transactions"
# metric for the given provider.
#
# @param provider_key [Object] provider the usage belongs to
# @param transactions [Object] value reported for the transactions metric
# @return [Object, nil] whatever #notify returns
def notify_report(provider_key, transactions)
  transactions_metric = configuration.master.metrics.transactions
  usage = { transactions_metric => transactions }

  notify(provider_key, usage)
end

#process_batch(num_elements) ⇒ Object



69
70
71
# File 'lib/3scale/backend/transactor/notify_batcher.rb', line 69

# Takes +num_elements+ pending notifications with #get_batch and hands them
# to #do_batch.
#
# @param num_elements [Integer] number of notifications to process
# @return [Object] whatever #do_batch returns
def process_batch(num_elements)
  batch = get_batch(num_elements)
  do_batch(batch)
end