Class: Consumer::Subscription

Inherits:
Object
  • Object
show all
Includes:
Actor, Dependency, Initializer, Log::Dependency
Defined in:
lib/consumer/subscription.rb,
lib/consumer/subscription/defaults.rb

Defined Under Namespace

Modules: Defaults

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.build(consumer, get, position: nil, poll_interval_milliseconds: nil) ⇒ Object



25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/consumer/subscription.rb', line 25

# Builds a subscription instance and wires up its collaborators.
#
# The instance is created from +get+ and +position+, is given +consumer+,
# has polling configured through Poll.configure, and finally has its own
# #configure invoked before being returned.
#
# consumer                   - object assigned to the instance's consumer attribute
# get                        - passed through to the constructor
# position                   - optional starting position passed to the constructor
# poll_interval_milliseconds - optional polling interval; falls back to
#                              Defaults.poll_interval_milliseconds when omitted
#
# The polling timeout is not configurable here; it always comes from
# Defaults.poll_timeout_milliseconds.
#
# Returns the newly built instance.
def self.build(consumer, get, position: nil, poll_interval_milliseconds: nil)
  interval = poll_interval_milliseconds || Defaults.poll_interval_milliseconds
  timeout = Defaults.poll_timeout_milliseconds

  new(get, position).tap do |subscription|
    subscription.consumer = consumer

    Poll.configure(subscription, interval_milliseconds: interval, timeout_milliseconds: timeout)

    subscription.configure
  end
end

.configure(receiver, get, consumer: nil, position: nil, poll_interval_milliseconds: nil, attr_name: nil) ⇒ Object



43
44
45
46
47
48
49
50
51
# File 'lib/consumer/subscription.rb', line 43

# Builds a subscription and assigns it to an attribute of +receiver+.
#
# receiver                   - object that receives the built instance through
#                              its "<attr_name>=" writer (invoked via public_send)
# get                        - forwarded to .build
# consumer                   - forwarded to .build; defaults to +receiver+
# position                   - forwarded to .build
# poll_interval_milliseconds - forwarded to .build
# attr_name                  - name of the receiver attribute to assign;
#                              defaults to :subscription
#
# Returns the built instance.
def self.configure(receiver, get, consumer: nil, position: nil, poll_interval_milliseconds: nil, attr_name: nil)
  target_attr = attr_name || :subscription
  owner = consumer || receiver

  build(
    owner,
    get,
    position: position,
    poll_interval_milliseconds: poll_interval_milliseconds
  ).tap do |subscription|
    receiver.public_send(:"#{target_attr}=", subscription)
  end
end

Instance Method Details

#batch_size ⇒ Object



13
14
15
# File 'lib/consumer/subscription.rb', line 13

# Returns the batch size reported by the +get+ collaborator.
def batch_size
  reader = get
  reader.batch_size
end

#category ⇒ Object



17
18
19
# File 'lib/consumer/subscription.rb', line 17

# Returns the category reported by the +get+ collaborator.
def category
  reader = get
  reader.category
end

#dispatch(message) ⇒ Object



77
78
79
80
81
82
83
84
85
# File 'lib/consumer/subscription.rb', line 77

# Hands +message+ to the consumer and advances the subscription position.
#
# After the consumer has dispatched the message, the position is set to one
# past the message's global position. A trace entry is logged before the
# dispatch and a debug entry after it, both tagged :actor.
#
# message - object passed to consumer.dispatch; must respond to
#           #global_position
#
# Returns whatever the final logger.debug call returns.
def dispatch(message)
  logger.trace(tag: :actor) do
    "Dispatching message (Position: #{position}, Category: #{category})"
  end

  consumer.dispatch(message)

  # Advance past the message that was just handled
  next_position = message.global_position + 1
  self.position = next_position

  logger.debug(tag: :actor) do
    "Dispatched message (Position: #{position}, Category: #{category})"
  end
end

#position ⇒ Object



21
22
23
# File 'lib/consumer/subscription.rb', line 21

# Returns the current position, initializing it to 0 when it has not been
# set (or holds a falsy value).
def position
  return @position if @position

  @position = 0
end