Class: Subserver::Listener

Inherits:
Object
  • Object
show all
Includes:
Util
Defined in:
lib/subserver/listener.rb

Overview

The Listener is a standalone thread which:

  1. Starts Google Pubsub subscription threads which:

a. Instantiate the subscriber class
b. Run the middleware chain
c. Call the subscriber's #perform

A Listener can exit due to shutdown (listener_stopped) or due to an error during message processing (listener_died).

If an error occurs during message processing, the Listener calls the Manager to create a new one to replace itself and exits.

Constant Summary

Constants included from Util

Util::EXPIRY

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Util

#fire_event, #hostname, #identity, #logger, #process_nonce, #safe_thread, #watchdog

Methods included from ExceptionHandler

#handle_exception

Constructor Details

#initialize(mgr, subscriber) ⇒ Listener

Returns a new instance of Listener.



30
31
32
33
34
35
36
37
38
39
# File 'lib/subserver/listener.rb', line 30

# Builds a listener bound to a single subscriber.
#
# @param mgr [Object] owning manager; its +options+ hash may carry a
#   +:message_logger+ class that replaces Subserver::MessageLogger
# @param subscriber [Object] subscriber whose subserver options name the
#   Pubsub subscription this listener attaches to
def initialize(mgr, subscriber)
  @mgr        = mgr
  @subscriber = subscriber
  @reloader   = Subserver.options[:reloader]
  @thread     = nil
  @done       = false

  # @valid has to be set before the lookup: retrieve_subscription flips it to
  # false when no subscription comes back.
  @valid        = true
  @subscription = retrieve_subscription

  message_logger_class = mgr.options[:message_logger] || Subserver::MessageLogger
  @logging = message_logger_class.new
end

Instance Attribute Details

#subscriber ⇒ Object (readonly)

Returns the value of attribute subscriber.



28
29
30
# File 'lib/subserver/listener.rb', line 28

# Reader for the +@subscriber+ instance variable, i.e. the subscriber that
# was handed to #initialize.
#
# @return [Object] the stored subscriber
def subscriber
  @subscriber
end

#thread ⇒ Object (readonly)

Returns the value of attribute thread.



27
28
29
# File 'lib/subserver/listener.rb', line 27

# Reader for the +@thread+ instance variable. It is nil after #initialize
# and is assigned by #start.
#
# @return [Object, nil] the stored thread, or nil if #start has not run
def thread
  @thread
end

Instance Method Details

#connect_subscriber ⇒ Object



82
83
84
85
86
87
88
89
# File 'lib/subserver/listener.rb', line 82

# Registers this listener's message handler on the subscription.
#
# Reads +:streams+ and +:threads+ from the subscriber's subserver options,
# passes them to +@subscription.listen+ and keeps the returned object in
# +@pubsub_listener+. Every message yielded to the handler is logged at
# debug level and forwarded to #process_message.
#
# @return [Object] whatever +@subscription.listen+ returned
def connect_subscriber
  settings = @subscriber.get_subserver_options
  logger.debug("Connecting to subscription with options: #{settings}")

  @pubsub_listener = @subscription.listen(
    streams: settings[:streams],
    threads: settings[:threads]
  ) do |message|
    logger.debug("Message Received: #{message}")
    process_message(message)
  end
end

#execute(subscriber, received_message) ⇒ Object



124
125
126
# File 'lib/subserver/listener.rb', line 124

# Creates a fresh instance of the subscriber and lets it handle one message.
#
# @param subscriber [#new] object that responds to +new+ and whose instances
#   respond to +perform+
# @param received_message [Object] message passed straight to +perform+
# @return [Object] whatever +perform+ returns
def execute(subscriber, received_message)
  handler = subscriber.new
  handler.perform(received_message)
end

#kill ⇒ Object



54
55
56
57
58
59
60
# File 'lib/subserver/listener.rb', line 54

# Hard-stops the listener: marks it done, asks the Pubsub listener to stop
# (without waiting on it, unlike #stop) and raises Subserver::Shutdown inside
# the listener thread.
#
# When the thread was never started, only +@done+ is set.
#
# @return [Object, nil] result of raising into the thread, or nil when there
#   is no thread
def kill
  @done = true
  return unless @thread

  # @pubsub_listener is only assigned by #connect_subscriber, which runs on
  # the listener thread. It can still be nil here if kill races startup or
  # startup failed early; the shutdown must still reach the thread.
  @pubsub_listener.stop if @pubsub_listener
  @thread.raise ::Subserver::Shutdown
