Class: Subserver::Listener
- Inherits:
-
Object
- Object
- Subserver::Listener
show all
- Includes:
- Util
- Defined in:
- lib/subserver/listener.rb
Overview
The Listener is a standalone thread which:
-
Starts Google Pubsub subscription threads which:
a. Instantiate the Subscription class
b. Run the middleware chain
c. Call the subscriber's #perform
A Listener can exit due to shutdown (listener_stopped) or due to an error during message processing (listener_died).
If an error occurs during message processing, the Listener calls the Manager to create a new one to replace itself and exits.
Constant Summary
Constants included
from Util
Util::EXPIRY
Instance Attribute Summary collapse
Instance Method Summary
collapse
Methods included from Util
#fire_event, #hostname, #identity, #logger, #process_nonce, #safe_thread, #watchdog
#handle_exception
Constructor Details
#initialize(mgr, subscriber) ⇒ Listener
Returns a new instance of Listener.
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/subserver/listener.rb', line 30
# Builds a listener bound to one subscriber and looks up its subscription.
#
# @param mgr [Object] owning manager; its +options+ hash may carry a
#   +:message_logger+ class to use instead of Subserver::MessageLogger
# @param subscriber [Object] subscriber whose Subserver options name the
#   subscription to retrieve
def initialize(mgr, subscriber)
  @mgr = mgr
  @subscriber = subscriber
  @thread = nil
  @done = false
  # Set before retrieve_subscription runs: that call flips the flag to false
  # when no subscription can be found.
  @valid = true
  @reloader = Subserver.options[:reloader]
  @subscription = retrieve_subscription

  message_logger_class = mgr.options[:message_logger] || Subserver::MessageLogger
  @logging = message_logger_class.new
end
|
Instance Attribute Details
#subscriber ⇒ Object
Returns the value of attribute subscriber.
28
29
30
|
# File 'lib/subserver/listener.rb', line 28
# Reader for the subscriber handed to the constructor.
#
# @return [Object] the current value of @subscriber
def subscriber
  @subscriber
end
|
#thread ⇒ Object
Returns the value of attribute thread.
27
28
29
|
# File 'lib/subserver/listener.rb', line 27
# Reader for the listener thread; nil until #start has assigned one.
#
# @return [Object, nil] the current value of @thread
def thread
  @thread
end
|
Instance Method Details
#connect_subscriber ⇒ Object
82
83
84
85
86
87
88
89
|
# File 'lib/subserver/listener.rb', line 82
# Creates the Pubsub listener for this subscriber's subscription.
#
# The subscriber's Subserver options supply the :streams and :threads values
# passed to the subscription. Each received message is logged at debug level
# and forwarded to #process_message. The resulting listener is stored in
# @pubsub_listener; this method does not start it.
#
# @return [Object] the listener returned by the subscription
def connect_subscriber
  opts = @subscriber.get_subserver_options
  logger.debug("Connecting to subscription with options: #{opts}")

  @pubsub_listener = @subscription.listen(streams: opts[:streams], threads: opts[:threads]) do |message|
    logger.debug("Message Received: #{message}")
    process_message(message)
  end
end
|
#execute(subscriber, received_message) ⇒ Object
124
125
126
|
# File 'lib/subserver/listener.rb', line 124
# Instantiates the subscriber and hands it a single message.
#
# @param subscriber [#new] subscriber class; a fresh instance is created per call
# @param received_message [Object] message passed to the instance's #perform
# @return [Object] whatever #perform returns
def execute(subscriber, received_message)
  handler = subscriber.new
  handler.perform(received_message)
end
|
#kill ⇒ Object
54
55
56
57
58
59
60
|
# File 'lib/subserver/listener.rb', line 54
# Forcefully shuts the listener down.
#
# Marks the listener as done, stops the Pubsub listener without waiting for
# it, and raises Subserver::Shutdown inside the listener thread. When no
# thread was ever started, only the done flag is set.
def kill
  @done = true
  return unless @thread

  # @pubsub_listener is only assigned by #connect_subscriber, which runs on the
  # listener thread; it can still be nil if we are killed before that point
  # (or if connecting failed), so guard instead of crashing with NoMethodError.
  @pubsub_listener.stop if @pubsub_listener
  @thread.raise ::Subserver::Shutdown
