Class: Sbmt::Outbox::V2::Processor

Inherits:
BoxProcessor show all
Defined in:
lib/sbmt/outbox/v2/processor.rb

Constant Summary collapse

REDIS_BRPOP_MIN_DELAY =
0.1

Instance Attribute Summary collapse

Attributes inherited from BoxProcessor

#started, #threads_count, #worker_name

Instance Method Summary collapse

Methods inherited from BoxProcessor

#alive?, #ready?, #safe_process_task, #start, #stop, #throttle

Constructor Details

#initialize(boxes, threads_count: nil, lock_timeout: nil, brpop_delay: nil, redis: nil) ⇒ Processor

Returns a new instance of Processor.



17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/sbmt/outbox/v2/processor.rb', line 17

def initialize(
  boxes,
  threads_count: nil,
  lock_timeout: nil,
  brpop_delay: nil,
  redis: nil
)
  @lock_timeout = lock_timeout || processor_config.general_timeout
  @brpop_delay = brpop_delay || redis_brpop_delay(boxes.count, processor_config.brpop_delay)

  super(boxes: boxes, threads_count: threads_count || processor_config.threads_count, name: "processor", redis: redis)
end

Instance Attribute Details

#brpop_delayObject (readonly)

Returns the value of attribute brpop_delay.



13
14
15
# File 'lib/sbmt/outbox/v2/processor.rb', line 13

def brpop_delay
  @brpop_delay
end

#lock_timeoutObject (readonly)

Returns the value of attribute lock_timeout.



13
14
15
# File 'lib/sbmt/outbox/v2/processor.rb', line 13

def lock_timeout
  @lock_timeout
end

Instance Method Details

#process_task(_worker_number, task) ⇒ Object



30
31
32
33
# File 'lib/sbmt/outbox/v2/processor.rb', line 30

def process_task(_worker_number, task)
  middlewares = Middleware::Builder.new(batch_process_middlewares)
  middlewares.call(task) { process(task) }
end