Class: Funktor::JobActivator

Inherits:
Object
  • Object
show all
Defined in:
lib/funktor/job_activator.rb

Instance Method Summary collapse

Constructor Details

#initialize ⇒ JobActivator

Returns a new instance of JobActivator.



7
8
9
# File 'lib/funktor/job_activator.rb', line 7

# Builds the activator and stores a fresh Funktor::ActivityTracker in
# @tracker; #activate_job uses it to record :scheduledJobActivated events.
def initialize
  @tracker = Funktor::ActivityTracker.new
end

Instance Method Details

#activate_job(job_shard, job_id, current_category, queue_immediately = false) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
# File 'lib/funktor/job_activator.rb', line 63

# Claims a delayed job in DynamoDB and, if the claim succeeds, sends it to SQS.
#
# The claim is a conditional write that only succeeds while the item's
# category still equals +current_category+, so that only one activator wins
# when several race for the same job. Depending on
# Funktor.enable_work_queue_visibility the item is either updated in place
# (category "queued", queueable "false") or deleted.
#
# @param job_shard [Object] value of the item's "jobShard" key attribute
# @param job_id [Object] value of the item's "jobId" key attribute
# @param current_category [Object] category the item is expected to have right now
# @param queue_immediately [Boolean] when truthy, job.delay is set to 0 before sending
# @return [Object, nil] the tracker's result for non-retry jobs; nil for retries
#   or when the write returned no old attributes; the logger's result when the
#   conditional check failed
def activate_job(job_shard, job_id, current_category, queue_immediately = false)
  # TODO: WorkQueueVisibilityMiddleware to alter what happens here? Maybe we delete by default and then the middleware puts it back in the table?
  # The conditional write comes first so that we know no other scheduler has
  # gotten to the item; only then is the job sent to SQS. This is basically
  # how the Sidekiq scheduler works.
  request = {
    key: { "jobShard" => job_shard, "jobId" => job_id },
    condition_expression: "category = :current_category",
    expression_attribute_values: { ":current_category" => current_category },
    table_name: delayed_job_table,
    return_values: "ALL_OLD"
  }

  response =
    if Funktor.enable_work_queue_visibility
      # Keep the record around, but flag it as queued and no longer queueable.
      request[:update_expression] = "SET category = :category, queueable = :queueable"
      request[:expression_attribute_values].update(":queueable" => "false", ":category" => "queued")
      dynamodb_client.update_item(request)
    else
      dynamodb_client.delete_item(request)
    end

  old_attributes = response.attributes
  # Old attributes are only present when the record was still there in the
  # state we expected; without them there is nothing to enqueue.
  return unless old_attributes

  Funktor.logger.debug "response.attributes ====== "
  Funktor.logger.debug old_attributes
  payload = old_attributes["payload"]
  job = Funktor::Job.new(payload)
  Funktor.logger.debug "we created a job from payload"
  Funktor.logger.debug payload
  Funktor.logger.debug "queueing to #{job.retry_queue_url}"

  job.delay = 0 if queue_immediately

  # NOTE: delay_seconds is deliberately not passed (it was commented out upstream).
  sqs_client.send_message({
    queue_url: job.retry_queue_url,
    message_body: job.to_json
  })

  # Retries are not tracked here: they go back to the incoming job queue and
  # :retryActivated is tracked there.
  # TODO - Once we're sure this is all working right, the retry tracking that
  # used to live here (@tracker.track(:retryActivated, job)) can stay gone.
  @tracker.track(:scheduledJobActivated, job) unless job.is_retry?
rescue ::Aws::DynamoDB::Errors::ConditionalCheckFailedException => e
  # A different JobActivator instance (or someone doing stuff in the web UI)
  # got to the job first.
  Funktor.logger.debug "#{e} : #{e.message}"
  Funktor.logger.debug e.backtrace.join("\n")
end

#call(event:, context:) ⇒ Object



128
129
130
131
132
133
134
135
136
137
138
# File 'lib/funktor/job_activator.rb', line 128

