Module: Cloudenvoy::Subscriber
Defined in: lib/cloudenvoy/subscriber.rb
Overview
Use this module to define subscribers. Subscribers must implement the message processing logic in the `process` method.
E.g.
class UserSubscriber
  include Cloudenvoy::Subscriber

  # Specify subscription options
  cloudenvoy_options topics: ['my-topic']

  # Process message objects
  def process(message)
    # ...do something...
  end
end
Defined Under Namespace
Modules: ClassMethods
Class Method Summary
- .execute_from_descriptor(input_payload) ⇒ Any
  Execute a subscriber from a payload object received from Pub/Sub.
- .from_sub_uri(sub_uri) ⇒ Class
  Return the subscriber class for the provided subscription URI.
- .included(base) ⇒ Object
  Add class methods to the including class.
- .parse_sub_uri(sub_uri) ⇒ Array<String,String>
  Parse the subscription name and return the subscriber name and topic.
Instance Method Summary
- #==(other) ⇒ Boolean
  Equality operator.
- #execute ⇒ Any
  Execute the subscriber’s logic.
- #initialize(message:) ⇒ Object
  Build a new subscriber instance.
- #logger ⇒ Logger, any
  Return the Cloudenvoy logger instance.
- #process_duration ⇒ Float
  Return the time taken (in seconds) to process the message.
Class Method Details
.execute_from_descriptor(input_payload) ⇒ Any
Execute a subscriber from a payload object received from Pub/Sub.
Parameters: input_payload, the message to process.
# File 'lib/cloudenvoy/subscriber.rb', line 68
def self.execute_from_descriptor(input_payload)
  message = Message.from_descriptor(input_payload)
  subscriber = message.subscriber || raise(InvalidSubscriberError)
  subscriber.execute
end
.from_sub_uri(sub_uri) ⇒ Class
Return the subscriber class for the provided subscription URI. Returns nil if the resolved class does not include this module.
# File 'lib/cloudenvoy/subscriber.rb', line 39
def self.from_sub_uri(sub_uri)
  klass_name = Subscriber.parse_sub_uri(sub_uri)[0]

  # Check that subscriber class is a valid subscriber
  sub_klass = Object.const_get(klass_name.camelize)

  sub_klass.include?(self) ? sub_klass : nil
end
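The lookup above can be illustrated without the gem. This is a minimal sketch, not the library's code: `DemoSubscriber`, `class_for`, `PlainObject` and the `camelize` helper are hypothetical stand-ins (the real method relies on `String#camelize`, which comes from ActiveSupport rather than core Ruby).

```ruby
# Stand-in for Cloudenvoy::Subscriber, used only for this illustration.
module DemoSubscriber
  # Hypothetical replacement for ActiveSupport's String#camelize.
  def self.camelize(name)
    name.split('_').map(&:capitalize).join
  end

  # Resolve a snake_case name to a class, but only return the class
  # when it actually includes this module.
  def self.class_for(name)
    klass = Object.const_get(camelize(name))
    klass.include?(self) ? klass : nil
  end
end

class UserSubscriber
  include DemoSubscriber
end

# A class that exists but is not a subscriber.
class PlainObject; end

found = DemoSubscriber.class_for('user_subscriber')
rejected = DemoSubscriber.class_for('plain_object')
```

In this sketch `found` is `UserSubscriber` and `rejected` is `nil`. A name that matches no constant at all does not return `nil`: `Object.const_get` raises `NameError` in that case.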
.included(base) ⇒ Object
Add class methods to the including class.
# File 'lib/cloudenvoy/subscriber.rb', line 23
def self.included(base)
  base.extend(ClassMethods)
  base.attr_accessor :message, :process_started_at, :process_ended_at

  # Register subscriber
  Cloudenvoy.subscribers.add(base)
end
.parse_sub_uri(sub_uri) ⇒ Array<String,String>
Parse the subscription name and return the subscriber name and topic.
# File 'lib/cloudenvoy/subscriber.rb', line 55
def self.parse_sub_uri(sub_uri)
  sub_uri.split('/').last.split('.', 2).last.split('.', 2)
end
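To see what the split chain does, here is the same expression copied into a standalone method and applied to a sample path. The subscription path below (project name, `my-app` prefix) is a hypothetical example; the `prefix.subscriber_name.topic` layout is inferred from the code rather than stated on this page.

```ruby
# Same split chain as parse_sub_uri, copied into a standalone method.
def parse_sub_uri(sub_uri)
  sub_uri
    .split('/').last   # keep the final path segment
    .split('.', 2).last # drop everything up to the first dot
    .split('.', 2)      # split the rest into [subscriber_name, topic]
end

# Hypothetical subscription path; project and prefix are placeholders.
uri = 'projects/my-project/subscriptions/my-app.user_subscriber.my-topic'
subscriber_name, topic = parse_sub_uri(uri)
# subscriber_name is 'user_subscriber', topic is 'my-topic'
```

Because each split is limited to two parts, a topic that itself contains dots stays intact in the second element.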
Instance Method Details
#==(other) ⇒ Boolean
Equality operator.
# File 'lib/cloudenvoy/subscriber.rb', line 193
def ==(other)
  other.is_a?(self.class) && other.message == message
end
#execute ⇒ Any
Execute the subscriber’s logic.
# File 'lib/cloudenvoy/subscriber.rb', line 172
def execute
  logger.info('Processing message...')

  # Process message
  resp = execute_middleware_chain

  # Log processing completion and return result
  logger.info("Processing done after #{process_duration}s") { { duration: process_duration } }
  resp
rescue StandardError => e
  logger.info("Processing failed after #{process_duration}s") { { duration: process_duration } }
  raise(e)
end
#initialize(message:) ⇒ Object
Build a new subscriber instance.
# File 'lib/cloudenvoy/subscriber.rb', line 142
def initialize(message:)
  @message = message
end
#logger ⇒ Logger, any
Return the Cloudenvoy logger instance.
# File 'lib/cloudenvoy/subscriber.rb', line 151
def logger
  @logger ||= SubscriberLogger.new(self)
end
#process_duration ⇒ Float
Return the time taken (in seconds) to process the message. This duration includes the middlewares and the actual process method.
# File 'lib/cloudenvoy/subscriber.rb', line 161
def process_duration
  return 0.0 unless process_ended_at && process_started_at

  (process_ended_at - process_started_at).ceil(3)
end
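The duration arithmetic can be tried in isolation. This is a small sketch assuming both timestamps are `Time` objects; `duration_between` is a hypothetical helper that mirrors the method body, not part of the library.

```ruby
# Mirrors process_duration, with the two timestamps passed as arguments.
def duration_between(started_at, ended_at)
  # Either timestamp missing means processing has not completed.
  return 0.0 unless ended_at && started_at

  # Time - Time yields a Float number of seconds; ceil(3) rounds it
  # up to the next millisecond.
  (ended_at - started_at).ceil(3)
end

started = Time.at(100)
ended = Time.at(101.2341)

duration_between(started, ended) # 1.235
duration_between(nil, ended)     # 0.0
```

Note that `ceil` always rounds up, so the reported value is never lower than the real elapsed time.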