Module: Consumer
- Included in:
- Controls::Consumer::ErrorHandler::Example, Controls::Consumer::Incrementing
- Defined in:
- lib/consumer/log.rb,
lib/consumer/consumer.rb,
lib/consumer/defaults.rb,
lib/consumer/log_text.rb,
lib/consumer/substitute.rb,
lib/consumer/controls/id.rb,
lib/consumer/controls/get.rb,
lib/consumer/subscription.rb,
lib/consumer/controls/poll.rb,
lib/consumer/controls/error.rb,
lib/consumer/position_store.rb,
lib/consumer/controls/handle.rb,
lib/consumer/controls/session.rb,
lib/consumer/handler_registry.rb,
lib/consumer/controls/category.rb,
lib/consumer/controls/consumer.rb,
lib/consumer/controls/position.rb,
lib/consumer/controls/settings.rb,
lib/consumer/controls/identifier.rb,
lib/consumer/controls/message_data.rb,
lib/consumer/controls/subscription.rb,
lib/consumer/subscription/defaults.rb,
lib/consumer/controls/position_store.rb,
lib/consumer/controls/handle/settings.rb,
lib/consumer/position_store/telemetry.rb,
lib/consumer/controls/get/incrementing.rb,
lib/consumer/position_store/substitute.rb,
lib/consumer/controls/handle/raise_error.rb,
lib/consumer/controls/message_data/batch.rb,
lib/consumer/controls/position_store/file.rb,
lib/consumer/controls/consumer/incrementing.rb,
lib/consumer/controls/consumer/error_handler.rb
Defined Under Namespace
Modules: Build, Configure, Controls, Defaults, HandlerMacro, IdentifierMacro, LogText, PositionStore, Start, Substitute
Classes: HandlerRegistry, Log, Subscription
Constant Summary
collapse
- Error =
Class.new(RuntimeError)
Class Method Summary
collapse
Instance Method Summary
collapse
Class Method Details
.included(cls) ⇒ Object
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
# File 'lib/consumer/consumer.rb', line 4
# Hook that Ruby calls when this module is included into a class.
#
# Everything is evaluated in the context of the including class (`cls`):
# supporting modules are mixed in, class-level macros are extended, and the
# consumer's attributes, dependencies, and default error handling are declared.
#
# cls - the class that is including this module.
def self.included(cls)
  cls.class_exec do
    # Instance-level mixins. They are included one at a time, in this order,
    # because the order determines the resulting ancestor chain.
    include Dependency
    include Initializer
    include TemplateMethod
    include Log::Dependency

    # Class-level behavior made available on the including class.
    extend Build
    extend Start
    extend HandlerMacro
    extend IdentifierMacro

    prepend Configure

    initializer :category

    # Falls back to the identifier declared on the class when none was assigned.
    attr_writer :identifier
    def identifier
      @identifier ||= self.class.identifier
    end

    # Read from the position store on first access. The check is on whether the
    # instance variable is defined (not on its truthiness), so an assigned or
    # fetched nil is kept and the store is not consulted again.
    attr_writer :starting_position
    def starting_position
      return @starting_position if instance_variable_defined?(:@starting_position)

      @starting_position = position_store.get
    end

    attr_writer :position_update_interval
    def position_update_interval
      @position_update_interval ||= Defaults.position_update_interval
    end

    # Counter starts at zero until something assigns it.
    attr_writer :position_update_counter
    def position_update_counter
      @position_update_counter ||= 0
    end

    attr_accessor :poll_interval_milliseconds, :session, :supplemental_settings

    dependency :get, MessageStore::Get
    dependency :position_store, PositionStore
    dependency :subscription, Subscription

    # Default error handling: re-raise whatever was rescued.
    template_method :error_raised do |error, message_data|
      raise error
    end

    alias_method :call, :dispatch
  end
end
|
Instance Method Details
#dispatch(message_data) ⇒ Object
126
127
128
129
130
131
132
133
134
135
136
137
138
139
|
# File 'lib/consumer/consumer.rb', line 126
# Runs one message through every handler in the class's handler registry and
# then passes the message's global position to `update_position`.
#
# Each handler is called with the message data plus the consumer's session and
# supplemental settings. Any StandardError raised inside the method body is
# logged at error level and handed to `error_raised` along with the message.
#
# message_data - the message to dispatch; must respond to `global_position`.
#
# Returns the result of the closing debug log call, or the result of
# `error_raised` when an error was rescued.
def dispatch(message_data)
  logger.trace(tags: %i[consumer dispatch message]) do
    "Dispatching message (#{LogText.message_data(message_data)})"
  end

  registered_handlers = self.class.handler_registry
  registered_handlers.each do |handle|
    handle.(message_data, session: session, settings: supplemental_settings)
  end

  update_position(message_data.global_position)

  logger.debug(tags: %i[consumer dispatch message]) do
    "Message dispatched (#{LogText.message_data(message_data)})"
  end
rescue => raised
  logger.error(tag: :*) do
    "Error raised (Error Class: #{raised.class}, Error Message: #{raised.message}, #{LogText.message_data(message_data)})"
  end

  error_raised(raised, message_data)
