Class: Funktor::Web::Application

Inherits:
Sinatra::Base
  • Object
show all
Includes:
ShardUtils
Defined in:
lib/funktor/web/application.rb

Instance Method Summary collapse

Methods included from ShardUtils

#calculate_shard

Instance Method Details

#delete_jobs(job_ids, source) ⇒ Object



105
106
107
108
109
110
# File 'lib/funktor/web/application.rb', line 105

def delete_jobs(job_ids, source)
  @tracker = Funktor::ActivityTracker.new
  job_ids.each do |job_id|
    delete_single_job(job_id, source)
  end
end

#delete_single_job(job_id, source) ⇒ Object



112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# File 'lib/funktor/web/application.rb', line 112

def delete_single_job(job_id, source)
  response = dynamodb_client.delete_item({
    key: {
      "jobShard" => calculate_shard(job_id),
      "jobId" => job_id
    },
    table_name: ENV['FUNKTOR_JOBS_TABLE'],
    return_values: "ALL_OLD"
  })
  if response.attributes # this means the record was still there
    if source == "scheduled"
      @tracker.track(:scheduledJobDeleted, nil)
    elsif source == "retry"
      @tracker.track(:retryDeleted, nil)
    end
  end
end

#dynamodb_clientObject



130
131
132
# File 'lib/funktor/web/application.rb', line 130

def dynamodb_client
  Funktor.dynamodb_client
end

#get_activity_dataObject



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/funktor/web/application.rb', line 80

def get_activity_data
  query_params = {
    expression_attribute_values: {
      ":jobShard" => "stat"
    },
    key_condition_expression: "jobShard = :jobShard",
    projection_expression: "jobId, stat_value",
    table_name: ENV['FUNKTOR_JOBS_TABLE']
  }
  resp = dynamodb_client.query(query_params)
  @activity_stats = {}
  resp.items.each do |item|
    @activity_stats[item["jobId"]] = item["stat_value"].to_i
  end
  return @activity_stats
end

#get_jobs(category) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/funktor/web/application.rb', line 63

def get_jobs(category)
  "Jobs of type #{category}"
  query_params = {
    expression_attribute_values: {
      ":category" => category
    },
    key_condition_expression: "category = :category",
    projection_expression: "payload, performAt, jobId, jobShard",
    table_name: ENV['FUNKTOR_JOBS_TABLE'],
    index_name: "categoryIndex"
  }
  resp = dynamodb_client.query(query_params)
  @items = resp.items
  @jobs = @items.map{ |item| Funktor::Job.new(item["payload"]) }
  return @jobs
end

#queue_jobs(job_ids, source) ⇒ Object



97
98
99
100
101
102
103
# File 'lib/funktor/web/application.rb', line 97

def queue_jobs(job_ids, source)
  job_activator = Funktor::JobActivator.new
  job_ids.each do |job_id|
    job_shard = calculate_shard(job_id)
    job_activator.activate_job(job_shard, job_id, source, true)
  end
end