# Handler entry point: walks the items returned by #jobs_to_activate and
# passes each one to #handle_item, stopping early when the invocation is
# about to run out of time.
#
# @param event [Object] the triggering event; not read by this method
# @param context [#get_remaining_time_in_millis] invocation context
#   (presumably the AWS Lambda context object — confirm against the deployment)
# @return [Object, nil] the result of `each`: the item collection when every
#   item was handled, nil when the loop bailed out early
def call(event:, context:)
  jobs_to_activate.each do |item|
    # Sample the clock once per item so the logged value is the one that
    # actually triggered the bail-out.
    remaining_ms = context.get_remaining_time_in_millis
    # Exiting below 5 seconds lets us stop gracefully and resume on the next
    # round instead of getting forcibly killed.
    if remaining_ms < 5_000
      puts "Bailing out due to milliseconds remaining #{remaining_ms}"
      break
    end
    handle_item(item)
  end
end

#delayed_job_table ⇒ Object



19
20
21
# File 'lib/funktor/job_activator.rb', line 19

# Name of the DynamoDB table that the activator queries and writes to, read
# from the FUNKTOR_JOBS_TABLE environment variable (nil when it is not set).
def delayed_job_table
  ENV['FUNKTOR_JOBS_TABLE']
end

#dynamodb_client ⇒ Object



11
12
13
# File 'lib/funktor/job_activator.rb', line 11

# Returns whatever Funktor.dynamodb_client provides; used here for the
# query, update_item and delete_item calls.
def dynamodb_client
  Funktor.dynamodb_client
end

#handle_item(item) ⇒ Object



53
54
55
56
57
58
59
60
61
# File 'lib/funktor/job_activator.rb', line 53

# Pulls the key attributes and category out of one queried item, logs them at
# debug level and hands them to #activate_job.
#
# @param item [#[]] record with "jobShard", "jobId" and "category" entries
# @return [Object] whatever #activate_job returns
def handle_item(item)
  shard, id, category = item["jobShard"], item["jobId"], item["category"]

  { "jobShard" => shard, "jobId" => id, "current_category" => category }.each do |label, value|
    Funktor.logger.debug "#{label} = #{value}"
  end

  activate_job(shard, id, category)
end

#jobs_to_activate ⇒ Object



23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/funktor/job_activator.rb', line 23

# Queries the "performAtIndex" index of the delayed-job table for queueable
# items whose performAt is earlier than now + +lookahead_seconds+.
#
# Only the first page of query results is returned; no pagination is done here.
#
# @param lookahead_seconds [Numeric] how far past the current time to look
#   when selecting jobs (defaults to the previously hard-coded 60 seconds)
# @return [Object] the `items` of the query response, each projected to
#   jobId, jobShard and category
def jobs_to_activate(lookahead_seconds: 60)
  # TODO : The default lookahead should come from shared configuration.
  # If this doesn't match the setting in the IncomingJobHandler some jobs
  # might be activated and then immediately re-scheduled instead of being
  # queued, which leads to kind of confusing stats for the "incoming" stat.
  # (Come to think of it, the incoming stat is kind of confusing anyway since
  # it reflects retries and scheduled job activations...)
  target_time = (Time.now + lookahead_seconds).utc

  response = dynamodb_client.query({
    expression_attribute_values: {
      ":queueable" => "true",
      ":targetTime" => target_time.iso8601
    },
    key_condition_expression: "queueable = :queueable AND performAt < :targetTime",
    projection_expression: "jobId, jobShard, category",
    table_name: delayed_job_table,
    index_name: "performAtIndex"
  })
  response.items
end

#queue_for_job(job) ⇒ Object



45
46
47
48
49
50
51
# File 'lib/funktor/job_activator.rb', line 45

# Resolves the queue for a job from the environment.
#
# The job's queue name (or 'default' when it has none) is underscored and
# upcased to form "FUNKTOR_<NAME>_QUEUE"; when that variable is not set the
# value of FUNKTOR_DEFAULT_QUEUE is used instead.
#
# @param job [#queue] job whose queue name selects the environment variable
# @return [String, nil] the configured value, or nil when neither variable is set
def queue_for_job(job)
  env_key = "FUNKTOR_#{(job.queue || 'default').underscore.upcase}_QUEUE"
  Funktor.logger.debug "queue_constant = #{env_key}"
  Funktor.logger.debug "ENV value = #{ENV[env_key]}"
  ENV.fetch(env_key) { ENV['FUNKTOR_DEFAULT_QUEUE'] }
end

#sqs_client ⇒ Object



15
16
17
# File 'lib/funktor/job_activator.rb', line 15

# Returns whatever Funktor.sqs_client provides; used here for send_message.
def sqs_client
  Funktor.sqs_client
end