Class: Google::Cloud::PubSub::Topic
- Inherits: Object
- Defined in: lib/google/cloud/pubsub/topic.rb, lib/google/cloud/pubsub/topic/list.rb
Overview
Defined Under Namespace
Classes: List
Instance Method Summary
-
#async_publisher ⇒ AsyncPublisher
AsyncPublisher object used to publish multiple messages in batches.
-
#delete ⇒ Boolean
Permanently deletes the topic.
-
#enable_message_ordering! ⇒ Object
Enables message ordering for messages with ordering keys on the #async_publisher.
-
#exists? ⇒ Boolean
Determines whether the topic exists in the Pub/Sub service.
-
#kms_key ⇒ String
The Cloud KMS encryption key that will be used to protect access to messages published on this topic.
-
#kms_key=(new_kms_key_name) ⇒ Object
Set the Cloud KMS encryption key that will be used to protect access to messages published on this topic.
-
#labels ⇒ Hash
A hash of user-provided labels associated with this topic.
-
#labels=(new_labels) ⇒ Object
Sets the hash of user-provided labels associated with this topic.
-
#message_encoding ⇒ Symbol?
The encoding of messages validated against the schema identified by #schema_name.
-
#message_encoding_binary? ⇒ Boolean
Checks if the encoding of messages in the schema settings is BINARY.
-
#message_encoding_json? ⇒ Boolean
Checks if the encoding of messages in the schema settings is JSON.
-
#message_ordering? ⇒ Boolean
Whether message ordering for messages with ordering keys has been enabled on the #async_publisher.
-
#name ⇒ String
The name of the topic.
-
#persistence_regions ⇒ Array<String>
The list of GCP region IDs where messages that are published to the topic may be persisted in storage.
-
#persistence_regions=(new_persistence_regions) ⇒ Object
Sets the list of GCP region IDs where messages that are published to the topic may be persisted in storage.
-
#policy {|policy| ... } ⇒ Policy
Gets the Cloud IAM access control policy for this topic.
-
#publish(data = nil, attributes = {}) {|batch| ... } ⇒ Message+
Publishes one or more messages to the topic.
-
#publish_async(data = nil, attributes = nil, ordering_key: nil, **extra_attrs) {|result| ... } ⇒ Object
Publishes a message asynchronously to the topic using #async_publisher.
-
#reference? ⇒ Boolean
Determines whether the topic object was created without retrieving the resource representation from the Pub/Sub service.
-
#reload! ⇒ Google::Cloud::PubSub::Topic
(also: #refresh!)
Reloads the topic with current data from the Pub/Sub service.
-
#resource? ⇒ Boolean
Determines whether the topic object was created with a resource representation from the Pub/Sub service.
-
#resume_publish(ordering_key) ⇒ Boolean
Resume publishing ordered messages for the provided ordering key.
-
#schema_name ⇒ String?
The name of the schema that messages published should be validated against, if schema settings are configured for the topic.
-
#subscribe(subscription_name, deadline: nil, retain_acked: false, retention: nil, endpoint: nil, push_config: nil, labels: nil, message_ordering: nil, filter: nil, dead_letter_topic: nil, dead_letter_max_delivery_attempts: nil, retry_policy: nil) ⇒ Google::Cloud::PubSub::Subscription
(also: #create_subscription, #new_subscription)
Creates a new Subscription object on the current Topic.
-
#subscription(subscription_name, skip_lookup: nil) ⇒ Google::Cloud::PubSub::Subscription?
(also: #get_subscription, #find_subscription)
Retrieves subscription by name.
-
#subscriptions(token: nil, max: nil) ⇒ Array<Subscription>
(also: #find_subscriptions, #list_subscriptions)
Retrieves a list of the subscriptions attached to this topic.
-
#test_permissions(*permissions) ⇒ Array<String>
Tests the specified permissions against the Cloud IAM access control policy.
-
#update_policy(new_policy) ⇒ Policy
(also: #policy=)
Updates the Cloud IAM access control policy for this topic.
Instance Method Details
#async_publisher ⇒ AsyncPublisher
AsyncPublisher object used to publish multiple messages in batches.
# File 'lib/google/cloud/pubsub/topic.rb', line 83
def async_publisher
  @async_publisher
end
#delete ⇒ Boolean
Permanently deletes the topic.
# File 'lib/google/cloud/pubsub/topic.rb', line 321
def delete
  ensure_service!
  service.delete_topic name
  true
end
#enable_message_ordering! ⇒ Object
Note: At the time of this release, ordering keys are not yet publicly enabled and require special project enablement.
Enables message ordering for messages with ordering keys on the
#async_publisher. When enabled, messages published with the same
ordering_key will be delivered in the order they were published.
See #message_ordering?. See #publish_async, Subscription#listen, and Message#ordering_key.
# File 'lib/google/cloud/pubsub/topic.rb', line 767
def enable_message_ordering!
  @async_publisher ||= AsyncPublisher.new name, service, **@async_opts
  @async_publisher.enable_message_ordering!
end
#exists? ⇒ Boolean
Determines whether the topic exists in the Pub/Sub service.
# File 'lib/google/cloud/pubsub/topic.rb', line 932
def exists?
  # Always true if the object is not set as reference
  return true unless reference?
  # If we have a value, return it
  return @exists unless @exists.nil?
  ensure_grpc!
  @exists = true
rescue Google::Cloud::NotFoundError
  @exists = false
end
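Because #exists? returns false rather than raising when the service reports the topic as missing, it can serve as a guard before other calls. The following is a minimal sketch, not part of the library: the helper name `require_topic!` is hypothetical, and `topic` is assumed to be any object that responds to `exists?` and `name`, such as a Topic obtained from a Pub/Sub project object.

```ruby
# Hypothetical helper (not part of google-cloud-pubsub): fails fast when a
# topic is missing instead of letting a later call raise a service error.
def require_topic!(topic)
  raise ArgumentError, "topic #{topic.name} does not exist" unless topic.exists?
  topic
end
```

The helper returns the topic itself so that it can be chained, for example `require_topic!(topic).publish "hello"`.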
#kms_key ⇒ String
The Cloud KMS encryption key that will be used to protect access
to messages published on this topic.
For example: projects/a/locations/b/keyRings/c/cryptoKeys/d
The default value is nil, which means default encryption is used.
Makes an API call to retrieve the KMS encryption key when called on a reference object. See #reference?.
# File 'lib/google/cloud/pubsub/topic.rb', line 155
def kms_key
  ensure_grpc!
  @grpc.kms_key_name
end
#kms_key=(new_kms_key_name) ⇒ Object
Set the Cloud KMS encryption key that will be used to protect access
to messages published on this topic.
For example: projects/a/locations/b/keyRings/c/cryptoKeys/d
The default value is nil, which means default encryption is used.
# File 'lib/google/cloud/pubsub/topic.rb', line 178
def kms_key= new_kms_key_name
  update_grpc = Google::Cloud::PubSub::V1::Topic.new name: name, kms_key_name: new_kms_key_name
  @grpc = service.update_topic update_grpc, :kms_key_name
  @resource_name = nil
end
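The key is passed as a full resource name in the form shown in the example above. As an illustration only, the sketch below assembles such a name from its parts; the helper `kms_key_name` and its keyword arguments are hypothetical and not provided by the library.

```ruby
# Hypothetical helper: builds a Cloud KMS key resource name in the
# projects/a/locations/b/keyRings/c/cryptoKeys/d form used by #kms_key=.
def kms_key_name(project:, location:, key_ring:, key:)
  "projects/#{project}/locations/#{location}/keyRings/#{key_ring}/cryptoKeys/#{key}"
end

# Usage with a real topic (requires the google-cloud-pubsub gem and credentials):
#   topic.kms_key = kms_key_name(project: "a", location: "b", key_ring: "c", key: "d")
```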
#labels ⇒ Hash
A hash of user-provided labels associated with this topic. Labels can be used to organize and group topics. See Creating and Managing Labels.
The returned hash is frozen and changes are not allowed. Use #labels= to update the labels for this topic.
Makes an API call to retrieve the labels values when called on a reference object. See #reference?.
# File 'lib/google/cloud/pubsub/topic.rb', line 111
def labels
  ensure_grpc!
  @grpc.labels.to_h.freeze
end
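Since the returned hash is frozen, a label cannot be changed in place. One way to add or overwrite a single label is to build a new hash and assign it back. This is a sketch under that assumption; the helper name `set_label` is hypothetical.

```ruby
# Hypothetical helper: adds or overwrites one label. Topic#labels returns a
# frozen Hash, so a new Hash is built with #merge and assigned via #labels=.
def set_label(topic, key, value)
  topic.labels = topic.labels.merge(key => value)
end
```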
#labels=(new_labels) ⇒ Object
Sets the hash of user-provided labels associated with this topic. Labels can be used to organize and group topics. Label keys and values can be no longer than 63 characters, can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. Label values are optional. Label keys must start with a letter and each label in the list must have a different key. See Creating and Managing Labels.
# File 'lib/google/cloud/pubsub/topic.rb', line 128
def labels= new_labels
  raise ArgumentError, "Value must be a Hash" if new_labels.nil?
  update_grpc = Google::Cloud::PubSub::V1::Topic.new name: name, labels: new_labels
  @grpc = service.update_topic update_grpc, :labels
  @resource_name = nil
end
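The setter itself only rejects nil, so the label rules described above are enforced by the service. A client-side pre-check can catch mistakes earlier. The sketch below is one reading of those rules, not the service's own validation: the constants and the `valid_labels?` helper are hypothetical, and "international characters" is interpreted here as Unicode lowercase and caseless letters.

```ruby
# Assumed interpretation of the documented label rules (hypothetical names):
# keys start with a letter and are 1..63 characters long; values are 0..63
# characters long; both allow lowercase or caseless letters, digits,
# underscores and dashes.
LABEL_KEY_PATTERN   = /\A[\p{Ll}\p{Lo}][\p{Ll}\p{Lo}\p{N}_-]{0,62}\z/
LABEL_VALUE_PATTERN = /\A[\p{Ll}\p{Lo}\p{N}_-]{0,63}\z/

def valid_labels?(labels)
  return false unless labels.is_a? Hash

  labels.all? do |key, value|
    LABEL_KEY_PATTERN.match?(key.to_s) && LABEL_VALUE_PATTERN.match?(value.to_s)
  end
end
```

A caller could run `valid_labels? new_labels` before `topic.labels = new_labels` and skip the API call when it returns false.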
#message_encoding ⇒ Symbol?
The encoding of messages validated against the schema identified by #schema_name. If present, #schema_name should also be present. Values include:
- JSON - JSON encoding.
- BINARY - Binary encoding, as defined by the schema type. For some schema types, binary encoding may not be available.
Makes an API call to retrieve the schema settings when called on a reference object. See #reference?.
# File 'lib/google/cloud/pubsub/topic.rb', line 281
def message_encoding
  ensure_grpc!
  @grpc.schema_settings&.encoding
end
#message_encoding_binary? ⇒ Boolean
Checks if the encoding of messages in the schema settings is BINARY. See #message_encoding.
Makes an API call to retrieve the schema settings when called on a reference object. See #reference?.
# File 'lib/google/cloud/pubsub/topic.rb', line 293
def message_encoding_binary?
  message_encoding.to_s.upcase == "BINARY"
end
#message_encoding_json? ⇒ Boolean
Checks if the encoding of messages in the schema settings is JSON. See #message_encoding.
Makes an API call to retrieve the schema settings when called on a reference object. See #reference?.
# File 'lib/google/cloud/pubsub/topic.rb', line 304
def message_encoding_json?
  message_encoding.to_s.upcase == "JSON"
end
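The two predicates make it straightforward to choose a serializer before publishing. The following sketch assumes `topic` responds to both predicates; the helper `encode_payload` is hypothetical, and binary encoding is deliberately left unimplemented because it depends on the schema type.

```ruby
require "json"

# Hypothetical helper: serializes a payload according to the topic's schema
# encoding as reported by the predicates documented above.
def encode_payload(topic, payload)
  if topic.message_encoding_json?
    JSON.generate payload
  elsif topic.message_encoding_binary?
    raise NotImplementedError, "binary encoding depends on the schema type"
  else
    # Neither JSON nor BINARY, for example when no schema settings are present.
    payload.to_s
  end
end
```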
#message_ordering? ⇒ Boolean
Whether message ordering for messages with ordering keys has been
enabled on the #async_publisher. When enabled, messages published
with the same ordering_key will be delivered in the order they were
published. When disabled, messages may be delivered in any order.
See #enable_message_ordering!. See #publish_async, Subscription#listen, and Message#ordering_key.
# File 'lib/google/cloud/pubsub/topic.rb', line 783
def message_ordering?
  @async_publisher ||= AsyncPublisher.new name, service, **@async_opts
  @async_publisher.message_ordering?
end
#name ⇒ String
The name of the topic.
# File 'lib/google/cloud/pubsub/topic.rb', line 93
def name
  return @resource_name if reference?
  @grpc.name
end
#persistence_regions ⇒ Array<String>
The list of GCP region IDs where messages that are published to the topic may be persisted in storage.
Messages published by publishers running in non-allowed GCP regions (or running outside of GCP altogether) will be routed for storage in one of the allowed regions. An empty list indicates a misconfiguration at the project or organization level, which will result in all publish operations failing.
Makes an API call to retrieve the list of GCP region IDs values when called on a reference object. See #reference?.
# File 'lib/google/cloud/pubsub/topic.rb', line 208
def persistence_regions
  ensure_grpc!
  return [] if @grpc.message_storage_policy.nil?
  Array @grpc.message_storage_policy.allowed_persistence_regions
end
#persistence_regions=(new_persistence_regions) ⇒ Object
Sets the list of GCP region IDs where messages that are published to the topic may be persisted in storage.
# File 'lib/google/cloud/pubsub/topic.rb', line 229
def persistence_regions= new_persistence_regions
  update_grpc = Google::Cloud::PubSub::V1::Topic.new \
    name: name, message_storage_policy: { allowed_persistence_regions: Array(new_persistence_regions) }
  @grpc = service.update_topic update_grpc, :message_storage_policy
  @resource_name = nil
end
#policy {|policy| ... } ⇒ Policy
Gets the Cloud IAM access control policy for this topic.
# File 'lib/google/cloud/pubsub/topic.rb', line 835
def policy
  ensure_service!
  grpc = service.get_topic_policy name
  policy = Policy.from_grpc grpc
  return policy unless block_given?
  yield policy
  update_policy policy
end
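When a block is given, the policy is yielded for modification and then written back with #update_policy, as the source above shows. A minimal sketch of that read-modify-write pattern follows. The helper name `grant_role` is hypothetical, and it assumes the yielded policy object responds to `add(role, member)`.

```ruby
# Hypothetical helper: grants a role to a member using the block form of
# Topic#policy, which saves the modified policy after the block returns.
def grant_role(topic, role, member)
  topic.policy do |policy|
    policy.add role, member
  end
end

# Usage with a real topic (requires the google-cloud-pubsub gem and credentials):
#   grant_role topic, "roles/pubsub.publisher", "user:publisher@example.com"
```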
#publish(data = nil, attributes = {}) {|batch| ... } ⇒ Message+
Publishes one or more messages to the topic.
The message payload must not be empty; it must contain either a non-empty data field, or at least one attribute.
# File 'lib/google/cloud/pubsub/topic.rb', line 639
def publish data = nil, attributes = {}
  ensure_service!
  batch = BatchPublisher.new data, attributes
  yield batch if block_given?
  return nil if batch.messages.count.zero?
  publish_batch_messages batch
end
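The block form collects several messages into one batch before sending. The sketch below wraps that pattern; the helper name `publish_all` is hypothetical, and it assumes the yielded batch object responds to `publish(data, attributes)`.

```ruby
# Hypothetical helper: publishes every payload in a single batch.
# Whatever Topic#publish returns is passed through; per the source above
# that is nil when no messages were added to the batch.
def publish_all(topic, payloads, attributes = {})
  topic.publish do |batch|
    payloads.each { |data| batch.publish data, attributes }
  end
end

# Usage with a real topic (requires the google-cloud-pubsub gem and credentials):
#   publish_all topic, ["task 1 completed", "task 2 completed"], "source" => "worker"
```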
#publish_async(data = nil, attributes = nil, ordering_key: nil, **extra_attrs) {|result| ... } ⇒ Object
Note: At the time of this release, ordering keys are not yet publicly enabled and require special project enablement.
Publishes a message asynchronously to the topic using #async_publisher.
The message payload must not be empty; it must contain either a non-empty data field, or at least one attribute.
Google Cloud Pub/Sub ordering keys provide the ability to ensure related messages are sent to subscribers in the order in which they were published. Messages can be tagged with an ordering key, a string that identifies related messages for which publish order should be respected. The service guarantees that, for a given ordering key and publisher, messages are sent to subscribers in the order in which they were published. Ordering does not require sacrificing high throughput or scalability, as the service automatically distributes messages for different ordering keys across subscribers.
To use ordering keys, specify ordering_key. Before specifying
ordering_key on a message a call to #enable_message_ordering! must
be made or an error will be raised.
# File 'lib/google/cloud/pubsub/topic.rb', line 749
def publish_async data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &callback
  ensure_service!

  @async_publisher ||= AsyncPublisher.new name, service, **@async_opts
  @async_publisher.publish data, attributes, ordering_key: ordering_key, **extra_attrs, &callback
end
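The requirement to call #enable_message_ordering! before passing ordering_key can be folded into a small wrapper. This is a sketch only: the helper name `publish_in_order` is hypothetical, and it assumes the object yielded to the callback responds to `succeeded?` and `error`.

```ruby
# Hypothetical helper: publishes payloads that share one ordering key.
# Message ordering is enabled first, since specifying ordering_key without
# it raises an error according to the documentation above.
def publish_in_order(topic, ordering_key, payloads)
  topic.enable_message_ordering! unless topic.message_ordering?

  payloads.each do |data|
    topic.publish_async data, ordering_key: ordering_key do |result|
      # On a real topic the callback runs later, once the publish completes.
      warn "publish failed: #{result.error}" unless result.succeeded?
    end
  end
  nil
end
```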
#reference? ⇒ Boolean
Determines whether the topic object was created without retrieving the resource representation from the Pub/Sub service.
# File 'lib/google/cloud/pubsub/topic.rb', line 958
def reference?
  @grpc.nil?
end
#reload! ⇒ Google::Cloud::PubSub::Topic Also known as: refresh!
Reloads the topic with current data from the Pub/Sub service.
# File 'lib/google/cloud/pubsub/topic.rb', line 994
def reload!
  ensure_service!
  @grpc = service.get_topic name
  @resource_name = nil
  self
end
#resource? ⇒ Boolean
Determines whether the topic object was created with a resource representation from the Pub/Sub service.
# File 'lib/google/cloud/pubsub/topic.rb', line 977
def resource?
  !@grpc.nil?
end
#resume_publish(ordering_key) ⇒ Boolean
Resume publishing ordered messages for the provided ordering key.
# File 'lib/google/cloud/pubsub/topic.rb', line 796
def resume_publish ordering_key
  @async_publisher ||= AsyncPublisher.new name, service, **@async_opts
  @async_publisher.resume_publish ordering_key
end
#schema_name ⇒ String?
The name of the schema that messages published should be validated against, if schema settings are configured
for the topic. The value is a fully-qualified schema name in the form
projects/{project_id}/schemas/{schema_id}. If present, #message_encoding should also be present. The value
of this field will be _deleted-schema_ if the schema has been deleted.
Makes an API call to retrieve the schema settings when called on a reference object. See #reference?.
# File 'lib/google/cloud/pubsub/topic.rb', line 255
def schema_name
  ensure_grpc!
  @grpc.schema_settings&.schema
end
#subscribe(subscription_name, deadline: nil, retain_acked: false, retention: nil, endpoint: nil, push_config: nil, labels: nil, message_ordering: nil, filter: nil, dead_letter_topic: nil, dead_letter_max_delivery_attempts: nil, retry_policy: nil) ⇒ Google::Cloud::PubSub::Subscription Also known as: create_subscription, new_subscription
Creates a new Subscription object on the current Topic.
# File 'lib/google/cloud/pubsub/topic.rb', line 452
def subscribe subscription_name, deadline: nil, retain_acked: false, retention: nil, endpoint: nil,
              push_config: nil, labels: nil, message_ordering: nil, filter: nil, dead_letter_topic: nil,
              dead_letter_max_delivery_attempts: nil, retry_policy: nil
  ensure_service!
  if push_config && endpoint
    raise ArgumentError, "endpoint and push_config were both provided. Please provide only one."
  end
  push_config = Google::Cloud::PubSub::Subscription::PushConfig.new endpoint: endpoint if endpoint

  options = {
    deadline: deadline,
    retain_acked: retain_acked,
    retention: retention,
    labels: labels,
    message_ordering: message_ordering,
    filter: filter,
    dead_letter_max_delivery_attempts: dead_letter_max_delivery_attempts
  }

  options[:dead_letter_topic_name] = dead_letter_topic.name if dead_letter_topic
  if options[:dead_letter_max_delivery_attempts] && !options[:dead_letter_topic_name]
    # Service error message "3:Invalid resource name given (name=)." does not identify param.
    raise ArgumentError, "dead_letter_topic is required with dead_letter_max_delivery_attempts"
  end
  options[:push_config] = push_config.to_grpc if push_config
  options[:retry_policy] = retry_policy.to_grpc if retry_policy
  grpc = service.create_subscription name, subscription_name, options
  Subscription.from_grpc grpc, service
end
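The source above raises ArgumentError when dead_letter_max_delivery_attempts is given without dead_letter_topic, so the two options are best supplied together. A minimal sketch follows; the helper name `subscribe_with_dead_letter` and the default of 10 attempts are illustrative assumptions, not library defaults.

```ruby
# Hypothetical helper: creates a subscription whose undeliverable messages
# are forwarded to a dead letter topic. Both dead letter options are always
# passed together, which avoids the ArgumentError raised by Topic#subscribe.
def subscribe_with_dead_letter(topic, subscription_name, dead_letter_topic, max_attempts: 10)
  topic.subscribe subscription_name,
                  dead_letter_topic: dead_letter_topic,
                  dead_letter_max_delivery_attempts: max_attempts
end
```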
#subscription(subscription_name, skip_lookup: nil) ⇒ Google::Cloud::PubSub::Subscription? Also known as: get_subscription, find_subscription
Retrieves subscription by name.
# File 'lib/google/cloud/pubsub/topic.rb', line 530
def subscription subscription_name, skip_lookup: nil
  ensure_service!
  return Subscription.from_name subscription_name, service if skip_lookup
  grpc = service.get_subscription subscription_name
  Subscription.from_grpc grpc, service
rescue Google::Cloud::NotFoundError
  nil
end
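Because a missing subscription yields nil instead of an exception, a lookup can be combined with #subscribe in a find-or-create pattern. The helper below is a hypothetical sketch. It does not pass skip_lookup, since with skip_lookup the method returns an object without contacting the service and therefore never returns nil.

```ruby
# Hypothetical helper: returns the named subscription, creating it on this
# topic when the lookup returns nil.
def find_or_create_subscription(topic, subscription_name)
  topic.subscription(subscription_name) || topic.subscribe(subscription_name)
end
```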
#subscriptions(token: nil, max: nil) ⇒ Array<Subscription> Also known as: find_subscriptions, list_subscriptions
Retrieves a list of the subscriptions attached to this topic.
# File 'lib/google/cloud/pubsub/topic.rb', line 573
def subscriptions token: nil, max: nil
  ensure_service!
  options = { token: token, max: max }
  grpc = service.list_topics_subscriptions name, options
  Subscription::List.from_topic_grpc grpc, service, name, max
end
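Results are paginated, so collecting every subscription means walking the pages. The sketch below assumes the returned list responds to `next?` and `next` in addition to the usual Array methods; the helper name `all_subscription_names` and the default page size are hypothetical.

```ruby
# Hypothetical helper: gathers the names of all subscriptions on a topic by
# following result pages until no further page is reported.
def all_subscription_names(topic, page_size: 100)
  names = []
  page = topic.subscriptions max: page_size
  loop do
    names.concat page.map(&:name)
    break unless page.next?
    page = page.next
  end
  names
end
```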
#test_permissions(*permissions) ⇒ Array<String>
Tests the specified permissions against the Cloud IAM access control policy.
# File 'lib/google/cloud/pubsub/topic.rb', line 913
def test_permissions *permissions
  permissions = Array(permissions).flatten
  permissions = Array(permissions).flatten
  ensure_service!
  grpc = service.test_topic_permissions name, permissions
  grpc.permissions
end
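A common use is checking a single permission before attempting an operation. The sketch below assumes the returned array contains only the permissions the caller actually holds; the constant and the helper name `can_publish?` are hypothetical.

```ruby
# Assumed permission string for publishing to a topic.
PUBLISH_PERMISSION = "pubsub.topics.publish"

# Hypothetical helper: true when the caller holds the publish permission,
# based on the array returned by Topic#test_permissions.
def can_publish?(topic)
  topic.test_permissions(PUBLISH_PERMISSION).include? PUBLISH_PERMISSION
end
```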
#update_policy(new_policy) ⇒ Policy Also known as: policy=
Updates the Cloud IAM access control
policy for this topic. The policy should be read from #policy. See
Policy for an explanation of the policy
etag property and how to modify policies.
You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.
# File 'lib/google/cloud/pubsub/topic.rb', line 873
def update_policy new_policy
  ensure_service!
  grpc = service.set_topic_policy name, new_policy.to_grpc
  @policy = Policy.from_grpc grpc
end