Class: Kraps::Worker

Inherits:
Object
  • Object
show all
Includes:
MapReduce::Mergeable
Defined in:
lib/kraps/worker.rb

Instance Method Summary collapse

Constructor Details

#initialize(json, memory_limit:, chunk_limit:, concurrency:, logger: Logger.new("/dev/null")) ⇒ Worker

Returns a new instance of Worker.



5
6
7
8
9
10
11
# File 'lib/kraps/worker.rb', line 5

def initialize(json, memory_limit:, chunk_limit:, concurrency:, logger: Logger.new("/dev/null"))
  @args = JSON.parse(json)
  @memory_limit = memory_limit
  @chunk_limit = chunk_limit
  @concurrency = concurrency
  @logger = logger
end

Instance Method Details

#call(retries: 3) ⇒ Object

Raises:



13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/kraps/worker.rb', line 13

def call(retries: 3)
  return if redis_queue.stopped?

  raise(InvalidAction, "Invalid action #{step.action}") unless Actions::ALL.include?(step.action)

  dequeue do |payload|
    with_retries(retries) do # TODO: allow to use queue based retries
      step.before&.call

      send(:"perform_#{step.action}", payload)
    end
  end
end