Module: Cloudenvoy::Subscriber

Defined in:
lib/cloudenvoy/subscriber.rb

Overview

Use this module to define subscribers. Subscribers must implement the message processsing logic in the ‘process` method.

E.g.

class UserSubscriber

include Cloudenvoy::Subscriber

# Specify subscription options
cloudenvoy_options topics: ['my-topic']

# Process message objects
def process(message)
  ...do something...
end

end

Defined Under Namespace

Modules: ClassMethods

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.execute_from_descriptor(input_payload) ⇒ Any

Execute a subscriber from a payload object received from Pub/Sub.

the message to process.

Parameters:

  • input_payload (Hash)

    The Pub/Sub webhook hash describing

Returns:

  • (Any)

    The subscriber processing return value.



68
69
70
71
72
# File 'lib/cloudenvoy/subscriber.rb', line 68

def self.execute_from_descriptor(input_payload)
  message = Message.from_descriptor(input_payload)
  subscriber = message.subscriber || raise(InvalidSubscriberError)
  subscriber.execute
end

.from_sub_uri(sub_uri) ⇒ Class

Return the subscriber class for the provided class name.

Parameters:

  • sub_uri (String)

    The subscription uri.

Returns:

  • (Class)

    The subscriber class



39
40
41
42
43
44
45
46
# File 'lib/cloudenvoy/subscriber.rb', line 39

def self.from_sub_uri(sub_uri)
  klass_name = Subscriber.parse_sub_uri(sub_uri)[0]

  # Check that subscriber class is a valid subscriber
  sub_klass = Object.const_get(klass_name.camelize)

  sub_klass.include?(self) ? sub_klass : nil
end

.included(base) ⇒ Object

Add class method to including class



23
24
25
26
27
28
29
# File 'lib/cloudenvoy/subscriber.rb', line 23

def self.included(base)
  base.extend(ClassMethods)
  base.attr_accessor :message, :process_started_at, :process_ended_at

  # Register subscriber
  Cloudenvoy.subscribers.add(base)
end

.parse_sub_uri(sub_uri) ⇒ Array<String,String>

Parse the subscription name and return the subscriber name and topic.

Parameters:

  • sub_uri (String)

    The subscription URI

Returns:

  • (Array<String,String>)

    A tuple [subscriber_name, topic ]



55
56
57
# File 'lib/cloudenvoy/subscriber.rb', line 55

def self.parse_sub_uri(sub_uri)
  sub_uri.split('/').last.split('.', 2).last.split('.', 2)
end

Instance Method Details

#==(other) ⇒ Boolean

Equality operator.

Parameters:

  • other (Any)

    The object to compare.

Returns:

  • (Boolean)

    True if the object is equal.



193
194
195
# File 'lib/cloudenvoy/subscriber.rb', line 193

def ==(other)
  other.is_a?(self.class) && other.message == message
end

#executeAny

Execute the subscriber’s logic.

Returns:

  • (Any)

    The logic return value



172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/cloudenvoy/subscriber.rb', line 172

def execute
  logger.info('Processing message...')

  # Process message
  resp = execute_middleware_chain

  # Log processing completion and return result
  logger.info("Processing done after #{process_duration}s") { { duration: process_duration } }
  resp
rescue StandardError => e
  logger.info("Processing failed after #{process_duration}s") { { duration: process_duration } }
  raise(e)
end

#initialize(message:) ⇒ Object

Build a new subscriber instance.

Parameters:



142
143
144
# File 'lib/cloudenvoy/subscriber.rb', line 142

def initialize(message:)
  @message = message
end

#loggerLogger, any

Return the Cloudenvoy logger instance.

Returns:

  • (Logger, any)

    The cloudenvoy logger.



151
152
153
# File 'lib/cloudenvoy/subscriber.rb', line 151

def logger
  @logger ||= SubscriberLogger.new(self)
end

#process_durationFloat

Return the time taken (in seconds) to process the message. This duration includes the middlewares and the actual process method.

Returns:

  • (Float)

    The time taken in seconds as a floating point number.



161
162
163
164
165
# File 'lib/cloudenvoy/subscriber.rb', line 161

def process_duration
  return 0.0 unless process_ended_at && process_started_at

  (process_ended_at - process_started_at).ceil(3)
end