Module: Cloudenvoy::Backend::GooglePubSub
- Defined in:
- lib/cloudenvoy/backend/google_pub_sub.rb
Overview
Interface to GCP Pub/Sub and Pub/Sub local emulator
Class Method Summary
- .backend ⇒ Google::Cloud::Pub
  Return the backend to use for sending messages.
- .config ⇒ Cloudenvoy::Config
  Return the cloudenvoy configuration.
- .development? ⇒ Boolean
  Return true if the current config mode is development.
- .publish(topic, payload, metadata = {}) ⇒ Cloudenvoy::Message
  Publish a message to a topic.
- .publish_all(topic, msg_args) ⇒ Array<Cloudenvoy::Message>
  Publish multiple messages to a topic.
- .upsert_subscription(topic, name, opts = {}) ⇒ Cloudenvoy::Subscription
  Create or update a subscription for a specific topic.
- .upsert_topic(topic) ⇒ Cloudenvoy::Topic
  Create a topic, or fetch it if it already exists.
- .webhook_url ⇒ String
  Return an authenticated endpoint for processing Pub/Sub webhooks.
Class Method Details
.backend ⇒ Google::Cloud::Pub
Return the backend to use for sending messages.
    # File 'lib/cloudenvoy/backend/google_pub_sub.rb', line 34
    def backend
      @backend ||= Google::Cloud::PubSub.new(**{
        project_id: config.gcp_project_id,
        emulator_host: development? ? Cloudenvoy::Config::EMULATOR_HOST : nil
      }.compact)
    end
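The `.compact` call is what keeps `emulator_host` out of the client options when the mode is not development. A minimal sketch of that option-building step follows. The `pubsub_options` helper and the host string are assumptions made for illustration; the real value lives in `Cloudenvoy::Config::EMULATOR_HOST` and is not shown on this page.

```ruby
# Assumed placeholder value; not taken from the gem.
ASSUMED_EMULATOR_HOST = 'localhost:8085'

# Hypothetical helper mirroring the option hash built in .backend.
def pubsub_options(project_id:, development:)
  {
    project_id: project_id,
    emulator_host: development ? ASSUMED_EMULATOR_HOST : nil
  }.compact # Hash#compact drops every key whose value is nil
end

pubsub_options(project_id: 'my-project', development: true).keys
# => [:project_id, :emulator_host]
pubsub_options(project_id: 'my-project', development: false).keys
# => [:project_id]
```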
.config ⇒ Cloudenvoy::Config
Return the cloudenvoy configuration. See Cloudenvoy#configure.
    # File 'lib/cloudenvoy/backend/google_pub_sub.rb', line 16
    def config
      Cloudenvoy.config
    end
.development? ⇒ Boolean
Return true if the current config mode is development.
    # File 'lib/cloudenvoy/backend/google_pub_sub.rb', line 25
    def development?
      config.mode == :development
    end
.publish(topic, payload, metadata = {}) ⇒ Cloudenvoy::Message
Publish a message to a topic.
    # File 'lib/cloudenvoy/backend/google_pub_sub.rb', line 59
    def publish(topic, payload, metadata = {})
      # Retrieve the topic
      ps_topic = backend.topic(topic, skip_lookup: true)

      # Publish the message: JSON-encoded payload plus metadata attributes
      ps_msg = ps_topic.publish(payload.to_json, metadata.to_h)

      # Return formatted message
      Message.new(
        id: ps_msg.message_id,
        payload: payload,
        metadata: metadata,
        topic: topic
      )
    end
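The two arguments handed to the Pub/Sub topic are derived with `to_json` and `to_h`. The sketch below isolates that step with a hypothetical `wire_arguments` helper (not part of Cloudenvoy) to show what a caller can expect when `metadata` is nil.

```ruby
require 'json'

# Hypothetical helper: returns the two values .publish passes to the topic.
def wire_arguments(payload, metadata = {})
  [payload.to_json, metadata.to_h]
end

data, attributes = wire_arguments({ 'user_id' => 42 }, nil)
JSON.parse(data) == { 'user_id' => 42 } # => true, the payload round-trips through JSON
attributes.empty?                       # => true, because nil.to_h returns {}
```

Note that the returned `Cloudenvoy::Message` carries the original payload object, not the JSON string that went over the wire.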
.publish_all(topic, msg_args) ⇒ Array<Cloudenvoy::Message>
Publish multiple messages to a topic.
    # File 'lib/cloudenvoy/backend/google_pub_sub.rb', line 83
    def publish_all(topic, msg_args)
      # Retrieve the topic
      ps_topic = backend.topic(topic, skip_lookup: true)

      # Publish the messages in a single batch
      ps_msgs = ps_topic.publish do |batch|
        msg_args.each do |(payload, metadata)|
          batch.publish(payload.to_json, metadata.to_h)
        end
      end

      # Return the formatted messages
      ps_msgs.each_with_index.map do |ps_msg, index|
        payload, metadata = msg_args[index]

        Message.new(
          id: ps_msg.message_id,
          payload: payload,
          metadata: metadata,
          topic: topic
        )
      end
    end
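`msg_args` is an array of `[payload, metadata]` pairs, and the method matches each Pub/Sub response back to its input by index. The sketch below reproduces that pairing with in-memory stand-ins. `FakePubSubMessage`, `SketchMessage`, `fake_batch_publish` and `pair_results` are all hypothetical, and the sketch assumes responses come back in publish order, which is what the index-based lookup relies on.

```ruby
# Stand-ins for the Pub/Sub response and Cloudenvoy::Message (both hypothetical).
FakePubSubMessage = Struct.new(:message_id)
SketchMessage = Struct.new(:id, :payload, :metadata, :topic, keyword_init: true)

# Pretend broker: returns one response per published item, in publish order.
def fake_batch_publish(msg_args)
  msg_args.each_index.map { |i| FakePubSubMessage.new("id-#{i}") }
end

def pair_results(topic, msg_args)
  fake_batch_publish(msg_args).each_with_index.map do |ps_msg, index|
    payload, metadata = msg_args[index] # metadata is nil for one-element pairs
    SketchMessage.new(id: ps_msg.message_id, payload: payload, metadata: metadata, topic: topic)
  end
end

messages = pair_results('my-topic', [[{ 'a' => 1 }, { 'kind' => 'x' }], [{ 'b' => 2 }]])
messages.map(&:id) # => ["id-0", "id-1"]
```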
.upsert_subscription(topic, name, opts = {}) ⇒ Cloudenvoy::Subscription
Create or update a subscription for a specific topic.
    # File 'lib/cloudenvoy/backend/google_pub_sub.rb', line 129
    def upsert_subscription(topic, name, opts = {})
      sub_config = opts.to_h.merge(endpoint: webhook_url)

      # Auto-create topic in development. In non-development environments
      # the create subscription action raises an error if the topic does
      # not exist
      upsert_topic(topic) if development?

      # Create subscription
      ps_sub =
        begin
          # Retrieve the topic
          ps_topic = backend.topic(topic, skip_lookup: true)

          # Attempt to create the subscription
          ps_topic.subscribe(name, **sub_config)
        rescue Google::Cloud::AlreadyExistsError
          # Update endpoint on subscription
          # Topic is not updated as it is name-dependent
          backend.subscription(name).tap do |e|
            sub_config.each do |k, v|
              e.send("#{k}=", v)
            end
          end
        end

      # Return formatted subscription
      Subscription.new(name: ps_sub.name, original: ps_sub)
    end
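The method follows a create-then-rescue pattern: try to create the subscription, and if it already exists, fetch it and apply each option through its setter. The sketch below shows that pattern in isolation. Every class here (`SketchRegistry`, `SketchSubscription`, `SketchAlreadyExistsError`) is an in-memory stand-in invented for illustration, not part of google-cloud-pubsub or Cloudenvoy.

```ruby
# In-memory stand-ins; none of these classes belong to google-cloud-pubsub.
class SketchAlreadyExistsError < StandardError; end

SketchSubscription = Struct.new(:name, :endpoint, :deadline)

class SketchRegistry
  def initialize
    @subs = {}
  end

  def create(name, **config)
    raise SketchAlreadyExistsError, name if @subs.key?(name)

    @subs[name] = SketchSubscription.new(name, config[:endpoint], config[:deadline])
  end

  def fetch(name)
    @subs.fetch(name)
  end
end

def upsert(registry, name, config)
  registry.create(name, **config)
rescue SketchAlreadyExistsError
  # Already present: update each attribute through its setter instead
  registry.fetch(name).tap do |sub|
    config.each { |key, value| sub.send("#{key}=", value) }
  end
end

registry = SketchRegistry.new
upsert(registry, 'orders', endpoint: 'https://a.example/hook', deadline: 30)
updated = upsert(registry, 'orders', endpoint: 'https://b.example/hook', deadline: 60)
updated.endpoint # => "https://b.example/hook"
```

Because the update path calls `send("#{key}=", value)` for every option, each key passed in `opts` needs a matching setter on the subscription object, otherwise the call raises `NoMethodError`.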
.upsert_topic(topic) ⇒ Cloudenvoy::Topic
Create a topic, or fetch it if it already exists.
    # File 'lib/cloudenvoy/backend/google_pub_sub.rb', line 166
    def upsert_topic(topic)
      ps_topic =
        begin
          backend.create_topic(topic)
        rescue Google::Cloud::AlreadyExistsError
          backend.topic(topic)
        end

      # Return formatted topic
      Topic.new(name: ps_topic.name, original: ps_topic)
    end
.webhook_url ⇒ String
Return an authenticated endpoint for processing Pub/Sub webhooks.
    # File 'lib/cloudenvoy/backend/google_pub_sub.rb', line 46
    def webhook_url
      "#{config.processor_url}?token=#{Authenticator.verification_token}"
    end
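The URL is plain string interpolation: the processor URL followed by a `token` query parameter. A small sketch with made-up inputs shows the resulting shape and how a receiver could read the token back. The `sketch_webhook_url` helper, the host, the path and the token value are all hypothetical; in Cloudenvoy the inputs come from `config.processor_url` and `Authenticator.verification_token`.

```ruby
require 'uri'

# Hypothetical helper mirroring the interpolation in .webhook_url.
def sketch_webhook_url(processor_url, token)
  "#{processor_url}?token=#{token}"
end

url = sketch_webhook_url('https://app.example.com/cloudenvoy/receive', 'abc123')

# The receiving side can extract the token from the query string
token = URI.decode_www_form(URI.parse(url).query).to_h['token']
token # => "abc123"
```

Since the token is interpolated without escaping, this approach assumes it contains only URL-safe characters.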