Class: Cloudtasker::LocalServer

Inherits:
Object
  • Object
show all
Defined in:
lib/cloudtasker/local_server.rb

Overview

Process jobs stored in Redis. Only to be used in development.

Constant Summary collapse

CONCURRENCY =

Max number of task requests sent to the processing server

(ENV['CLOUDTASKER_CONCURRENCY'] || 5).to_i
QUEUE_CONCURRENCY =

Default number of threads to allocate to process a specific queue

1
JOB_POLLING_FREQUENCY =

Job Polling. How frequently to poll jobs in redis.

0.5

Instance Method Summary collapse

Instance Method Details

#process_jobs(queue = nil, concurrency = nil) ⇒ Object

Process enqueued workers.



62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/cloudtasker/local_server.rb', line 62

# Spawn worker threads for tasks waiting on the given queue.
#
# Worker threads are tracked per queue in @threads. Threads that are no
# longer alive are dropped first, then tasks are popped from Redis one at
# a time until the queue reaches its thread limit or has no task left.
#
# @param queue [String, nil] the queue name, passed as-is to RedisTask.pop.
# @param concurrency [Integer, String, nil] max number of threads for this
#   queue. Falls back to QUEUE_CONCURRENCY when nil.
#
# @return [nil]
def process_jobs(queue = nil, concurrency = nil)
  @threads ||= {}
  workers = (@threads[queue] ||= [])
  limit = (concurrency || QUEUE_CONCURRENCY).to_i

  # Forget about threads which have completed
  workers.keep_if(&:alive?)

  # Only pop a task once we know there is a free slot to run it
  until workers.count >= limit
    task = Cloudtasker::Backend::RedisTask.pop(queue)
    break unless task

    workers << Thread.new(task) { |t| process_task(t) }
  end
end

#process_task(task) ⇒ Object

Process a given task

Parameters:



81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/cloudtasker/local_server.rb', line 81

# Deliver a task, retrying a few times on connection errors.
#
# The task and the attempt counter are stored as thread-locals under the
# 'task' and 'attempts' keys of the current thread.
#
# @param task [#deliver, nil] the task to deliver. Nothing is delivered
#   when nil.
#
# @return [Object, nil] whatever task.deliver returns.
#
# @raise [Errno::EBADF, Errno::ECONNREFUSED] when delivery still fails
#   after 3 retries.
def process_task(task)
  worker = Thread.current
  worker['task'] = task
  worker['attempts'] = 0

  # Deliver task
  begin
    worker['task']&.deliver
  rescue Errno::EBADF, Errno::ECONNREFUSED => e
    raise e if worker['attempts'] >= 3

    # The web server may not be started yet: wait a bit then
    # try again. The counter lives outside of the begin block
    # so that retries do not reset it.
    worker['attempts'] += 1
    sleep(3)
    retry
  end
end

#start(opts = {}) ⇒ Object

Start the local server

Parameters:

  • opts (Hash) (defaults to: {})

    Server options.



40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# File 'lib/cloudtasker/local_server.rb', line 40

# Start the local server.
#
# Logs the list of processed queues, then starts a background thread
# which calls process_jobs for each queue and sleeps for
# JOB_POLLING_FREQUENCY between passes, until @done is set.
#
# @param opts [Hash] Server options.
# @option opts [Array<Array>] :queues list of [name, concurrency] pairs.
#   When absent or empty, a single [nil, CONCURRENCY] entry is used.
#
# @return [Thread] the polling thread. It is created on the first call
#   and reused on subsequent calls.
def start(opts = {})
  # Extract queues to process
  requested = opts[:queues]
  queues = requested.to_a.any? ? requested : [[nil, CONCURRENCY]]

  # Display start banner
  labels = queues.map do |name, concurrency|
    "#{name || 'all'}=#{concurrency || QUEUE_CONCURRENCY}"
  end
  Cloudtasker.logger.info("[Cloudtasker/Server] Processing queues: #{labels.join(' ')}")

  # Start processing queues
  @start ||= Thread.new do
    until @done
      queues.each { |name, concurrency| process_jobs(name, concurrency) }
      sleep JOB_POLLING_FREQUENCY
    end
    Cloudtasker.logger.info('[Cloudtasker/Server] Local server exiting...')
  end
end

#stop ⇒ Object

Stop the local server.



21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/cloudtasker/local_server.rb', line 21

# Stop the local server.
#
# Flags the polling loop as done, terminates the worker threads which are
# still running and pushes their task back for immediate retry, then waits
# for the polling thread to exit.
#
# @return [nil]
def stop
  @done = true

  # Work on a snapshot of the thread lists so that iterating does not
  # touch the @threads hash itself.
  workers = @threads ? @threads.values.flatten : []

  # Terminate threads and repush tasks
  workers.each do |t|
    # Finished threads remain in @threads until the next process_jobs
    # pass and still expose their 'task' thread-local. Their task has
    # already been handled, so re-pushing it would process it twice.
    # NOTE: a thread completing between this check and #terminate may
    # still get its task re-pushed.
    next unless t.alive?

    t.terminate
    t['task']&.retry_later(0, is_error: false)
  end

  # Wait for main server to be done
  sleep 1 while @start&.alive?
end