Class: Sbmt::Outbox::V2::Poller

Inherits:
BoxProcessor show all
Defined in:
lib/sbmt/outbox/v2/poller.rb

Instance Attribute Summary collapse

Attributes inherited from BoxProcessor

#started, #threads_count, #worker_name

Instance Method Summary collapse

Methods inherited from BoxProcessor

#alive?, #ready?, #safe_process_task, #start, #stop

Constructor Details

#initialize(boxes, partitions_count: nil, threads_count: nil, lock_timeout: nil, regular_items_batch_size: nil, retryable_items_batch_size: nil, throttler_tactic: nil, redis: nil) ⇒ Poller

Returns a new instance of Poller.



17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# File 'lib/sbmt/outbox/v2/poller.rb', line 17

# Builds a poller. Every keyword option left as nil is resolved from
# +poller_config+ before being stored or forwarded.
#
# @param boxes forwarded unchanged to the superclass constructor as +boxes:+
# @param partitions_count falls back to +poller_config.concurrency+
# @param threads_count falls back to +poller_config.threads_count+; forwarded to the superclass
# @param lock_timeout falls back to +poller_config.general_timeout+
# @param regular_items_batch_size falls back to +poller_config.regular_items_batch_size+
# @param retryable_items_batch_size falls back to +poller_config.retryable_items_batch_size+
# @param throttler_tactic falls back to +poller_config.tactic+, then to "default"
# @param redis forwarded unchanged to the superclass constructor as +redis:+
def initialize(
  boxes,
  partitions_count: nil,
  threads_count: nil,
  lock_timeout: nil,
  regular_items_batch_size: nil,
  retryable_items_batch_size: nil,
  throttler_tactic: nil,
  redis: nil
)
  # Resolve the configurable defaults first, in the same order they are stored.
  partitions_count ||= poller_config.concurrency
  lock_timeout ||= poller_config.general_timeout
  regular_items_batch_size ||= poller_config.regular_items_batch_size
  retryable_items_batch_size ||= poller_config.retryable_items_batch_size

  @partitions_count = partitions_count
  @lock_timeout = lock_timeout
  @regular_items_batch_size = regular_items_batch_size
  @retryable_items_batch_size = retryable_items_batch_size

  # Derived limits: the buffer holds both kinds of items, a batch only the regular size.
  @max_buffer_size = regular_items_batch_size + retryable_items_batch_size
  @max_batch_size = regular_items_batch_size

  threads_count ||= poller_config.threads_count
  super(boxes: boxes, threads_count: threads_count, name: "poller", redis: redis)

  # The throttler is built after super and uses self.redis (the receiver's
  # reader) rather than the raw +redis+ argument, which may be nil.
  tactic = throttler_tactic || poller_config.tactic || "default"
  @throttler = PollThrottler.build(tactic, self.redis, poller_config)
end

Instance Attribute Details

#lock_timeout ⇒ Object (readonly)

Returns the value of attribute lock_timeout.



15
16
17
# File 'lib/sbmt/outbox/v2/poller.rb', line 15

# Reader for the lock timeout chosen at construction time: the
# +lock_timeout:+ argument, or +poller_config.general_timeout+ when that
# argument was not given.
#
# @return [Object] the stored +@lock_timeout+ value
def lock_timeout
  @lock_timeout
end

#max_batch_size ⇒ Object (readonly)

Returns the value of attribute max_batch_size.



15
16
17
# File 'lib/sbmt/outbox/v2/poller.rb', line 15

# Reader for the maximum batch size. The constructor sets it to the same
# value as the regular items batch size.
#
# @return [Object] the stored +@max_batch_size+ value
def max_batch_size
  @max_batch_size
end

#max_buffer_size ⇒ Object (readonly)

Returns the value of attribute max_buffer_size.



15
16
17
# File 'lib/sbmt/outbox/v2/poller.rb', line 15

# Reader for the maximum buffer size. The constructor computes it as the
# regular items batch size plus the retryable items batch size.
#
# @return [Object] the stored +@max_buffer_size+ value
def max_buffer_size
  @max_buffer_size
end

#partitions_count ⇒ Object (readonly)

Returns the value of attribute partitions_count.



15
16
17
# File 'lib/sbmt/outbox/v2/poller.rb', line 15

# Reader for the partitions count chosen at construction time: the
# +partitions_count:+ argument, or +poller_config.concurrency+ when that
# argument was not given.
#
# @return [Object] the stored +@partitions_count+ value
def partitions_count
  @partitions_count
end

#regular_items_batch_size ⇒ Object (readonly)

Returns the value of attribute regular_items_batch_size.



15
16
17
# File 'lib/sbmt/outbox/v2/poller.rb', line 15

# Reader for the regular items batch size chosen at construction time: the
# +regular_items_batch_size:+ argument, or
# +poller_config.regular_items_batch_size+ when that argument was not given.
#
# @return [Object] the stored +@regular_items_batch_size+ value
def regular_items_batch_size
  @regular_items_batch_size
end

#retryable_items_batch_size ⇒ Object (readonly)

Returns the value of attribute retryable_items_batch_size.



15
16
17
# File 'lib/sbmt/outbox/v2/poller.rb', line 15

# Reader for the retryable items batch size chosen at construction time: the
# +retryable_items_batch_size:+ argument, or
# +poller_config.retryable_items_batch_size+ when that argument was not given.
#
# @return [Object] the stored +@retryable_items_batch_size+ value
def retryable_items_batch_size
  @retryable_items_batch_size
end

#throttler ⇒ Object (readonly)

Returns the value of attribute throttler.



15
16
17
# File 'lib/sbmt/outbox/v2/poller.rb', line 15

# Reader for the throttler that the constructor builds via
# +PollThrottler.build+ from the selected tactic, the redis handle and
# +poller_config+.
#
# @return [Object] the stored +@throttler+ value
def throttler
  @throttler
end

Instance Method Details

#process_task(_worker_number, task) ⇒ Object



44
45
46
# File 'lib/sbmt/outbox/v2/poller.rb', line 44

# Handles one task by delegating to +poll+ (defined outside this excerpt).
#
# @param _worker_number unused by this implementation
# @param task the task handed to +poll+
# @return [Object] whatever +poll(task)+ returns
def process_task(_worker_number, task)
  poll(task)
end

#throttle(worker_number, poll_task, result) ⇒ Object



40
41
42
# File 'lib/sbmt/outbox/v2/poller.rb', line 40

# Invokes the configured throttler with the given arguments, unchanged and
# in the same order.
#
# @param worker_number passed through as the first argument
# @param poll_task passed through as the second argument
# @param result passed through as the third argument
# @return [Object] whatever +throttler.call+ returns
def throttle(worker_number, poll_task, result)
  throttler.call(worker_number, poll_task, result)
end