Class: Consumer::Subscription
- Inherits:
-
Object
- Object
- Consumer::Subscription
- Includes:
- Actor, Dependency, Initializer, Log::Dependency
- Defined in:
- lib/consumer/subscription.rb,
lib/consumer/subscription/defaults.rb
Defined Under Namespace
Modules: Defaults
Class Method Summary collapse
- .build(consumer, get, position: nil, poll_interval_milliseconds: nil) ⇒ Object
- .configure(receiver, get, consumer: nil, position: nil, poll_interval_milliseconds: nil, attr_name: nil) ⇒ Object
Instance Method Summary collapse
- #batch_size ⇒ Object
- #category ⇒ Object
- #dispatch(message) ⇒ Object
- #position ⇒ Object
Class Method Details
.build(consumer, get, position: nil, poll_interval_milliseconds: nil) ⇒ Object
25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/consumer/subscription.rb', line 25 def self.build(consumer, get, position: nil, poll_interval_milliseconds: nil) poll_interval_milliseconds ||= Defaults.poll_interval_milliseconds poll_timeout_milliseconds = Defaults.poll_timeout_milliseconds instance = new(get, position) instance.consumer = consumer Poll.configure( instance, interval_milliseconds: poll_interval_milliseconds, timeout_milliseconds: poll_timeout_milliseconds ) instance.configure instance end |
.configure(receiver, get, consumer: nil, position: nil, poll_interval_milliseconds: nil, attr_name: nil) ⇒ Object
43 44 45 46 47 48 49 50 51 |
# File 'lib/consumer/subscription.rb', line 43 def self.configure(receiver, get, consumer: nil, position: nil, poll_interval_milliseconds: nil, attr_name: nil) attr_name ||= :subscription consumer ||= receiver instance = build(consumer, get, position: position, poll_interval_milliseconds: poll_interval_milliseconds) receiver.public_send(:"#{attr_name}=", instance) instance end |
Instance Method Details
#batch_size ⇒ Object
13 14 15 |
# File 'lib/consumer/subscription.rb', line 13 def batch_size get.batch_size end |
#category ⇒ Object
17 18 19 |
# File 'lib/consumer/subscription.rb', line 17 def category get.category end |
#dispatch(message) ⇒ Object
77 78 79 80 81 82 83 84 85 |
# File 'lib/consumer/subscription.rb', line 77 def dispatch(message) logger.trace(tag: :actor) { "Dispatching message (Position: #{position}, Category: #{category})" } consumer.dispatch(message) self.position = message.global_position + 1 logger.debug(tag: :actor) { "Dispatched message (Position: #{position}, Category: #{category})" } end |
#position ⇒ Object
21 22 23 |
# File 'lib/consumer/subscription.rb', line 21 def position @position ||= 0 end |