Class: Funktor::JobActivator
- Inherits: Object
  - Object
  - Funktor::JobActivator
- Defined in:
- lib/funktor/job_activator.rb
Instance Method Summary collapse
- #activate_job(job_shard, job_id, current_category, queue_immediately = false) ⇒ Object
- #call(event:, context:) ⇒ Object
- #delayed_job_table ⇒ Object
- #dynamodb_client ⇒ Object
- #handle_item(item) ⇒ Object
-
#initialize ⇒ JobActivator
constructor
A new instance of JobActivator.
- #jobs_to_activate ⇒ Object
- #queue_for_job(job) ⇒ Object
- #sqs_client ⇒ Object
Constructor Details
#initialize ⇒ JobActivator
Returns a new instance of JobActivator.
7 8 9 |
# File 'lib/funktor/job_activator.rb', line 7 def initialize @tracker = Funktor::ActivityTracker.new end |
Instance Method Details
#activate_job(job_shard, job_id, current_category, queue_immediately = false) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
# File 'lib/funktor/job_activator.rb', line 63 def activate_job(job_shard, job_id, current_category, queue_immediately = false) # TODO: WorkQueueVisibilityMiddleware to alter what happens here? Maybe we delete by default and then the middleware puts it back in the table? # First we conditionally update the item in Dynamo to be sure that another scheduler hasn't gotten # to it, and if that works then send to SQS. This is basically how Sidekiq scheduler works. response = if Funktor.enable_work_queue_visibility dynamodb_client.update_item({ key: { "jobShard" => job_shard, "jobId" => job_id }, update_expression: "SET category = :category, queueable = :queueable", condition_expression: "category = :current_category", expression_attribute_values: { ":current_category" => current_category, ":queueable" => "false", ":category" => "queued" }, table_name: delayed_job_table, return_values: "ALL_OLD" }) else dynamodb_client.delete_item({ key: { "jobShard" => job_shard, "jobId" => job_id }, condition_expression: "category = :current_category", expression_attribute_values: { ":current_category" => current_category }, table_name: delayed_job_table, return_values: "ALL_OLD" }) end if response.attributes # this means the record was still there in the state we expected Funktor.logger.debug "response.attributes ====== " Funktor.logger.debug response.attributes job = Funktor::Job.new(response.attributes["payload"]) Funktor.logger.debug "we created a job from payload" Funktor.logger.debug response.attributes["payload"] Funktor.logger.debug "queueing to #{job.retry_queue_url}" if queue_immediately job.delay = 0 end sqs_client.({ queue_url: job.retry_queue_url, message_body: job.to_json #delay_seconds: job.delay }) if job.is_retry? # We don't track here because we send stuff back to the incoming job queue and we track the # :retryActivated even there. # TODO - Once we're sure this is all working right we can delete the commented out line. 
#@tracker.track(:retryActivated, job) else @tracker.track(:scheduledJobActivated, job) end end rescue ::Aws::DynamoDB::Errors::ConditionalCheckFailedException => e # This means that a different instance of the JobActivator (or someone doing stuff in the web UI) # got to the job first. Funktor.logger.debug "#{e.to_s} : #{e.}" Funktor.logger.debug e.backtrace.join("\n") end |
#call(event:, context:) ⇒ Object
128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/funktor/job_activator.rb', line 128 def call(event:, context:) handled_item_count = 0 jobs_to_activate.each do |item| if context.get_remaining_time_in_millis < 5_000 # This lets us exit gracefully and resume on the next round instead of getting forcibly killed. puts "Bailing out due to milliseconds remaining #{context.get_remaining_time_in_millis}" break end handle_item(item) handled_item_count += 1 end end |
#delayed_job_table ⇒ Object
19 20 21 |
# File 'lib/funktor/job_activator.rb', line 19 def delayed_job_table ENV['FUNKTOR_JOBS_TABLE'] end |
#dynamodb_client ⇒ Object
11 12 13 |
# File 'lib/funktor/job_activator.rb', line 11 def dynamodb_client Funktor.dynamodb_client end |
#handle_item(item) ⇒ Object
53 54 55 56 57 58 59 60 61 |
# File 'lib/funktor/job_activator.rb', line 53 def handle_item(item) job_shard = item["jobShard"] job_id = item["jobId"] current_category = item["category"] Funktor.logger.debug "jobShard = #{item['jobShard']}" Funktor.logger.debug "jobId = #{item['jobId']}" Funktor.logger.debug "current_category = #{current_category}" activate_job(job_shard, job_id, current_category) end |
#jobs_to_activate ⇒ Object
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/funktor/job_activator.rb', line 23 def jobs_to_activate # TODO : The lookahead time here should be configurable # If this doesn't match the setting in the IncomingJobHandler some jobs # might be activated and then immediately re-scheduled instead of being # queued, which leads to kind of confusing stats for the "incoming" stat. # (Come to think of it, the incoming stat is kind of confusting anyway since # it reflects retries and scheduled jobs activations...) target_time = (Time.now + 60).utc query_params = { expression_attribute_values: { ":queueable" => "true", ":targetTime" => target_time.iso8601 }, key_condition_expression: "queueable = :queueable AND performAt < :targetTime", projection_expression: "jobId, jobShard, category", table_name: delayed_job_table, index_name: "performAtIndex" } resp = dynamodb_client.query(query_params) return resp.items end |
#queue_for_job(job) ⇒ Object
45 46 47 48 49 50 51 |
# File 'lib/funktor/job_activator.rb', line 45 def queue_for_job(job) queue_name = job.queue || 'default' queue_constant = "FUNKTOR_#{queue_name.underscore.upcase}_QUEUE" Funktor.logger.debug "queue_constant = #{queue_constant}" Funktor.logger.debug "ENV value = #{ENV[queue_constant]}" ENV[queue_constant] || ENV['FUNKTOR_DEFAULT_QUEUE'] end |
#sqs_client ⇒ Object
15 16 17 |
# File 'lib/funktor/job_activator.rb', line 15 def sqs_client Funktor.sqs_client end |