Module: Consumer

Included in:
Controls::Consumer::ErrorHandler::Example, Controls::Consumer::Incrementing
Defined in:
lib/consumer/log.rb,
lib/consumer/consumer.rb,
lib/consumer/defaults.rb,
lib/consumer/log_text.rb,
lib/consumer/substitute.rb,
lib/consumer/controls/id.rb,
lib/consumer/controls/get.rb,
lib/consumer/subscription.rb,
lib/consumer/controls/poll.rb,
lib/consumer/controls/error.rb,
lib/consumer/position_store.rb,
lib/consumer/controls/handle.rb,
lib/consumer/controls/session.rb,
lib/consumer/handler_registry.rb,
lib/consumer/controls/category.rb,
lib/consumer/controls/consumer.rb,
lib/consumer/controls/position.rb,
lib/consumer/controls/settings.rb,
lib/consumer/controls/identifier.rb,
lib/consumer/controls/message_data.rb,
lib/consumer/controls/subscription.rb,
lib/consumer/subscription/defaults.rb,
lib/consumer/controls/position_store.rb,
lib/consumer/controls/handle/settings.rb,
lib/consumer/position_store/telemetry.rb,
lib/consumer/controls/get/incrementing.rb,
lib/consumer/position_store/substitute.rb,
lib/consumer/controls/handle/raise_error.rb,
lib/consumer/controls/message_data/batch.rb,
lib/consumer/controls/position_store/file.rb,
lib/consumer/controls/consumer/incrementing.rb,
lib/consumer/controls/consumer/error_handler.rb

Defined Under Namespace

Modules: Build, Configure, Controls, Defaults, HandlerMacro, IdentifierMacro, LogText, PositionStore, Start, Substitute

Classes: HandlerRegistry, Log, Subscription

Constant Summary collapse

Error = Class.new(RuntimeError)

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.included(cls) ⇒ Object



4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/consumer/consumer.rb', line 4

# Hook that Ruby invokes when Consumer is included into a class. The whole
# body runs through class_exec, so every include/extend/prepend, macro call,
# and method definition below is applied to the including class (cls).
def self.included(cls)
  cls.class_exec do
    # Instance-level mixins. The initializer, dependency, and template_method
    # macros called further down are not defined in this block; presumably
    # they come from these modules - TODO confirm against their libraries.
    include Dependency
    include Initializer
    include TemplateMethod
    include Log::Dependency

    # Class-level behavior added to the including class.
    extend Build
    extend Start

    extend HandlerMacro
    extend IdentifierMacro

    # Configure is prepended, so its methods take precedence over same-named
    # methods defined by the including class itself.
    prepend Configure

    # NOTE(review): presumably generates an initializer that accepts the
    # category and exposes it as an attribute - confirm in Initializer.
    initializer :category

    # Instance identifier; falls back to the class-level identifier when no
    # truthy value has been assigned to the instance.
    attr_writer :identifier
    def identifier
      @identifier ||= self.class.identifier
    end

    ## Remove need for irregular memoization - Nathan, Mon Sep 9 2024
    # Memoizes via instance_variable_defined? rather than ||=, so the position
    # store is read at most once even when the value it returns is nil/false.
    def starting_position
      if not instance_variable_defined?(:@starting_position)
        @starting_position = position_store.get
      end
      @starting_position
    end
    attr_writer :starting_position

    # Interval compared against the update counter in update_position;
    # defaults to Defaults.position_update_interval.
    attr_writer :position_update_interval
    def position_update_interval
      @position_update_interval ||= Defaults.position_update_interval
    end

    # Counter that starts at zero when it has not been assigned.
    attr_writer :position_update_counter
    def position_update_counter
      @position_update_counter ||= 0
    end

    # Plain accessors with no default value.
    attr_accessor :poll_interval_milliseconds

    attr_accessor :session

    attr_accessor :supplemental_settings

    # NOTE(review): dependency presumably declares an attribute of the given
    # type with a substitute default - confirm in the Dependency library.
    dependency :get, MessageStore::Get
    dependency :position_store, PositionStore
    dependency :subscription, Subscription

    # Default error_raised implementation: re-raise the error that was given.
    # NOTE(review): presumably the including class can override it - confirm
    # in TemplateMethod.
    template_method :error_raised do |error, message_data|
      raise error
    end

    # Makes the consumer callable: call is an alias of dispatch.
    alias_method :call, :dispatch
  end
end

Instance Method Details

#dispatch(message_data) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
# File 'lib/consumer/consumer.rb', line 126

# Passes a message to every handler in the class's handler registry, then
# hands the message's global position to update_position.
#
# Each handler is called with the message plus the consumer's session and
# supplemental settings. Any StandardError raised along the way (by a
# handler or by update_position) is logged and given to error_raised
# together with the message.
#
# @param message_data [Object] the message; must respond to global_position
# @return [Object] the result of the final debug log call, or whatever
#   error_raised returns when an error was rescued
def dispatch(message_data)
  logger.trace(tags: [:consumer, :dispatch, :message]) do
    "Dispatching message (#{LogText.message_data(message_data)})"
  end

  registry = self.class.handler_registry
  registry.each do |handle|
    handle.call(message_data, session: session, settings: supplemental_settings)
  end

  update_position(message_data.global_position)

  logger.debug(tags: [:consumer, :dispatch, :message]) do
    "Message dispatched (#{LogText.message_data(message_data)})"
  end
