Class: Gcloud::Pubsub::Subscription
- Inherits: Object
  - Object
  - Gcloud::Pubsub::Subscription
- Defined in: lib/gcloud/pubsub/subscription.rb, lib/gcloud/pubsub/subscription/list.rb
Overview
Defined Under Namespace
Classes: List
Instance Attribute Summary
-
#connection ⇒ Object
The Connection object.
-
#gapi ⇒ Object
The Google API Client object.
Class Method Summary
-
.from_gapi(gapi, conn) ⇒ Object
New Subscription from a Google API Client object.
-
.new_lazy(name, conn, options = {}) ⇒ Object
New lazy Subscription object without making an HTTP request.
Instance Method Summary
-
#acknowledge(*messages) ⇒ Object
(also: #ack)
Acknowledges receipt of a message.
-
#deadline ⇒ Object
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
-
#delay(new_deadline, *messages) ⇒ Object
Modifies the acknowledge deadline for messages.
-
#delete ⇒ Object
Deletes an existing subscription.
-
#endpoint ⇒ Object
Returns the URL locating the endpoint to which messages should be pushed.
-
#endpoint=(new_endpoint) ⇒ Object
Sets the URL locating the endpoint to which messages should be pushed.
-
#exists? ⇒ Boolean
Determines whether the subscription exists in the Pub/Sub service.
-
#initialize ⇒ Subscription
constructor
Create an empty Subscription object.
-
#lazy? ⇒ Boolean
Determines whether the subscription object was created without an HTTP call (that is, whether it is still lazy).
-
#listen(options = {}) ⇒ Object
Poll the backend for new messages.
-
#name ⇒ Object
The name of the subscription.
-
#policy(options = {}) ⇒ Object
Gets the access control policy.
-
#policy=(new_policy) ⇒ Object
Sets the access control policy.
-
#pull(options = {}) ⇒ Object
Pulls messages from the server.
-
#topic ⇒ Object
The Topic from which this subscription receives messages.
-
#wait_for_messages(options = {}) ⇒ Object
Pulls from the server while waiting for messages to become available.
Constructor Details
#initialize ⇒ Subscription
Create an empty Subscription object.
# File 'lib/gcloud/pubsub/subscription.rb', line 48
def initialize #:nodoc:
  @connection = nil
  @gapi = {}
  @name = nil
  @exists = nil
end
Instance Attribute Details
#connection ⇒ Object
The Connection object.
# File 'lib/gcloud/pubsub/subscription.rb', line 40
def connection
  @connection
end
#gapi ⇒ Object
The Google API Client object.
# File 'lib/gcloud/pubsub/subscription.rb', line 44
def gapi
  @gapi
end
Class Method Details
.from_gapi(gapi, conn) ⇒ Object
New Subscription from a Google API Client object.
# File 'lib/gcloud/pubsub/subscription.rb', line 567
def self.from_gapi gapi, conn #:nodoc:
  new.tap do |f|
    f.gapi = gapi
    f.connection = conn
  end
end
.new_lazy(name, conn, options = {}) ⇒ Object
New lazy Subscription object without making an HTTP request.
# File 'lib/gcloud/pubsub/subscription.rb', line 57
def self.new_lazy name, conn, options = {} #:nodoc:
  sub = new.tap do |f|
    f.gapi = nil
    f.connection = conn
  end
  sub.instance_eval do
    @name = conn.subscription_path(name, options)
  end
  sub
end
Instance Method Details
#acknowledge(*messages) ⇒ Object Also known as: ack
Acknowledges receipt of a message. After an ack, the Pub/Sub system can remove the message from the subscription. Acknowledging a message whose ack deadline has expired may succeed, although the message may have been sent again. Acknowledging a message more than once will not result in an error. This is only used for messages received via pull.
Parameters
messages
- One or more ReceivedMessage objects or ack_id values. (ReceivedMessage or ack_id)
Example
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-topic-sub"
messages = sub.pull
sub.acknowledge messages
# File 'lib/gcloud/pubsub/subscription.rb', line 408
def acknowledge *messages
  ack_ids = coerce_ack_ids messages
  ensure_connection!
  resp = connection.acknowledge name, *ack_ids
  if resp.success?
    true
  else
    fail ApiError.from_response(resp)
  end
end
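The source above passes its arguments through a `coerce_ack_ids` helper whose body is not shown on this page. As a rough sketch of what such a helper might do, assuming each message object exposes an `ack_id` method: the `FakeMessage` struct and this implementation are illustrative stand-ins, not the library's code.

```ruby
# Hypothetical stand-in for ReceivedMessage; only ack_id matters here.
FakeMessage = Struct.new(:ack_id)

# Illustrative helper (an assumption, not the library's implementation):
# accepts message objects, raw ack_id strings, or nested arrays of either,
# and returns a flat array of ack_id strings.
def coerce_ack_ids(messages)
  [messages].flatten.map do |msg|
    msg.respond_to?(:ack_id) ? msg.ack_id : msg.to_s
  end
end

ids = coerce_ack_ids([FakeMessage.new("a1"), "b2", [FakeMessage.new("c3")]])
puts ids.inspect
```

Normalizing to plain ack_id strings up front is what lets callers mix objects returned by a pull with ack_id values they stored earlier.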
#deadline ⇒ Object
This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.
# File 'lib/gcloud/pubsub/subscription.rb', line 101
def deadline
  ensure_gapi!
  @gapi["ackDeadlineSeconds"]
end
#delay(new_deadline, *messages) ⇒ Object
Modifies the acknowledge deadline for messages.
This indicates that more time is needed to process the messages, or to make the messages available for redelivery if the processing was interrupted.
Parameters
new_deadline
- The new ack deadline in seconds from the time this request is sent to the Pub/Sub system. Must be >= 0. For example, if the value is 10, the new ack deadline will expire 10 seconds after the call is made. Specifying 0 may immediately make the messages available for another pull request. (Integer)
messages
- One or more ReceivedMessage objects or ack_id values. (ReceivedMessage or ack_id)
Example
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-topic-sub"
messages = sub.pull
sub.delay 120, messages
# File 'lib/gcloud/pubsub/subscription.rb', line 450
def delay new_deadline, *messages
  ack_ids = coerce_ack_ids messages
  ensure_connection!
  resp = connection.modify_ack_deadline name, ack_ids, new_deadline
  if resp.success?
    true
  else
    fail ApiError.from_response(resp)
  end
end
#delete ⇒ Object
# File 'lib/gcloud/pubsub/subscription.rb', line 186
def delete
  ensure_connection!
  resp = connection.delete_subscription name
  if resp.success?
    true
  else
    fail ApiError.from_response(resp)
  end
end
#endpoint ⇒ Object
Returns the URL locating the endpoint to which messages should be pushed.
# File 'lib/gcloud/pubsub/subscription.rb', line 109
def endpoint
  ensure_gapi!
  @gapi["pushConfig"]["pushEndpoint"] if @gapi["pushConfig"]
end
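The guard on `pushConfig` means a subscription with no push configuration yields `nil` rather than raising. A small sketch of that lookup on plain hashes follows; the hash contents and the `push_endpoint` helper name are made up for illustration.

```ruby
# Same guarded lookup as in #endpoint, applied to a plain hash.
def push_endpoint(gapi)
  gapi["pushConfig"]["pushEndpoint"] if gapi["pushConfig"]
end

# Hypothetical resource hashes: one pull-style, one push-style.
pull_sub = { "name" => "sub-a" }
push_sub = {
  "name" => "sub-b",
  "pushConfig" => { "pushEndpoint" => "https://example.com/push" }
}

puts push_endpoint(pull_sub).inspect
puts push_endpoint(push_sub).inspect

# Hash#dig expresses the same nil-safe traversal in one call.
puts push_sub.dig("pushConfig", "pushEndpoint") == push_endpoint(push_sub)
```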
#endpoint=(new_endpoint) ⇒ Object
Sets the URL locating the endpoint to which messages should be pushed.
# File 'lib/gcloud/pubsub/subscription.rb', line 116
def endpoint= new_endpoint
  ensure_connection!
  resp = connection.modify_push_config name, new_endpoint, {}
  if resp.success?
    @gapi["pushConfig"]["pushEndpoint"] = new_endpoint if @gapi
  else
    fail ApiError.from_response(resp)
  end
end
#exists? ⇒ Boolean
# File 'lib/gcloud/pubsub/subscription.rb', line 139
def exists?
  # Always true if we have a gapi object
  return true unless @gapi.nil?
  # If we have a value, return it
  return @exists unless @exists.nil?
  ensure_gapi!
  @exists = !@gapi.nil?
rescue NotFoundError
  @exists = false
end
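The method above caches its answer in three states: `nil` (not checked yet), `true`, or `false`. The following stand-alone sketch reproduces that pattern with a simulated backend so the caching can be observed. `LazyResource`, its `lookups` counter, and the local `NotFoundError` class are illustrative assumptions, not library classes.

```ruby
# Stand-in error class for this sketch only.
class NotFoundError < StandardError; end

# Illustrative class showing the three-state memo used by exists?.
class LazyResource
  attr_reader :lookups

  def initialize(present)
    @present = present # whether the simulated backend has the resource
    @gapi = nil
    @exists = nil
    @lookups = 0
  end

  def exists?
    return true unless @gapi.nil?
    return @exists unless @exists.nil?
    ensure_gapi!
    @exists = !@gapi.nil?
  rescue NotFoundError
    @exists = false
  end

  private

  # Simulated API call: loads the resource or raises NotFoundError.
  def ensure_gapi!
    @lookups += 1
    fail NotFoundError unless @present
    @gapi = { "name" => "demo" }
  end
end

missing = LazyResource.new(false)
puts missing.exists? # first call performs the simulated lookup
puts missing.exists? # second call reuses the cached answer
puts missing.lookups
```

Checking `@exists.nil?` rather than plain truthiness matters here: a cached `false` must short-circuit too, otherwise every call on a missing resource would repeat the lookup.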
#lazy? ⇒ Boolean
# File 'lib/gcloud/pubsub/subscription.rb', line 164
def lazy? #:nodoc:
  @gapi.nil?
end
#listen(options = {}) ⇒ Object
Poll the backend for new messages. This runs a loop to ping the API, blocking indefinitely, yielding retrieved messages as they are received.
Parameters
options
- An optional Hash for controlling additional behavior. (Hash)
options[:max]
- The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000. (Integer)
options[:autoack]
- Automatically acknowledge the message as it is pulled. The default value is false. (Boolean)
options[:delay]
- The number of seconds to pause between requests when the Google Cloud service has no messages to return. The default value is 1. (Number)
Examples
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-topic-sub"
sub.listen do |msg|
# process msg
end
The number of messages pulled per batch can be set with the max option:
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-topic-sub"
sub.listen max: 20 do |msg|
# process msg
end
Messages can be automatically acknowledged as they are pulled with the autoack option:
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-topic-sub"
sub.listen autoack: true do |msg|
# process msg
end
# File 'lib/gcloud/pubsub/subscription.rb', line 371
def listen options = {}
  delay = options.fetch(:delay, 1)
  loop do
    msgs = wait_for_messages options
    if msgs.any?
      msgs.each { |msg| yield msg }
    else
      sleep delay
    end
  end
end
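The loop above never returns, which makes it awkward to try out in isolation. Below is a bounded sketch of the same poll-then-sleep shape. The `poll` method, its `fetch` callable, and the `max_polls` limit are assumptions introduced here so the example terminates; they are not part of the library.

```ruby
# Illustrative polling loop modeled on listen. `fetch` is a hypothetical
# callable that returns one batch of messages per call; `max_polls` exists
# only so this sketch ends.
def poll(fetch, delay: 1, max_polls: 5)
  max_polls.times do
    msgs = fetch.call
    if msgs.any?
      msgs.each { |msg| yield msg }
    else
      sleep delay # pause only when the batch was empty
    end
  end
end

batches = [["a", "b"], [], ["c"]]
seen = []
poll(-> { batches.shift || [] }, delay: 0.01) { |msg| seen << msg }
puts seen.inspect
```

Sleeping only on an empty batch keeps throughput high while a backlog exists and avoids hammering the service when there is nothing to deliver.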
#name ⇒ Object
The name of the subscription.
# File 'lib/gcloud/pubsub/subscription.rb', line 70
def name
  @gapi ? @gapi["name"] : @name
end
#policy(options = {}) ⇒ Object
Gets the access control policy.
Parameters
options
- An optional Hash for controlling additional behavior. (Hash)
options[:force]
- Force the latest policy to be retrieved from the Pub/Sub service when true. Otherwise the policy will be memoized to reduce the number of API calls made to the Pub/Sub service. The default is false. (Boolean)
Returns
A hash that conforms to the following structure:
{
"bindings" => [{
"role" => "roles/viewer",
"members" => ["serviceAccount:your-service-account"]
}],
"rules" => []
}
Examples
By default, the policy values are memoized to reduce the number of API calls to the Pub/Sub service.
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
subscription = pubsub.subscription "my-subscription"
puts subscription.policy["bindings"]
puts subscription.policy["rules"]
To retrieve the latest policy from the Pub/Sub service, use the force flag.
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
subscription = pubsub.subscription "my-subscription"
policy = subscription.policy force: true
# File 'lib/gcloud/pubsub/subscription.rb', line 511
def policy options = {}
  @policy = nil if options[:force]
  @policy ||= begin
    ensure_connection!
    resp = connection.get_subscription_policy name
    policy = resp.data["policy"]
    policy = policy.to_hash if policy.respond_to? :to_hash
    policy
  end
end
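To make the memoization and the force flag concrete, here is a self-contained sketch that counts simulated fetches instead of calling the Pub/Sub service. `PolicyHolder` and `fetch_count` are illustrative names, and the returned hash is placeholder data.

```ruby
# Illustrative memoization with a force flag, mirroring the shape of #policy.
class PolicyHolder
  attr_reader :fetch_count

  def initialize
    @fetch_count = 0
  end

  def policy(options = {})
    @policy = nil if options[:force] # discard the cached value on demand
    @policy ||= begin
      @fetch_count += 1              # stands in for one API round trip
      { "bindings" => [], "rules" => [] }
    end
  end
end

holder = PolicyHolder.new
holder.policy              # fetches
holder.policy              # served from the cache
holder.policy force: true  # fetches again
puts holder.fetch_count
```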
#policy=(new_policy) ⇒ Object
Sets the access control policy.
Parameters
new_policy
- A hash that conforms to the following structure:
{
"bindings" => [{
"role" => "roles/viewer",
"members" => ["serviceAccount:your-service-account"]
}],
"rules" => []
}
Example
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
viewer_policy = {
"bindings" => [{
"role" => "roles/viewer",
"members" => ["serviceAccount:your-service-account"]
}]
}
subscription = pubsub.subscription "my-subscription"
subscription.policy = viewer_policy
# File 'lib/gcloud/pubsub/subscription.rb', line 554
def policy= new_policy
  ensure_connection!
  resp = connection.set_subscription_policy name, new_policy
  if resp.success?
    @policy = resp.data["policy"]
    @policy = @policy.to_hash if @policy.respond_to? :to_hash
  else
    fail ApiError.from_response(resp)
  end
end
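Since the setter takes a whole policy hash, a common need is to derive a modified hash from an existing one. The helper below is a hypothetical sketch (`add_member` is not part of the library) that adds a member to a role's binding without mutating the input. It assumes the hash follows the structure documented above.

```ruby
# Illustrative helper: returns a copy of `policy` with `member` added to the
# binding for `role`, creating that binding if it does not exist yet.
def add_member(policy, role, member)
  # Copy each binding so the caller's hash is left untouched.
  bindings = (policy["bindings"] || []).map do |b|
    { "role" => b["role"], "members" => b["members"].dup }
  end
  target = bindings.find { |b| b["role"] == role }
  if target
    target["members"] << member unless target["members"].include?(member)
  else
    bindings << { "role" => role, "members" => [member] }
  end
  policy.merge("bindings" => bindings)
end

current = {
  "bindings" => [{
    "role" => "roles/viewer",
    "members" => ["serviceAccount:your-service-account"]
  }],
  "rules" => []
}
updated = add_member(current, "roles/viewer", "serviceAccount:another-account")
puts updated["bindings"].first["members"].length
```

With a real subscription, the resulting hash could then be assigned through `policy=` as in the example above.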
#pull(options = {}) ⇒ Object
Pulls messages from the server. Returns an empty list if there are no messages available in the backlog. Raises an ApiError with status UNAVAILABLE if there are too many concurrent pull requests pending for the given subscription.
Parameters
options
- An optional Hash for controlling additional behavior. (Hash)
options[:immediate]
- When true the system will respond immediately even if it is not able to return messages. When false the system is allowed to wait until it can return at least one message. No messages are returned when a request times out. The default value is true. (Boolean)
options[:max]
- The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000. (Integer)
options[:autoack]
- Automatically acknowledge the message as it is pulled. The default value is false. (Boolean)
Returns
Array of Gcloud::Pubsub::ReceivedMessage
Examples
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-topic-sub"
sub.pull.each { |msg| msg.acknowledge! }
A maximum number of messages returned can also be specified:
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-topic-sub"
sub.pull(max: 10).each { |msg| msg.acknowledge! }
The call can block until messages are available by setting the :immediate option to false:
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-topic-sub"
msgs = sub.pull immediate: false
msgs.each { |msg| msg.acknowledge! }
# File 'lib/gcloud/pubsub/subscription.rb', line 258
def pull options = {}
  ensure_connection!
  resp = connection.pull name, options
  if resp.success?
    messages = Array(resp.data["receivedMessages"]).map do |gapi|
      ReceivedMessage.from_gapi gapi, self
    end
    acknowledge messages if options[:autoack]
    messages
  else
    fail ApiError.from_response(resp)
  end
rescue Faraday::TimeoutError
  []
end
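Two details in the source above keep the return value a plain array in every non-error case: `Array(...)` turns a missing "receivedMessages" key into an empty list, and a timeout is rescued into `[]`. A minimal sketch of both behaviors follows. `safe_pull` and its `fetch` callable are hypothetical, and the standard library's `Timeout::Error` stands in for `Faraday::TimeoutError` so the example has no gem dependency.

```ruby
require "timeout"

# Illustrative wrapper mirroring how pull normalizes its result.
# `fetch` is a hypothetical callable that returns a response hash.
def safe_pull(fetch)
  data = fetch.call
  Array(data["receivedMessages"]) # nil becomes [], an array passes through
rescue Timeout::Error
  []                              # a timed-out request yields no messages
end

with_messages = safe_pull(-> { { "receivedMessages" => [{ "ackId" => "1" }] } })
empty_backlog = safe_pull(-> { {} })
timed_out     = safe_pull(-> { raise Timeout::Error, "simulated timeout" })

puts with_messages.length
puts empty_backlog.inspect
puts timed_out.inspect
```

Because all three paths return an array, callers can chain `.each` on the result without a nil check.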
#topic ⇒ Object
# File 'lib/gcloud/pubsub/subscription.rb', line 91
def topic
  ensure_gapi!
  # Always disable autocreate, we don't want to recreate a topic that
  # was intentionally deleted.
  Topic.new_lazy @gapi["topic"], connection, autocreate: false
end
#wait_for_messages(options = {}) ⇒ Object
Pulls from the server while waiting for messages to become available. This is the same as:
subscription.pull immediate: false
Parameters
options
- An optional Hash for controlling additional behavior. (Hash)
options[:max]
- The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000. (Integer)
options[:autoack]
- Automatically acknowledge the message as it is pulled. The default value is false. (Boolean)
Returns
Array of Gcloud::Pubsub::ReceivedMessage
Example
require "gcloud"
gcloud = Gcloud.new
pubsub = gcloud.pubsub
sub = pubsub.subscription "my-topic-sub"
msgs = sub.wait_for_messages
msgs.each { |msg| msg.acknowledge! }
# File 'lib/gcloud/pubsub/subscription.rb', line 309
def wait_for_messages options = {}
  pull options.merge(immediate: false)
end
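Because the method is described as equivalent to pulling with `immediate: false`, any `:immediate` value supplied by the caller is overridden by the merge. The short sketch below shows that `Hash#merge` behavior on a plain options hash; the option values are arbitrary.

```ruby
# Caller-supplied options, including an :immediate value that will be overridden.
caller_options = { max: 10, immediate: true }

# Hash#merge returns a new hash; on duplicate keys the argument's value wins.
effective = caller_options.merge(immediate: false)

puts effective[:immediate]      # value actually used for the pull
puts effective[:max]            # other options pass through unchanged
puts caller_options[:immediate] # the caller's hash is not modified
```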