Class: Google::Cloud::PubSub::Subscriber
- Inherits:
- Object
  - Object
  - Google::Cloud::PubSub::Subscriber
- Includes:
- MonitorMixin
- Defined in:
- lib/google/cloud/pubsub/subscriber.rb,
lib/google/cloud/pubsub/subscriber/stream.rb,
lib/google/cloud/pubsub/subscriber/inventory.rb,
lib/google/cloud/pubsub/subscriber/sequencer.rb,
lib/google/cloud/pubsub/subscriber/enumerator_queue.rb,
lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb
Overview
Subscriber object used to stream and process messages from a Subscription. See Google::Cloud::PubSub::Subscription#listen.
Instance Attribute Summary
-
#callback ⇒ Proc
readonly
The procedure that will handle the messages received from the subscription.
-
#callback_threads ⇒ Integer
readonly
The number of threads used to handle the received messages.
-
#deadline ⇒ Numeric
readonly
The default number of seconds the stream will hold received messages before modifying the message's ack deadline.
-
#message_ordering ⇒ Boolean
readonly
Whether message ordering has been enabled.
-
#push_threads ⇒ Integer
readonly
The number of threads to handle acknowledgement (ReceivedMessage#ack!) and delay messages (ReceivedMessage#nack!, ReceivedMessage#modify_ack_deadline!).
-
#streams ⇒ Integer
readonly
The number of concurrent streams to open to pull messages from the subscription.
-
#subscription_name ⇒ String
readonly
The name of the subscription the messages are pulled from.
Instance Method Summary
-
#last_error ⇒ Exception?
The most recent unhandled error to occur while listening to messages on the subscriber.
-
#max_duration_per_lease_extension ⇒ Integer
The maximum amount of time in seconds for a single lease extension attempt.
-
#max_outstanding_bytes ⇒ Integer
(also: #inventory_bytesize)
The total byte size of received messages to be collected by the subscriber.
-
#max_outstanding_messages ⇒ Integer
(also: #inventory_limit, #inventory)
The number of received messages to be collected by the subscriber.
-
#max_total_lease_duration ⇒ Integer
(also: #inventory_extension)
The number of seconds that received messages can be held awaiting processing.
-
#on_error {|callback| ... } ⇒ Object
Register to be notified of errors when raised.
-
#start ⇒ Subscriber
Starts the subscriber pulling from the subscription and processing the received messages.
-
#started? ⇒ boolean
Whether the subscriber has been started.
-
#stop ⇒ Subscriber
Immediately stops the subscriber.
-
#stop!(timeout = nil) ⇒ Subscriber
Stop this subscriber and block until the subscriber is fully stopped and all received messages have been processed or released, or until timeout seconds have passed.
-
#stopped? ⇒ boolean
Whether the subscriber has been stopped.
-
#use_legacy_flow_control? ⇒ Boolean
Whether to enforce flow control at the client side only or to enforce it at both the client and the server.
-
#wait!(timeout = nil) ⇒ Subscriber
Blocks until the subscriber is fully stopped and all received messages have been processed or released, or until timeout seconds have passed.
Instance Attribute Details
#callback ⇒ Proc (readonly)
The procedure that will handle the messages received from the subscription.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 64
def callback
  @callback
end
#callback_threads ⇒ Integer (readonly)
The number of threads used to handle the received messages. Default is 8.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 64
def callback_threads
  @callback_threads
end
#deadline ⇒ Numeric (readonly)
The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is 60.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 64
def deadline
  @deadline
end
#message_ordering ⇒ Boolean (readonly)
Whether message ordering has been enabled.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 64
def message_ordering
  @message_ordering
end
#push_threads ⇒ Integer (readonly)
The number of threads to handle acknowledgement (ReceivedMessage#ack!) and delay messages (ReceivedMessage#nack!, ReceivedMessage#modify_ack_deadline!). Default is 4.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 64
def push_threads
  @push_threads
end
#streams ⇒ Integer (readonly)
The number of concurrent streams to open to pull messages from the subscription. Default is 4.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 64
def streams
  @streams
end
#subscription_name ⇒ String (readonly)
The name of the subscription the messages are pulled from.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 64
def subscription_name
  @subscription_name
end
Instance Method Details
#last_error ⇒ Exception?
The most recent unhandled error to occur while listening to messages on the subscriber.
If an unhandled error occurs, the subscriber will attempt to recover from the error and resume listening.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 286
def last_error
  synchronize { @last_error }
end
#max_duration_per_lease_extension ⇒ Integer
The maximum amount of time in seconds for a single lease extension attempt. Bounds the delay before a message redelivery if the subscriber fails to extend the deadline. Default is 0 (disabled).
# File 'lib/google/cloud/pubsub/subscriber.rb', line 342
def max_duration_per_lease_extension
  @inventory[:max_duration_per_lease_extension]
end
#max_outstanding_bytes ⇒ Integer Also known as: inventory_bytesize
The total byte size of received messages to be collected by the subscriber. Default is 100,000,000 (100MB).
# File 'lib/google/cloud/pubsub/subscriber.rb', line 308
def max_outstanding_bytes
  @inventory[:max_outstanding_bytes]
end
#max_outstanding_messages ⇒ Integer Also known as: inventory_limit, inventory
The number of received messages to be collected by the subscriber. Default is 1,000.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 295
def max_outstanding_messages
  @inventory[:max_outstanding_messages]
end
#max_total_lease_duration ⇒ Integer Also known as: inventory_extension
The number of seconds that received messages can be held awaiting processing. Default is 3,600 (1 hour).
# File 'lib/google/cloud/pubsub/subscriber.rb', line 330
def max_total_lease_duration
  @inventory[:max_total_lease_duration]
end
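The four accessors above all read from one inventory Hash. As a rough illustration of how such limits could be combined, the sketch below builds a Hash with the documented defaults and checks whether the collected messages have reached either limit. The helper names `default_inventory`, `build_inventory`, and `inventory_full?` are hypothetical and not part of the library, and the "full" rule shown is an assumption about how the limits might be applied, not the gem's actual flow-control logic.

```ruby
# Documented defaults, keyed the same way the accessors above read them.
# (The method name is hypothetical.)
def default_inventory
  {
    max_outstanding_messages:         1_000,
    max_outstanding_bytes:            100_000_000,
    max_total_lease_duration:         3_600,
    max_duration_per_lease_extension: 0
  }
end

# Hypothetical helper: caller overrides win over the documented defaults.
def build_inventory overrides = {}
  default_inventory.merge overrides
end

# Hypothetical helper (assumed rule): the inventory counts as full once
# either the message count or the total byte size reaches its limit.
def inventory_full? inventory, message_count, total_bytes
  message_count >= inventory[:max_outstanding_messages] ||
    total_bytes >= inventory[:max_outstanding_bytes]
end

inventory = build_inventory({ max_outstanding_messages: 2 })
below_limits = inventory_full?(inventory, 1, 10)
at_message_limit = inventory_full?(inventory, 2, 10)
```

Keeping every limit in a single Hash means an override only has to name the keys it changes; the remaining keys keep their defaults.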
#on_error {|callback| ... } ⇒ Object
Register to be notified of errors when raised.
If an unhandled error occurs, the subscriber will attempt to recover from the error and resume listening.
Multiple error handlers can be added.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 250
def on_error &block
  synchronize do
    @error_callbacks << block
  end
end
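To illustrate how several handlers registered through #on_error can coexist with #last_error, here is a minimal stand-alone sketch built on MonitorMixin from the Ruby standard library. The `ErrorNotifier` class and its `error!` dispatch method are hypothetical; only the `on_error` and `last_error` bodies mirror the source shown on this page, and the way the library actually dispatches errors may differ.

```ruby
require "monitor"

# Hypothetical stand-in for the subscriber's error bookkeeping.
class ErrorNotifier
  include MonitorMixin

  def initialize
    super() # sets up the monitor provided by MonitorMixin
    @error_callbacks = []
    @last_error = nil
  end

  # Same shape as the on_error source above: append under the lock.
  def on_error &block
    synchronize { @error_callbacks << block }
  end

  def last_error
    synchronize { @last_error }
  end

  # Assumed dispatch step: remember the error, then call each handler
  # outside the lock so a slow handler does not block registration.
  def error! error
    callbacks = synchronize do
      @last_error = error
      @error_callbacks.dup
    end
    callbacks.each { |callback| callback.call error }
  end
end

notifier = ErrorNotifier.new
seen = []
notifier.on_error { |error| seen << "log: #{error.message}" }
notifier.on_error { |error| seen << "alert: #{error.class}" }
notifier.error! RuntimeError.new("stream reset")
```

Both handlers run in registration order, and the error stays available through `last_error` afterwards.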
#start ⇒ Subscriber
Starts the subscriber pulling from the subscription and processing the received messages.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 113
def start
  start_pool = synchronize do
    @started = true
    @stopped = false

    # Start the buffer before the streams are all started
    @buffer.start
    @stream_pool.map do |stream|
      Thread.new { stream.start }
    end
  end
  start_pool.map(&:join)

  self
end
#started? ⇒ boolean
Whether the subscriber has been started.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 202
def started?
  synchronize { @started }
end
#stop ⇒ Subscriber
Immediately stops the subscriber. No new messages will be pulled from the subscription. All actions taken on received messages that have not yet been sent to the API will be sent to the API. All received but unprocessed messages will be released back to the API and redelivered. Use #wait! to block until the subscriber is fully stopped and all received messages have been processed or released.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 139
def stop
  stop_pool = synchronize do
    @started = false
    @stopped = true

    @stream_pool.map do |stream|
      Thread.new { stream.stop }
    end
  end
  stop_pool.map(&:join)
  # Stop the buffer after the streams are all stopped
  synchronize { @buffer.stop }

  self
end
#stop!(timeout = nil) ⇒ Subscriber
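Stop this subscriber and block until the subscriber is fully stopped and all received messages have been processed or released, or until timeout seconds have passed.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 192
def stop! timeout = nil
  stop
  wait! timeout
end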
#stopped? ⇒ boolean
Whether the subscriber has been stopped.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 211
def stopped?
  synchronize { @stopped }
end
#use_legacy_flow_control? ⇒ Boolean
Whether to enforce flow control at the client side only or to enforce it at both the client and the server. For more details about flow control see https://cloud.google.com/pubsub/docs/pull#config.
Returns true when only client side flow control is enforced, false when both client and server side flow control are enforced.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 321
def use_legacy_flow_control?
  @inventory[:use_legacy_flow_control]
end
#wait!(timeout = nil) ⇒ Subscriber
Blocks until the subscriber is fully stopped and all received messages have been processed or released, or until timeout seconds have passed.
# File 'lib/google/cloud/pubsub/subscriber.rb', line 169
def wait! timeout = nil
  wait_pool = synchronize do
    @stream_pool.map do |stream|
      Thread.new { stream.wait! timeout }
    end
  end
  wait_pool.map(&:join)

  self
end
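The #start, #stop, #stop!, and #wait! listings share one pattern: update the state flags under the monitor, fan the call out to every stream on its own thread, then join those threads. The sketch below reproduces that pattern in a self-contained form. `StubStream` and `MiniSubscriber` are hypothetical names, the stub streams only record the calls they receive, the buffer handling from the real source is omitted, and the initial `false` values for the flags are an assumption.

```ruby
require "monitor"

# Hypothetical stream stub: records each call it receives. Each phase is
# joined before the next begins, so a plain Array is enough here.
class StubStream
  attr_reader :events

  def initialize
    @events = []
  end

  def start
    @events << :start
  end

  def stop
    @events << :stop
  end

  def wait! timeout = nil
    @events << [:wait, timeout]
  end
end

# Hypothetical miniature of the lifecycle methods listed above.
class MiniSubscriber
  include MonitorMixin

  def initialize streams
    super()
    @stream_pool = streams
    @started = false # assumed initial state
    @stopped = false # assumed initial state
  end

  def start
    pool = synchronize do
      @started = true
      @stopped = false
      @stream_pool.map { |stream| Thread.new { stream.start } }
    end
    pool.each(&:join)
    self
  end

  def stop
    pool = synchronize do
      @started = false
      @stopped = true
      @stream_pool.map { |stream| Thread.new { stream.stop } }
    end
    pool.each(&:join)
    self
  end

  def wait! timeout = nil
    pool = synchronize do
      @stream_pool.map { |stream| Thread.new { stream.wait! timeout } }
    end
    pool.each(&:join)
    self
  end

  # stop! is simply stop followed by wait!, as in the listing above.
  def stop! timeout = nil
    stop
    wait! timeout
  end

  def started?
    synchronize { @started }
  end

  def stopped?
    synchronize { @stopped }
  end
end

streams = [StubStream.new, StubStream.new]
subscriber = MiniSubscriber.new streams
subscriber.start
started_after_start = subscriber.started?
subscriber.stop! 5
```

Because each method returns `self`, calls can be chained, and `stop!` hands its timeout through to `wait!` unchanged.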