rescue => raised
  logger.error(tag: :*) do
    "Error raised (Error Class: #{raised.class}, Error Message: #{raised.message}, #{LogText.message_data(message_data)})"
  end

  error_raised(raised, message_data)
end

#log_info ⇒ Object



109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/consumer/consumer.rb', line 109

# Writes the consumer's startup details to the logger at info level, each
# entry tagged :consumer and :start.
#
# Entries are emitted in this order: category, starting position,
# identifier, anything the optional log_startup_info hook emits, position
# update interval, poll interval, and then two entries per registered
# handler (its name and its message types).
#
# Message text is built inside blocks, so values such as starting_position
# are only read if the logger evaluates the block.
def log_info
  # Forwards a text-building block to the logger with the startup tags.
  info = ->(&text) { logger.info(tags: [:consumer, :start], &text) }

  info.call { "Category: #{category} (Consumer: #{self.class.name})" }
  info.call { "Position: #{starting_position} (Consumer: #{self.class.name})" }
  info.call { "Identifier: #{identifier || 'nil'} (Consumer: #{self.class.name})" }

  if respond_to?(:log_startup_info)
    log_startup_info
  end

  info.call { "Position Update Interval: #{position_update_interval.inspect} (Consumer: #{self.class.name})" }

  info.call { "Poll Interval Milliseconds: #{poll_interval_milliseconds.inspect} (Consumer: #{self.class.name})" }

  self.class.handler_registry.each do |registered|
    info.call { "Handler: #{registered.name} (Consumer: #{self.class.name})" }
    info.call { "Messages: #{registered.message_registry.message_types.join(', ')} (Handler: #{registered.name}, Consumer: #{self.class.name})" }
  end
end


89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/consumer/consumer.rb', line 89

# Prints a human-readable summary of the consumer to STDOUT: class name,
# category, starting position, identifier, anything the optional
# print_startup_info hook prints, the position store's location, and the
# name and message types of every registered handler.
#
# Each line is written with its own puts call so that output produced by
# print_startup_info lands between the identifier and the position location.
def print_info
  out = STDOUT

  out.puts
  out.puts "    Consumer: #{self.class.name}"
  out.puts "      Category: #{category}"
  out.puts "      Position: #{starting_position}"
  out.puts "      Identifier: #{identifier || '(none)'}"

  if respond_to?(:print_startup_info)
    print_startup_info
  end

  out.puts "      Position Location: #{position_store.location || '(none)'}"

  out.puts

  out.puts "      Handlers:"
  self.class.handler_registry.each do |registered|
    out.puts "        Handler: #{registered.name}"
    out.puts "          Messages: #{registered.message_registry.message_types.join(', ')}"
  end
end

#start(&probe) ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# File 'lib/consumer/consumer.rb', line 63

# Starts the consumer by starting its subscription through ::Actor::Start.
#
# Steps, in order: log that the consumer is starting; print the startup
# summary when Defaults.startup_info? is true; log the startup details; call
# the optional starting hook; verify that the category is a category stream
# name; start the subscription; call the probe block if one was given; log
# that the consumer has started.
#
# @yield [consumer, subscription_address, subscription_thread] optional
#   probe, called after the subscription has been started
# @raise [Error] if category is not a category stream name (the check runs
#   after the startup information has already been printed and logged)
# @return [Object] AsyncInvocation::Incorrect
#   NOTE(review): presumably a marker telling callers not to rely on the
#   return value - confirm against the AsyncInvocation library.
def start(&probe)
  logger.info(tags: [:consumer, :start]) do
    "Starting consumer: #{self.class.name} (Category: #{category}, Identifier: #{identifier || '(none)'}, Position: #{starting_position})"
  end

  print_info if Defaults.startup_info?

  log_info

  if respond_to?(:starting)
    starting
  end

  unless MessageStore::StreamName.category?(category)
    raise Error, "Consumer's stream name must be a category (Stream Name: #{category})"
  end

  _address, thread = ::Actor::Start.call(subscription)

  # The subscription's address is only read when a probe was supplied.
  probe&.call(self, subscription.address, thread)

  logger.info(tags: [:consumer, :start]) do
    "Started consumer: #{self.class.name} (Category: #{category}, Identifier: #{identifier || '(none)'}, Position: #{starting_position})"
  end

  AsyncInvocation::Incorrect
end

#update_position(position) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/consumer/consumer.rb', line 141

# Counts a processed message and writes the position to the position store
# once the counter reaches position_update_interval.
#
# The counter is incremented on every call. When it is greater than or equal
# to the interval, the position is put to the store and the counter is reset
# to zero; otherwise only a debug entry is logged.
#
# @param position [Object] the global position to record
# @return [Object] 0 after the position was stored, otherwise the result of
#   the debug log call
def update_position(position)
  logger.trace(tags: [:consumer, :position]) do
    "Updating position (Global Position: #{position}, Counter: #{position_update_counter}/#{position_update_interval})"
  end

  self.position_update_counter = position_update_counter + 1

  unless position_update_counter >= position_update_interval
    return logger.debug(tags: [:consumer, :position]) {
      "Interval not reached; position not updated (Global Position: #{position}, Counter: #{position_update_counter}/#{position_update_interval})"
    }
  end

  position_store.put(position)

  # Logged before the reset so the entry shows the counter value that
  # triggered the store.
  logger.debug(tags: [:consumer, :position]) do
    "Updated position (Global Position: #{position}, Counter: #{position_update_counter}/#{position_update_interval})"
  end

  self.position_update_counter = 0
end