Class: ThreeScale::Backend::JobFetcher
- Inherits:
-
Object
- Object
- ThreeScale::Backend::JobFetcher
- Includes:
- Resque::Helpers, Configurable
- Defined in:
- lib/3scale/backend/job_fetcher.rb
Constant Summary collapse
- RedisConnectionError =
Class.new(RuntimeError)
Instance Method Summary collapse
- #fetch ⇒ Object
-
#initialize(redis_client: redis, fetch_timeout: REDIS_TIMEOUT) ⇒ JobFetcher
constructor
The default redis_client is the one defined in Resque::Helpers.
- #shutdown ⇒ Object
-
#start(job_queue) ⇒ Object
Note: this method calls #close on job_queue after receiving #shutdown.
Methods included from Configurable
#configuration, #configuration=, included
Constructor Details
#initialize(redis_client: redis, fetch_timeout: REDIS_TIMEOUT) ⇒ JobFetcher
The default redis_client is the one defined in Resque::Helpers
23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/3scale/backend/job_fetcher.rb', line 23 def initialize(redis_client: redis, fetch_timeout: REDIS_TIMEOUT) @redis = redis_client @fetch_timeout = fetch_timeout @queues ||= QUEUES.map { |q| "queue:#{q}" } @max_pending_jobs = configuration.async_worker.max_pending_jobs || DEFAULT_MAX_PENDING_JOBS @wait_before_fetching_more = configuration.async_worker.seconds_before_fetching_more || DEFAULT_WAIT_BEFORE_FETCHING_MORE_JOBS end |
Instance Method Details
#fetch ⇒ Object
35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/3scale/backend/job_fetcher.rb', line 35 def fetch encoded_job = pop_from_queue return nil if encoded_job.nil? || encoded_job.empty? begin # Resque::Job.new accepts a queue name as a param. It is very # important to set here the same name as the one we set when calling # Resque.enqueue. Resque.enqueue uses the @queue ivar in # BackgroundJob classes as the name of the queue, and then, it stores # the job in a queue called resque:queue:_@queue_. 'resque:' is the # namespace and 'queue:' is added automatically. That's why we need # to call blpop on 'queue:#{q}' above. However, when creating the job # we cannot set 'queue:#{q}' as the name. Otherwise, if it fails and # it is re-queued, it will end up in resque:queue:queue:_@queue_ # instead of resque:queue:_@queue_. encoded_job[0].sub!('queue:', '') Resque::Job.new(encoded_job[0], Yajl::Parser.parse(encoded_job[1], check_utf8: false)) rescue Exception => e # I think that the only exception that can be raised here is # Yajl::ParseError. However, this is a critical part of the code so # we will capture all of them just to be safe. Worker.logger.notify(e) nil end end |
#shutdown ⇒ Object
102 103 104 |
# File 'lib/3scale/backend/job_fetcher.rb', line 102 def shutdown @shutdown = true end |
#start(job_queue) ⇒ Object
Note: this method calls #close on job_queue after receiving #shutdown. That signals to the caller that there won’t be any more jobs.
64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/3scale/backend/job_fetcher.rb', line 64 def start(job_queue) loop do break if @shutdown if job_queue.size >= @max_pending_jobs sleep @wait_before_fetching_more else begin job = fetch rescue RedisConnectionError => e # If there has been a connection error or a timeout we wait a bit # because normally, it will be a temporary problem. # In the future, we might want to put a limit in the total number # of attempts or implement exponential backoff retry times. Worker.logger.notify(e) sleep(1) # Re-instantiate Redis instance. This is needed to recover from # Errno::EPIPE, not sure if there are others. @redis = Redis::Namespace.new( WorkerAsync.const_get(:RESQUE_REDIS_NAMESPACE), redis: QueueStorage.connection(Backend.environment, Backend.configuration) ) # If there is a different kind of error, it's probably a # programming error. Like sending an invalid blpop command to # Redis. In that case, let the worker crash. end job_queue << job if job end end rescue Exception => e Worker.logger.notify(e) ensure job_queue.close end |