Class: ThreeScale::Backend::JobFetcher

Inherits:
Object
  • Object
show all
Includes:
Resque::Helpers, Configurable
Defined in:
lib/3scale/backend/job_fetcher.rb

Constant Summary collapse

RedisConnectionError =
Class.new(RuntimeError)

Instance Method Summary collapse

Methods included from Configurable

#configuration, #configuration=, included

Constructor Details

#initialize(redis_client: redis, fetch_timeout: REDIS_TIMEOUT) ⇒ JobFetcher

The default redis_client is the one defined in Resque::Helpers



23
24
25
26
27
28
29
30
31
32
33
# File 'lib/3scale/backend/job_fetcher.rb', line 23

def initialize(redis_client: redis, fetch_timeout: REDIS_TIMEOUT)
  @redis = redis_client
  @fetch_timeout = fetch_timeout
  @queues ||= QUEUES.map { |q| "queue:#{q}" }

  @max_pending_jobs = configuration.async_worker.max_pending_jobs ||
      DEFAULT_MAX_PENDING_JOBS

  @wait_before_fetching_more = configuration.async_worker.seconds_before_fetching_more ||
      DEFAULT_WAIT_BEFORE_FETCHING_MORE_JOBS
end

Instance Method Details

#fetchObject



35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/3scale/backend/job_fetcher.rb', line 35

def fetch
  encoded_job = pop_from_queue
  return nil if encoded_job.nil? || encoded_job.empty?

  begin
    # Resque::Job.new accepts a queue name as a param. It is very
    # important to set here the same name as the one we set when calling
    # Resque.enqueue. Resque.enqueue uses the @queue ivar in
    # BackgroundJob classes as the name of the queue, and then, it stores
    # the job in a queue called resque:queue:_@queue_. 'resque:' is the
    # namespace and 'queue:' is added automatically. That's why we need
    # to call blpop on 'queue:#{q}' above. However, when creating the job
    # we cannot set 'queue:#{q}' as the name. Otherwise, if it fails and
    # it is re-queued, it will end up in resque:queue:queue:_@queue_
    # instead of resque:queue:_@queue_.
    encoded_job[0].sub!('queue:', '')
    Resque::Job.new(encoded_job[0],
                    Yajl::Parser.parse(encoded_job[1], check_utf8: false))
  rescue Exception => e
    # I think that the only exception that can be raised here is
    # Yajl::ParseError. However, this is a critical part of the code so
    # we will capture all of them just to be safe.
    Worker.logger.notify(e)
    nil
  end
end

#shutdownObject



102
103
104
# File 'lib/3scale/backend/job_fetcher.rb', line 102

def shutdown
  @shutdown = true
end

#start(job_queue) ⇒ Object

Note: this method calls #close on job_queue after receiving #shutdown. That signals to the caller that there won’t be any more jobs.



64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# File 'lib/3scale/backend/job_fetcher.rb', line 64

def start(job_queue)
  loop do
    break if @shutdown

    if job_queue.size >= @max_pending_jobs
      sleep @wait_before_fetching_more
    else
      begin
        job = fetch
      rescue RedisConnectionError => e
        # If there has been a connection error or a timeout we wait a bit
        # because normally, it will be a temporary problem.
        # In the future, we might want to put a limit in the total number
        # of attempts or implement exponential backoff retry times.
        Worker.logger.notify(e)
        sleep(1)

        # Re-instantiate Redis instance. This is needed to recover from
        # Errno::EPIPE, not sure if there are others.
        @redis = Redis::Namespace.new(
          WorkerAsync.const_get(:RESQUE_REDIS_NAMESPACE),
          redis: QueueStorage.connection(Backend.environment, Backend.configuration)
        )

       # If there is a different kind of error, it's probably a
       # programming error. Like sending an invalid blpop command to
       # Redis. In that case, let the worker crash.
      end
      job_queue << job if job
    end
  end

rescue Exception => e
  Worker.logger.notify(e)
ensure
  job_queue.close
end