Class: ThreeScale::Backend::WorkerAsync

Inherits:
Object
  • Object
show all
Includes:
Configurable, Worker
Defined in:
lib/3scale/backend/worker_async.rb

Instance Method Summary collapse

Methods included from Configurable

#configuration, #configuration=, included

Methods included from Worker

new, #one_off?, #to_s, work

Constructor Details

#initialize(options = {}) ⇒ WorkerAsync

Returns a new instance of WorkerAsync.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/3scale/backend/worker_async.rb', line 17

# Builds an asynchronous worker and installs its TERM/INT signal handlers.
#
# @param options [Hash] construction options
# @option options [Object] :one_off stored in @one_off (presumably what
#   #one_off? reports -- confirm against the Worker module)
# @option options [Object] :job_fetcher fetcher to use; when absent a
#   JobFetcher is built from redis_client
def initialize(options = {})
  # Explicit initial state for the flag checked by the loop in #work.
  @shutdown = false

  @one_off = options[:one_off]
  @jobs = Queue.new # Thread-safe queue

  @job_fetcher = options[:job_fetcher] || JobFetcher.new(redis_client: redis_client)

  @max_concurrent_jobs = configuration.async_worker.max_concurrent_jobs ||
      DEFAULT_MAX_CONCURRENT_JOBS

  @reactor = Async::Reactor.new

  # Install the handlers last: #shutdown dereferences @job_fetcher, so a
  # signal delivered before it is assigned would call a method on nil.
  trap('TERM') { shutdown }
  trap('INT')  { shutdown }
end

Instance Method Details

#shutdown ⇒ Object



57
58
59
60
# File 'lib/3scale/backend/worker_async.rb', line 57

# Requests that the worker stop.
#
# Forwards the shutdown request to the job fetcher and then sets the
# @shutdown flag, which the scheduling loop in #work checks at the start of
# each iteration. #initialize registers this method as the TERM and INT
# signal handler.
#
# @return [true]
def shutdown
  @job_fetcher.shutdown
  @shutdown = true
end

#work ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/3scale/backend/worker_async.rb', line 32

# Runs the worker.
#
# In one-off mode it runs process_one inside an Async block and returns nil.
# Otherwise it registers the worker, starts the job-fetching thread, and
# alternates between scheduling jobs and running the reactor until @shutdown
# is set. It then joins the fetching thread, performs whatever is still
# queued in @jobs, and unregisters the worker.
#
# @return [nil, Object] nil in one-off mode; otherwise the value of the
#   final Async block
def work
  if one_off?
    Async do
      process_one
    end
    return
  end

  Async do
    register_worker
  end

  fetcher_thread = start_thread_to_fetch_jobs

  # Kernel#loop is kept on purpose: it also ends on StopIteration.
  loop do
    break if @shutdown

    schedule_jobs
    @reactor.run
  end

  fetcher_thread.join

  # Drain the in-memory queue so that no fetched job is dropped.
  until @jobs.empty?
    @reactor.async { perform(@jobs.pop) }
  end
  @reactor.run

  Async do
    unregister_worker
  end
end