end
|
#name ⇒ Object
41
42
43
|
# File 'lib/subserver/listener.rb', line 41
# Name of this listener, delegated to the subscriber.
#
# @return [Object] the result of calling #name on @subscriber
def name
  @subscriber.name
end
|
#process_message(received_message) ⇒ Object
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
# File 'lib/subserver/listener.rb', line 104
# Runs one received message through the reloader and middleware chain, then
# executes the subscriber.
#
# A Subserver::Shutdown raised during processing causes the message to be
# rejected via #reject! and is not propagated. Any StandardError is passed to
# #handle_exception together with the message and then re-raised.
#
# @param received_message [Object] message delivered by the Pubsub listener
def process_message(received_message)
  logger.debug("Executing Middleware")
  @reloader.call do
    Subserver.middleware.invoke(@subscriber, received_message) { execute(@subscriber, received_message) }
  end
rescue Subserver::Shutdown
  received_message.reject!
rescue StandardError => e
  failure_context = {
    context: 'Exception raised during message processing.',
    message: received_message
  }
  handle_exception(e, failure_context)
  raise
end
|
#retrieve_subscription ⇒ Object
72
73
74
75
76
77
78
79
80
|
# File 'lib/subserver/listener.rb', line 72
# Looks up the Pubsub subscription named in the subscriber's Subserver options.
#
# When the lookup raises a StandardError or yields nil, the listener is marked
# invalid (@valid = false), an error is logged and nil is returned. A raised
# error is additionally logged with its class and message so that failures
# other than a missing subscription are not misreported as an invalid name.
#
# @return [Object, nil] the subscription, or nil when it could not be retrieved
def retrieve_subscription
  subscription_name = @subscriber.get_subserver_options[:subscription]

  subscription =
    begin
      Pubsub.client.subscription(subscription_name)
    rescue StandardError => error
      logger.error "#{error.class}: #{error.message} while retrieving subscription #{subscription_name} for subscriber #{@subscriber.name}."
      nil
    end

  if subscription.nil?
    logger.error "ArgumentError: Invalid Subscription name: #{subscription_name} in subscriber #{@subscriber.name}. Please ensure your Pubsub subscription exists."
    @valid = false
  end
  subscription
end
|
#run ⇒ Object
91
92
93
94
95
96
97
98
99
100
101
102
|
# File 'lib/subserver/listener.rb', line 91
# Body of the listener thread.
#
# Fires the :listener_startup event, connects the subscriber and starts the
# Pubsub listener. A Subserver::Shutdown is reported to the manager through
# listener_stopped; every other exception is reported through listener_died
# together with the subscriber and the exception itself.
def run
  fire_event(:listener_startup, reverse: false, reraise: true)
  connect_subscriber
  @pubsub_listener.start
rescue Subserver::Shutdown
  @mgr.listener_stopped(self)
rescue Exception => e
  # Intentionally catches Exception, not just StandardError: whatever ends
  # this method is handed to the manager rather than escaping the thread.
  @mgr.listener_died(self, @subscriber, e)
end
|
#start ⇒ Object
62
63
64
|
# File 'lib/subserver/listener.rb', line 62
# Starts the listener thread unless one has already been created.
#
# The thread is created through safe_thread under the name "listener" and
# executes #run.
#
# @return [Object] the value stored in @thread
def start
  return @thread if @thread

  @thread = safe_thread("listener", &method(:run))
end
|
#stop ⇒ Object
45
46
47
48
49
50
51
52
|
# File 'lib/subserver/listener.rb', line 45
# Gracefully shuts the listener down.
#
# Marks the listener as done, stops the Pubsub listener and waits on it via
# wait!, then tells the manager the listener has stopped. When no thread was
# ever started, only the done flag is set.
def stop
  @done = true
  return unless @thread

  # @pubsub_listener is only assigned by #connect_subscriber, which runs on the
  # listener thread; it can still be nil if we are stopped before that point
  # (or if connecting failed). Guard so the manager is still notified instead
  # of this method crashing with NoMethodError.
  @pubsub_listener.stop.wait! if @pubsub_listener
  @mgr.listener_stopped(self)
end
|
#valid? ⇒ Boolean
66
67
68
|
# File 'lib/subserver/listener.rb', line 66
# Whether the listener is usable; cleared by #retrieve_subscription when the
# subscription lookup fails.
#
# @return [Boolean] the current value of @valid
def valid?
  @valid
end
|