Class: Funktor::ActivityTracker

Inherits:
Object
  • Object
show all
Defined in:
lib/funktor/activity_tracker.rb

Constant Summary collapse

# Maps an activity name (as a symbol) to the name of the stat counter that
# #track increments for that activity. A nil value means no counter is
# incremented. Frozen so the shared lookup table cannot be mutated at runtime.
INCR_KEYS = {
  incoming: 'incoming',
  queued: 'queued',
  scheduled: 'scheduled',
  processingStarted: 'processing',
  processingComplete: 'complete',
  processingFailed: 'failed',
  bailingOut: 'failed',
  retrying: 'retries',
  retryActivated: 'queued',
  scheduledJobActivated: nil,
  scheduledJobDeleted: 'scheduledJobDeleted',
  retryDeleted: 'retryDeleted'
  # Disabled entries, kept for reference:
  #scheduledJobPushedToActive: 'active',
  #activeJobPushed: 'active',
  #scheduledJobPushed: 'scheduled'
}.freeze
# Maps an activity name (as a symbol) to the name of the stat counter that
# #track decrements for that activity. A nil value means no counter is
# decremented. Frozen so the shared lookup table cannot be mutated at runtime.
DECR_KEYS = {
  incoming: nil,
  queued: nil,
  scheduled: nil,
  processingStarted: 'queued',
  processingComplete: 'processing',
  processingFailed: nil,
  bailingOut: 'processing',
  retrying: 'processing',
  retryActivated: 'retries',
  scheduledJobActivated: 'scheduled',
  scheduledJobDeleted: 'scheduled',
  retryDeleted: 'retries'
  # Disabled entries, kept for reference:
  #scheduledJobPushedToActive: 'scheduled',
  #activeJobPushed: nil,
  #scheduledJobPushed: nil
}.freeze

Instance Method Summary collapse

Instance Method Details

#decrement_key(key) ⇒ Object



69
70
71
72
73
74
75
76
77
# File 'lib/funktor/activity_tracker.rb', line 69

# Sends an update_item request that subtracts 1 from the +stat_value+
# attribute of the stat item identified by +key+. When the attribute does not
# exist yet, the update expression starts from 0 before subtracting.
#
# @param key [Object] used as the +jobId+ of the stat item (jobShard is 'stat')
# @return [Object] whatever +dynamodb_client.update_item+ returns
def decrement_key(key)
  # Emitting the metric to stdout is currently disabled:
  #put_metric_to_stdout(key, -1)
  stat_item = { jobShard: 'stat', jobId: key }
  placeholders = { ":start": 0, ":inc": 1 }
  request = {
    table_name: ENV['FUNKTOR_JOBS_TABLE'],
    key: stat_item,
    update_expression: "SET stat_value = if_not_exists(stat_value, :start) - :inc",
    expression_attribute_values: placeholders
  }
  dynamodb_client.update_item(request)
end

#dynamodb_client ⇒ Object



79
80
81
# File 'lib/funktor/activity_tracker.rb', line 79

# Returns the client used for the stat updates in this class by delegating to
# Funktor.dynamodb_client.
#
# @return [Object] whatever Funktor.dynamodb_client returns
def dynamodb_client
  Funktor.dynamodb_client
end

#increment_key(key) ⇒ Object



59
60
61
62
63
64
65
66
67
# File 'lib/funktor/activity_tracker.rb', line 59

# Sends an update_item request that adds 1 to the +stat_value+ attribute of
# the stat item identified by +key+. When the attribute does not exist yet,
# the update expression starts from 0 before adding.
#
# @param key [Object] used as the +jobId+ of the stat item (jobShard is 'stat')
# @return [Object] whatever +dynamodb_client.update_item+ returns
def increment_key(key)
  # Emitting the metric to stdout is currently disabled:
  #put_metric_to_stdout(key, 1)
  stat_item = { jobShard: 'stat', jobId: key }
  placeholders = { ":start": 0, ":inc": 1 }
  request = {
    table_name: ENV['FUNKTOR_JOBS_TABLE'],
    key: stat_item,
    update_expression: "SET stat_value = if_not_exists(stat_value, :start) + :inc",
    expression_attribute_values: placeholders
  }
  dynamodb_client.update_item(request)
end

#metric_namespace ⇒ Object



83
84
85
# File 'lib/funktor/activity_tracker.rb', line 83

# Builds the metric namespace by joining the FUNKTOR_APP_NAME and
# SERVERLESS_STAGE environment variables with a hyphen. An unset variable
# contributes an empty string, so the hyphen is always present.
#
# @return [String] "<app name>-<stage>"
def metric_namespace
  ENV.values_at('FUNKTOR_APP_NAME', 'SERVERLESS_STAGE').join('-')
end

#put_metric_to_stdout(key, value) ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/funktor/activity_tracker.rb', line 87

# Prints a single JSON line to stdout describing one count metric.
#
# The payload carries an "_aws" section (millisecond timestamp plus one
# CloudWatchMetrics entry naming the metric +key+ with unit "Count" under
# #metric_namespace), a fixed "functionVersion" of "LATEST", and the metric
# value stored under +key+ itself.
# NOTE(review): the layout looks like the CloudWatch embedded metric format;
# confirm against the AWS specification before relying on that.
#
# @param key [String] metric name; also the top-level key holding the value
# @param value [Object] metric value to report
# @return [nil] the result of +puts+
def put_metric_to_stdout(key, value)
  # Epoch seconds followed by three digits of milliseconds.
  emitted_at = Time.now.strftime('%s%3N').to_i

  metric_definition = { Name: key, Unit: "Count" }
  metric_directive = {
    Namespace: metric_namespace,
    Dimensions: [["functionVersion"]],
    Metrics: [metric_definition]
  }

  payload = {
    _aws: {
      Timestamp: emitted_at,
      CloudWatchMetrics: [metric_directive]
    },
    functionVersion: "LATEST"
  }
  # The value lives at the top level under the metric's own name.
  payload[key] = value

  puts(payload.to_json)
end

#track(activity, job) ⇒ Object



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/funktor/activity_tracker.rb', line 42

# Records +activity+ by adjusting the stat counters mapped to it: the counter
# named in INCR_KEYS is incremented first, then the counter named in DECR_KEYS
# is decremented. A missing or nil mapping skips that step. Does nothing
# except log a debug message when Funktor.enable_activity_tracking is falsy.
#
# @param activity [String, Symbol] activity name; converted with +to_sym+
# @param job [Object] not used by this method
# @return [Object, nil] nil when tracking is disabled, otherwise the result
#   of the final debug log call
def track(activity, job)
  tracking_enabled = Funktor.enable_activity_tracking
  unless tracking_enabled
    Funktor.logger.debug("activity tracking is disabled")
    return
  end

  Funktor.logger.debug("starting track activity for #{activity}")

  activity_name = activity.to_sym
  stat_to_increment = INCR_KEYS[activity_name]
  stat_to_decrement = DECR_KEYS[activity_name]

  increment_key(stat_to_increment) if stat_to_increment
  decrement_key(stat_to_decrement) if stat_to_decrement

  Funktor.logger.debug("ending track activity for #{activity}")
end