end
|
#log_info ⇒ Object
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# File 'lib/consumer/consumer.rb', line 109
# Writes the consumer's startup details to the logger at info level, tagged
# :consumer and :start: category, starting position, identifier, position
# update interval, poll interval, and every registered handler with the
# message types from its message registry.
#
# If the consumer responds to `log_startup_info`, that hook is invoked after
# the identifier line and before the interval lines.
#
# Returns whatever `each` on the class's handler registry returns.
def log_info
  # All log text is built inside blocks so it is only evaluated when the
  # logger actually calls the block.
  info = ->(&text) { logger.info(tags: %i[consumer start], &text) }

  info.call { "Category: #{category} (Consumer: #{self.class.name})" }
  info.call { "Position: #{starting_position} (Consumer: #{self.class.name})" }
  info.call { "Identifier: #{identifier || 'nil'} (Consumer: #{self.class.name})" }

  log_startup_info if respond_to?(:log_startup_info)

  info.call { "Position Update Interval: #{position_update_interval.inspect} (Consumer: #{self.class.name})" }
  info.call { "Poll Interval Milliseconds: #{poll_interval_milliseconds.inspect} (Consumer: #{self.class.name})" }

  self.class.handler_registry.each do |registered|
    info.call { "Handler: #{registered.name} (Consumer: #{self.class.name})" }
    info.call { "Messages: #{registered.message_registry.message_types.join(', ')} (Handler: #{registered.name}, Consumer: #{self.class.name})" }
  end
end
|
#print_info ⇒ Object
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
|
# File 'lib/consumer/consumer.rb', line 89
# Prints a summary of the consumer to STDOUT: class name, category, starting
# position, identifier, the position store's location, and each registered
# handler with the message types from its message registry.
#
# A missing identifier or position location is shown as "(none)". If the
# consumer responds to `print_startup_info`, that hook is invoked after the
# identifier line and before the position location line.
#
# Returns whatever `each` on the class's handler registry returns.
def print_info
  out = STDOUT

  # Lines are written one at a time so that values are read, and output
  # appears, strictly in this order.
  out.puts
  out.puts(" Consumer: #{self.class.name}")
  out.puts(" Category: #{category}")
  out.puts(" Position: #{starting_position}")
  out.puts(" Identifier: #{identifier || '(none)'}")

  print_startup_info if respond_to?(:print_startup_info)

  out.puts(" Position Location: #{position_store.location || '(none)'}")
  out.puts
  out.puts(" Handlers:")

  self.class.handler_registry.each do |registered|
    out.puts(" Handler: #{registered.name}")
    out.puts(" Messages: #{registered.message_registry.message_types.join(', ')}")
  end
end
|
#start(&probe) ⇒ Object
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
# File 'lib/consumer/consumer.rb', line 63
# Starts the consumer by starting its subscription as an actor.
#
# Steps, in order: log the start; print the startup summary when
# `Defaults.startup_info?` is true; log the startup details; invoke the
# optional `starting` hook if the consumer responds to it; verify that the
# category is a category stream name; start the subscription actor; invoke
# the optional probe block; log completion.
#
# probe - optional block called with the consumer, the subscription's address,
#         and the subscription thread returned by the actor start.
#
# Returns AsyncInvocation::Incorrect (presumably a marker that callers should
# not rely on the return value - confirm against the AsyncInvocation library).
# Raises Error when the category is not a category stream name. Note that the
# summary, startup logging, and `starting` hook have already run by then.
def start(&probe)
  logger.info(tags: %i[consumer start]) do
    "Starting consumer: #{self.class.name} (Category: #{category}, Identifier: #{identifier || '(none)'}, Position: #{starting_position})"
  end

  print_info if Defaults.startup_info?
  log_info
  starting if respond_to?(:starting)

  unless MessageStore::StreamName.category?(category)
    raise Error, "Consumer's stream name must be a category (Stream Name: #{category})"
  end

  # The actor start yields a pair; only the second element (the thread) is used.
  _address, subscription_thread = ::Actor::Start.(subscription)

  probe.(self, subscription.address, subscription_thread) if probe

  logger.info(tags: %i[consumer start]) do
    "Started consumer: #{self.class.name} (Category: #{category}, Identifier: #{identifier || '(none)'}, Position: #{starting_position})"
  end

  AsyncInvocation::Incorrect
end
|
#update_position(position) ⇒ Object
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
# File 'lib/consumer/consumer.rb', line 141
# Counts a processed message and writes its position to the position store
# once the counter reaches the position update interval.
#
# The counter is incremented on every call. When it is greater than or equal
# to the interval, the position is put to the position store and the counter
# is reset to zero; otherwise only a debug message is logged.
#
# position - the global position to record.
#
# Returns 0 (the reset counter) when the position was stored, otherwise the
# result of the debug log call.
def update_position(position)
  # Built lazily so the counter shown is the one current when the logger
  # evaluates the message block.
  details = lambda do
    "(Global Position: #{position}, Counter: #{position_update_counter}/#{position_update_interval})"
  end

  logger.trace(tags: %i[consumer position]) { "Updating position #{details.call}" }

  self.position_update_counter += 1

  unless position_update_counter >= position_update_interval
    return logger.debug(tags: %i[consumer position]) { "Interval not reached; position not updated #{details.call}" }
  end

  position_store.put(position)
  logger.debug(tags: %i[consumer position]) { "Updated position #{details.call}" }
  self.position_update_counter = 0
end
|