Class: Deimos::Utils::InlineConsumer
- Inherits: Object
- Defined in: lib/deimos/utils/inline_consumer.rb
Overview
Class which can process/consume messages inline.
Constant Summary
- MAX_MESSAGE_WAIT_TIME = 1.second
- MAX_TOPIC_WAIT_TIME = 10.seconds
Class Method Summary
- .consume(topic:, frk_consumer:, num_messages: 10) ⇒ void
  Consume the last X messages from a topic.
- .get_messages_for(topic:, schema: nil, namespace: nil, key_config: nil, config_class: nil, num_messages: 10) ⇒ Array<Hash>
  Get the last X messages from a topic.
Class Method Details
.consume(topic:, frk_consumer:, num_messages: 10) ⇒ void
This method returns an undefined value.
Consume the last X messages from a topic.
    # File 'lib/deimos/utils/inline_consumer.rb', line 117
    def self.consume(topic:, frk_consumer:, num_messages: 10)
      listener = SeekListener.new(
        handler: frk_consumer,
        group_id: SecureRandom.hex,
        topic: topic,
        heartbeat_interval: 1
      )
      listener. =
      # Add the start_time and last_message_time attributes to the
      # consumer class so we can kill it if it's gone on too long
      class << frk_consumer
        attr_accessor :start_time, :last_message_time
      end

      subscribers = []
      subscribers << ActiveSupport::Notifications.
        subscribe('phobos.listener.process_message') do
          frk_consumer.last_message_time = Time.zone.now
        end
      subscribers << ActiveSupport::Notifications.
        subscribe('phobos.listener.start_handler') do
          frk_consumer.start_time = Time.zone.now
          frk_consumer.last_message_time = nil
        end
      subscribers << ActiveSupport::Notifications.
        subscribe('heartbeat.consumer.kafka') do
          if frk_consumer.last_message_time
            if Time.zone.now - frk_consumer.last_message_time > MAX_MESSAGE_WAIT_TIME
              raise Phobos::AbortError
            end
          elsif Time.zone.now - frk_consumer.start_time > MAX_TOPIC_WAIT_TIME
            Deimos.config.logger.error('Aborting - initial wait too long')
            raise Phobos::AbortError
          end
        end
      listener.start

      subscribers.each { |s| ActiveSupport::Notifications.unsubscribe(s) }
    end
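The heartbeat check in .consume applies two timeouts: a long one while no message has arrived yet, and a short one measured from the most recent message. The following is a minimal sketch of that decision only, not the Deimos implementation. It assumes plain numeric seconds instead of ActiveSupport durations, and the names abort_consumer?, MESSAGE_WAIT_SECONDS and TOPIC_WAIT_SECONDS are hypothetical stand-ins for illustration.

```ruby
# Hypothetical stand-ins for MAX_MESSAGE_WAIT_TIME and MAX_TOPIC_WAIT_TIME,
# written as plain seconds so the sketch runs without ActiveSupport.
MESSAGE_WAIT_SECONDS = 1
TOPIC_WAIT_SECONDS = 10

# Returns true when the consumer has waited too long and should be aborted.
# - Once a message has been seen, only the gap since that message counts.
# - Before any message has been seen, the gap since the handler started counts.
def abort_consumer?(now:, start_time:, last_message_time:)
  if last_message_time
    now - last_message_time > MESSAGE_WAIT_SECONDS
  else
    now - start_time > TOPIC_WAIT_SECONDS
  end
end

start = Time.at(0)

# No message yet, 5 seconds in: keep waiting.
puts abort_consumer?(now: Time.at(5), start_time: start, last_message_time: nil)

# No message yet, 11 seconds in: give up on the topic.
puts abort_consumer?(now: Time.at(11), start_time: start, last_message_time: nil)

# Last message arrived 1.5 seconds ago: the stream has gone quiet, so stop.
puts abort_consumer?(now: Time.at(12), start_time: start, last_message_time: Time.at(10.5))
```

Keeping the decision in a pure function like this makes the timeout rules easy to test without a running Kafka broker.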
.get_messages_for(topic:, schema: nil, namespace: nil, key_config: nil, config_class: nil, num_messages: 10) ⇒ Array<Hash>
Get the last X messages from a topic. You can specify a subclass of Deimos::Consumer or Deimos::Producer, or provide the schema, namespace and key_config directly.
    # File 'lib/deimos/utils/inline_consumer.rb', line 89
    def self.get_messages_for(topic:, schema: nil, namespace: nil, key_config: nil,
                              config_class: nil, num_messages: 10)
      if config_class
        MessageBankHandler.config_class = config_class
      elsif schema.nil? || key_config.nil?
        raise 'You must specify either a config_class or a schema, namespace and key_config!'
      else
        MessageBankHandler.class_eval do
          schema schema
          namespace namespace
          key_config key_config
          @decoder = nil
          @key_decoder = nil
        end
      end
      self.consume(topic: topic, frk_consumer: MessageBankHandler, num_messages: num_messages)
      messages = MessageBankHandler.
      messages.size <= num_messages ? messages : messages[-num_messages..-1]
    end
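The last step of .get_messages_for trims the collected messages down to the most recent num_messages. A minimal sketch of that trimming on a plain array, assuming a positive num_messages; the helper name last_messages is hypothetical and not part of Deimos.

```ruby
# Return the whole array when it holds no more than num_messages entries,
# otherwise only the trailing num_messages entries.
def last_messages(messages, num_messages)
  return messages if messages.size <= num_messages

  messages[-num_messages..-1]
end

collected = [
  { 'id' => 1 }, { 'id' => 2 }, { 'id' => 3 }, { 'id' => 4 }, { 'id' => 5 }
]

p last_messages(collected, 2)   # the two newest hashes
p last_messages(collected, 10)  # fewer than 10 collected, so all five
```

For a positive count, Ruby's built-in Array#last(n) gives the same result in one call.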