end

#name ⇒ Object



41
42
43
# File 'lib/subserver/listener.rb', line 41

# Name of this listener, delegated to the subscriber.
#
# @return [Object] whatever +@subscriber.name+ returns
def name
  @subscriber.name
end

#process_message(received_message) ⇒ Object



104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# File 'lib/subserver/listener.rb', line 104

# Runs one received message through the reloader and the middleware chain,
# ending in #execute for this listener's subscriber.
#
# @param received_message [Object] message handed over by the subscription;
#   must respond to +reject!+
# @return [Object] the reloader block's result, or the result of +reject!+
#   when a shutdown interrupts processing
# @raise [StandardError] re-raised after being reported via handle_exception
def process_message(received_message)
  logger.debug("Executing Middleware")

  @reloader.call do
    Subserver.middleware.invoke(@subscriber, received_message) do
      execute(@subscriber, received_message)
    end
  end
rescue Subserver::Shutdown
  # Shutdown arrived mid-message: reject the message instead of finishing it.
  received_message.reject!
rescue StandardError => failure
  details = {
    context: 'Exception raised during message processing.',
    message: received_message
  }
  handle_exception(failure, details)
  raise
end

#retrieve_subscription ⇒ Object



72
73
74
75
76
77
78
79
80
# File 'lib/subserver/listener.rb', line 72

# Looks up the Pubsub subscription named by the subscriber's +:subscription+
# option.
#
# When the lookup returns nil or raises, an error is logged, +@valid+ is set
# to false and nil is returned; the method itself never raises for a failed
# lookup.
#
# @return [Object, nil] the subscription, or nil when it could not be found
def retrieve_subscription
  subscription_name = @subscriber.get_subserver_options[:subscription]

  subscription =
    begin
      Pubsub.client.subscription(subscription_name)
    rescue StandardError => error
      # Still best-effort, but record the real cause (credentials, network,
      # ...) so it is not hidden behind the "invalid name" message below.
      logger.error "Subscription lookup for #{subscription_name} failed: #{error.class}: #{error.message}"
      nil
    end

  if subscription.nil?
    logger.error "ArgumentError: Invalid Subscription name: #{subscription_name} in subscriber #{@subscriber.name}. Please ensure your Pubsub subscription exists."
    @valid = false
  end
  subscription
end

#run ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/subserver/listener.rb', line 91

# Body of the listener thread: fires the +:listener_startup+ event, connects
# the subscriber and starts the Pubsub listener.
#
# A Subserver::Shutdown is reported to the manager as a normal stop; any
# other exception is reported as the listener having died.
#
# @return [Object] result of +@pubsub_listener.start+, or of the manager
#   callback when an exception was rescued
def run
  fire_event(:listener_startup, reverse: false, reraise: true)
  connect_subscriber
  @pubsub_listener.start
rescue Subserver::Shutdown
  @mgr.listener_stopped(self)
rescue Exception => failure
  # Deliberately broad: nothing may escape the listener thread without the
  # manager being told about it.
  @mgr.listener_died(self, @subscriber, failure)
end

#start ⇒ Object



62
63
64
# File 'lib/subserver/listener.rb', line 62

# Spawns the listener thread running #run, at most once.
#
# @return [Object] the thread created by +safe_thread+; later calls return
#   the same object without spawning again
def start
  return @thread if @thread

  @thread = safe_thread("listener", &method(:run))
end

#stop ⇒ Object



45
46
47
48
49
50
51
52
# File 'lib/subserver/listener.rb', line 45

# Gracefully stops the listener: marks it done, stops the Pubsub listener,
# waits for it to finish, then tells the manager this listener has stopped.
#
# When the thread was never started, only +@done+ is set and the manager is
# not notified.
#
# @return [Object, nil] result of +@mgr.listener_stopped+, or nil when there
#   is no thread
def stop
  @done = true
  return unless @thread

  # @pubsub_listener is only assigned by #connect_subscriber on the listener
  # thread. It can still be nil if stop races startup or startup failed
  # early; the manager must be notified regardless.
  @pubsub_listener.stop.wait! if @pubsub_listener
  @mgr.listener_stopped(self)
end

#valid? ⇒ Boolean

Returns:

  • (Boolean)


66
67
68
# File 'lib/subserver/listener.rb', line 66

# Reports the +@valid+ flag. It is set to true in #initialize and to false
# by #retrieve_subscription when no subscription could be found.
#
# @return [Boolean] current value of +@valid+
def valid?
  @valid
end