Class: Azure::ServiceBus::ServiceBusService

Inherits:
Core::SignedService
Defined in:
lib/azure/service_bus/service_bus_service.rb

Constant Summary

DEFAULT_TIMEOUT =
60

Instance Attribute Summary

Attributes inherited from Core::SignedService

#account_name, #signer

Attributes inherited from Core::FilteredService

#filters

Attributes inherited from Core::Service

#host

Instance Method Summary

Methods inherited from Core::SignedService

#call

Methods inherited from Core::FilteredService

#call, #with_filter

Methods inherited from Core::Service

#call, #generate_uri

Constructor Details

#initialize(host = Azure.config.service_bus_host) ⇒ ServiceBusService

Returns a new instance of ServiceBusService.



# File 'lib/azure/service_bus/service_bus_service.rb', line 28

def initialize(host=Azure.config.service_bus_host)
  super(Azure::ServiceBus::Auth::WrapSigner.new)
  @host = host

  with_filter do |req, res|
    req.headers.delete "x-ms-date"
    req.headers.delete "x-ms-version"
    req.headers.delete "DataServiceVersion"
    req.headers.delete "MaxDataServiceVersion"
    req.headers["X-Process-At"] = "servicebus"
    res.call
  end
end
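
The filter registered in the constructor removes several headers and then hands control to the next step by calling res. The following is a minimal sketch of how such a filter chain might behave. SketchRequest and SketchService are hypothetical stand-ins written for this example; they are not the library's Core::FilteredService, whose implementation is not shown on this page.

```ruby
# Hypothetical stand-ins; the real Core::FilteredService is not shown on this page.
SketchRequest = Struct.new(:headers)

class SketchService
  def initialize
    @filters = []
  end

  # Register a filter. Each filter receives the request and a callable
  # that continues the chain.
  def with_filter(&block)
    @filters << block
    self
  end

  # Run the filters, most recently added first, ending with a fake transport.
  def call(request)
    transport = -> { "sent with #{request.headers.keys.sort.join(', ')}" }
    chain = @filters.reduce(transport) do |nxt, filter|
      -> { filter.call(request, nxt) }
    end
    chain.call
  end
end

service = SketchService.new
service.with_filter do |req, res|
  # Same header clean-up as in the constructor above.
  %w[x-ms-date x-ms-version DataServiceVersion MaxDataServiceVersion].each do |name|
    req.headers.delete(name)
  end
  req.headers["X-Process-At"] = "servicebus"
  res.call
end

headers = { "x-ms-date" => "now", "Accept" => "*/*" }
request = SketchRequest.new(headers)
result = service.call(request)
puts result
```

The point of the sketch is the shape of the block: it mutates the request, then must call res so that the rest of the chain (and eventually the transport) runs.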

Instance Method Details

#create_queue(queue, options = {}) ⇒ Object

Creates a new queue. Once created, this queue’s resource manifest is immutable.

Attributes

  • queue - Azure::ServiceBus::Queue instance to create on server, or a string of the queue name

  • options - Hash. The queue properties.

Options

Accepted key/value pairs in options parameter are:

  • :default_message_time_to_live - XML datetime. Determines how long a message lives in the queue.

  • :duplicate_detection_history_time_window - XML datetime. Specifies the time span during which the Service Bus will detect message duplication.

  • :enable_batched_operations - Boolean. Enables or disables service side batching behavior when performing operations for the specific queue.

  • :dead_lettering_on_message_expiration - Boolean. This field controls how the Service Bus handles a message whose TTL has expired.

  • :lock_duration - XML datetime. Determines the amount of time in seconds in which a message should be locked for processing by a receiver.

  • :max_delivery_count - Number. A message is automatically deadlettered after this number of deliveries.

  • :max_size_in_megabytes - Number. Specifies the maximum queue size in megabytes.

  • :message_count - Number. Displays the number of messages currently in the queue.

  • :requires_duplicate_detection - Boolean. If enabled, the queue will detect duplicate messages within the time span specified by the :duplicate_detection_history_time_window option.

  • :requires_session - Boolean. If set to true, the queue will be session-aware and only SessionReceiver will be supported.

  • :size_in_bytes - Number. Reflects the actual bytes toward the queue quota that messages in the queue currently occupy.



# File 'lib/azure/service_bus/service_bus_service.rb', line 120

def create_queue(queue, options={})
  queue = _new_or_existing(Azure::ServiceBus::Queue, queue, options ? options : {})
  create_resource_entry(:queue, queue, queue.name)
end
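
create_queue accepts either a queue instance or a queue name and normalizes the argument through the private helper _new_or_existing, which is not shown on this page. The sketch below illustrates one plausible behaviour for that normalization. SketchQueue and new_or_existing are hypothetical names used only for this example, and the logic is an assumption rather than the library's implementation.

```ruby
# Hypothetical stand-in for Azure::ServiceBus::Queue; only what the sketch needs.
class SketchQueue
  attr_reader :name, :options

  def initialize(name, options = {})
    @name = name
    @options = options
  end
end

# Assumed behaviour of the private helper _new_or_existing (not shown on
# this page): reuse an existing instance, or build one from a name.
def new_or_existing(klass, name_or_instance, options = {})
  return name_or_instance if name_or_instance.is_a?(klass)
  klass.new(name_or_instance.to_s, options || {})
end

queue_options = { max_delivery_count: 5 }
from_name = new_or_existing(SketchQueue, "orders", queue_options)
existing  = new_or_existing(SketchQueue, from_name)

puts from_name.name
puts existing.equal?(from_name)
```

Under this assumption, options only matter when a name is passed; an existing instance is handed on unchanged.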

#create_relay(relay, options = {}) ⇒ Object

Creates a new relay endpoint. Once created, this relay endpoint's resource manifest is immutable.

Attributes

  • relay - Azure::ServiceBus::Relay instance to create on server, or a string of the relay endpoint name

  • options - Hash. The relay endpoint properties.

Options

Accepted key/value pairs in options parameter are:

  • :relay_type - String. Determines the type of the relay endpoint.

  • :requires_client_authorization - Boolean. Determines whether or not clients need to authenticate when making calls.

  • :requires_transport_security - Boolean. Determines whether or not the endpoint uses transport security.



# File 'lib/azure/service_bus/service_bus_service.rb', line 56

def create_relay(relay, options={})
  relay = _new_or_existing(Azure::ServiceBus::Relay, relay, options ? options : {})
  create_resource_entry(:relay, relay, relay.name)
end

#create_rule(*p) ⇒ Object

Creates a new rule. Once created, this rule’s resource manifest is immutable.

Attributes

Pass either (topic_name, subscription_name, rule_name) as strings, or (rule) as a rule object. When using the (topic_name, subscription_name, rule_name, options) overload, you may also pass the properties for the rule in options.

Options

Accepted key/value pairs in options parameter are:

  • :filter - String. The rule filter.

  • :action - String. The rule action.



# File 'lib/azure/service_bus/service_bus_service.rb', line 237

def create_rule(*p)
  rule = _rule_from(*p)
  result = create_resource_entry(:rule, rule, rule.topic, rule.subscription, rule.name)
  result.topic = rule.topic
  result.subscription = rule.subscription
  result
end

#create_subscription(*p) ⇒ Object

Creates a new subscription. Once created, this subscription's resource manifest is immutable.

Attributes

Pass either (topic_name, subscription_name) as strings, or (subscription) as a subscription object. When using the (topic_name, subscription_name) overload, you may also pass optional properties for the subscription.

Options

Accepted key/value pairs in options parameter are:

  • :lock_duration - XML datetime. Determines the amount of time in seconds in which a message should be locked for processing by a receiver.

  • :requires_session - Boolean. If set to true, the subscription will be session-aware and only SessionReceiver will be supported.

  • :default_message_time_to_live - XML datetime. Determines how long a message lives in the associated subscriptions.

  • :dead_lettering_on_message_expiration - Boolean. This field controls how the Service Bus handles a message whose TTL has expired.

  • :dead_lettering_on_filter_evaluation_exceptions - Boolean. Determines how the Service Bus handles a message that causes an exception during a subscription’s filter evaluation.

  • :enable_batched_operations - Boolean. Enables or disables service side batching behavior when performing operations for the specific subscription.

  • :max_delivery_count - Number. A message is automatically deadlettered after this number of deliveries.

  • :message_count - Number. Displays the number of messages currently in the subscription.



# File 'lib/azure/service_bus/service_bus_service.rb', line 323

def create_subscription(*p)
  subscription = _subscription_from(*p)

  result = create_resource_entry(:subscription, subscription, subscription.topic, subscription.name)
  result.topic = subscription.topic
  result
end

#create_topic(topic, options = {}) ⇒ Object

Creates a new topic. Once created, this topic's resource manifest is immutable.

Attributes

  • topic - Azure::ServiceBus::Topic instance to create on server, or a string of the topic name

  • options - Hash. The topic properties.

Options

Accepted key/value pairs in options parameter are:

  • :default_message_time_to_live - XML datetime. Determines how long a message lives in the associated subscriptions.

  • :maximum_number_of_subscriptions - Number. Specifies the maximum number of subscriptions that can be associated with the topic.

  • :max_size_in_megabytes - Number. Specifies the maximum topic size in megabytes.

  • :requires_duplicate_detection - Boolean. If enabled, the topic will detect duplicate messages within the time span specified by the :duplicate_detection_history_time_window option.

  • :dead_lettering_on_filter_evaluation_exceptions - Boolean. Determines how the Service Bus handles a message that causes an exception during a subscription’s filter evaluation.

  • :duplicate_detection_history_time_window - XML datetime. Specifies the time span during which the Service Bus will detect message duplication.

  • :enable_batched_operations - Boolean. Enables or disables service side batching behavior when performing operations for the specific topic.



# File 'lib/azure/service_bus/service_bus_service.rb', line 181

def create_topic(topic, options={})
  topic = _new_or_existing(Azure::ServiceBus::Topic, topic, options ? options : {})
  create_resource_entry(:topic, topic, topic.name)
end

#delete_queue(queue) ⇒ Object

Deletes an existing queue. This operation will also remove all associated state including messages in the queue.

Attributes

  • queue - Azure::ServiceBus::Queue instance to delete or a string of the queue name



# File 'lib/azure/service_bus/service_bus_service.rb', line 131

def delete_queue(queue)
  delete_resource_entry(:queue, _name_for(queue))
end

#delete_queue_message(message) ⇒ Object

Completes processing on a locked message and deletes it from the queue. To maintain At-Least-Once delivery assurances, call this operation only after a previously locked message has been processed successfully.

Attributes

  • message - String. Either the message location URL or a message object.



# File 'lib/azure/service_bus/service_bus_service.rb', line 552

def delete_queue_message(message)
  _delete_message(message)
end

#delete_relay(relay) ⇒ Object

Deletes an existing relay endpoint.

Attributes

  • relay - Azure::ServiceBus::Relay instance to delete or a string of the relay endpoint name



# File 'lib/azure/service_bus/service_bus_service.rb', line 66

def delete_relay(relay)
  delete_resource_entry(:relay, _name_for(relay))
end

#delete_rule(*p) ⇒ Object

Deletes an existing rule.

Attributes

Pass either (topic_name, subscription_name, rule_name) as strings, or (rule) as an object with .name, .topic, and .subscription methods, such as an Azure::ServiceBus::Rule instance.

Note: The default rule name is ‘$Default’. Use this name to delete the default rule for the subscription.



# File 'lib/azure/service_bus/service_bus_service.rb', line 253

def delete_rule(*p)
  topic_name, subscription_name, rule_name = _rule_args(*p)

  delete_resource_entry(:rule, topic_name, subscription_name, rule_name)
end
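
delete_rule (like get_rule) accepts either three names or a single rule-like object and resolves them through the private helper _rule_args, which is not shown on this page. The sketch below shows one way such an overload could be unpacked. SketchRule and rule_args are hypothetical names, and the logic is an assumption made for illustration.

```ruby
# Hypothetical stand-in for Azure::ServiceBus::Rule.
SketchRule = Struct.new(:topic, :subscription, :name)

# Assumed behaviour of the private helper _rule_args (not shown on this page).
def rule_args(*p)
  if p.length == 1
    # Single argument: anything that responds to topic, subscription and name.
    rule = p.first
    [rule.topic, rule.subscription, rule.name]
  elsif p.length == 3
    # Three arguments: topic name, subscription name, rule name.
    p.map(&:to_s)
  else
    raise ArgumentError, "expected (rule) or (topic_name, subscription_name, rule_name)"
  end
end

by_names  = rule_args("news", "mobile", "$Default")
by_object = rule_args(SketchRule.new("news", "mobile", "$Default"))

puts by_names.inspect
puts by_object.inspect
```

Both call styles resolve to the same three names, which is what lets the method body above stay a single delete_resource_entry call.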

#delete_subscription(*p) ⇒ Object

Deletes an existing subscription.

Attributes

Pass either (topic_name, subscription_name) as strings, or (subscription) as an object with .name and .topic methods, such as an Azure::ServiceBus::Subscription instance.



# File 'lib/azure/service_bus/service_bus_service.rb', line 338

def delete_subscription(*p)
  topic_name, subscription_name = _subscription_args(*p)

  delete_resource_entry(:subscription, topic_name, subscription_name)
end

#delete_subscription_message(message) ⇒ Object

Completes processing on a locked message and deletes it from the subscription. To maintain At-Least-Once delivery assurances, call this operation only after a previously locked message has been processed successfully.

Attributes

  • message - String. Either the message location URL or a message object.



# File 'lib/azure/service_bus/service_bus_service.rb', line 469

def delete_subscription_message(message)
  _delete_message(message)
end

#delete_topic(topic) ⇒ Object

Deletes an existing topic. This operation will also remove all associated state including associated subscriptions.

Attributes

  • topic - Azure::ServiceBus::Topic instance to delete or a string of the topic name



# File 'lib/azure/service_bus/service_bus_service.rb', line 192

def delete_topic(topic)
  delete_resource_entry(:topic, _name_for(topic))
end

#get_queue(queue) ⇒ Object

Retrieves an existing queue.

Attributes

  • queue - Azure::ServiceBus::Queue instance to retrieve or a string of the queue name



# File 'lib/azure/service_bus/service_bus_service.rb', line 140

def get_queue(queue)
  resource_entry(:queue, _name_for(queue))
end

#get_relay(relay) ⇒ Object

Retrieves the description for the specified relay endpoint.

Attributes

  • relay - Azure::ServiceBus::Relay instance to retrieve or a string of the relay endpoint name



# File 'lib/azure/service_bus/service_bus_service.rb', line 75

def get_relay(relay)
  resource_entry(:relay, _name_for(relay))
end

#get_rule(*p) ⇒ Object

Retrieves the description for the specified rule.

Attributes

Pass either (topic_name, subscription_name, rule_name) as strings, or (rule) as an object with .name, .topic, and .subscription methods, such as an Azure::ServiceBus::Rule instance.

Note: The default rule name is ‘$Default’. Use this name to retrieve the default rule for the subscription.



# File 'lib/azure/service_bus/service_bus_service.rb', line 267

def get_rule(*p)
  topic_name, subscription_name, rule_name = _rule_args(*p)

  result = resource_entry(:rule, topic_name, subscription_name, rule_name)
  result.topic = topic_name
  result.subscription = subscription_name
  result
end

#get_subscription(*p) ⇒ Object

Gets an existing subscription.

Attributes

Pass either (topic_name, subscription_name) as strings, or (subscription) as an object with .name and .topic methods, such as an Azure::ServiceBus::Subscription instance.



# File 'lib/azure/service_bus/service_bus_service.rb', line 350

def get_subscription(*p)
  topic_name, subscription_name = _subscription_args(*p)
  
  result = resource_entry(:subscription, topic_name, subscription_name)
  result.topic = topic_name
  result
end

#get_topic(topic) ⇒ Object

Retrieves the description for the specified topic.

Attributes

  • topic - Azure::ServiceBus::Topic instance to retrieve or a string of the topic name



# File 'lib/azure/service_bus/service_bus_service.rb', line 201

def get_topic(topic)
  resource_entry(:topic, _name_for(topic))
end

#list_queues(options = {}) ⇒ Object

Enumerates the queues in the service namespace.

Attributes

  • options - Hash. Optional parameters.

Options

Accepted key/value pairs in options parameter are:

  • :skip - Integer. Number of queues to skip.

  • :top - Integer. Number of queues to list.



# File 'lib/azure/service_bus/service_bus_service.rb', line 155

def list_queues(options={})
  query = {}
  query["$skip"] = options[:skip].to_i.to_s if options[:skip]
  query["$top"] = options[:top].to_i.to_s if options[:top]

  resource_list(:queue, query)
end
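
The :skip and :top options are turned into the $skip and $top query parameters, each coerced to an integer and then to a string. The sketch below lifts those lines into a standalone function so the coercion can be tried without a Service Bus connection. paging_query is a hypothetical helper name and is not part of the library.

```ruby
# Mirrors the query-building lines of list_queues above; paging_query is a
# hypothetical helper name, not part of the library.
def paging_query(options = {})
  query = {}
  query["$skip"] = options[:skip].to_i.to_s if options[:skip]
  query["$top"]  = options[:top].to_i.to_s if options[:top]
  query
end

page_size = 20
third_page = paging_query(skip: 2 * page_size, top: page_size)
no_paging  = paging_query
coerced    = paging_query(top: "10 items") # String#to_i keeps the leading digits

puts third_page.inspect
puts no_paging.inspect
puts coerced.inspect
```

Because of the to_i call, a non-numeric value such as "abc" would be sent as "0" rather than rejected, so callers may want to validate the options themselves.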

#list_relays(options = {}) ⇒ Object

Enumerates the relay endpoints in the service namespace.

Attributes

  • options - Hash. Optional parameters.

Options

Accepted key/value pairs in options parameter are:

  • :skip - Integer. Number of relay endpoints to skip.

  • :top - Integer. Number of relay endpoints to list.



# File 'lib/azure/service_bus/service_bus_service.rb', line 90

def list_relays(options={})
  query = {}
  query["$skip"] = options[:skip].to_i.to_s if options[:skip]
  query["$top"] = options[:top].to_i.to_s if options[:top]

  resource_list(:relay, query)
end

#list_rules(*p) ⇒ Object

Retrieves the rules that exist under the specified subscription.

Attributes

Pass either (topic_name, subscription_name) as strings, or (subscription) as an object with .name and .topic methods, such as an Azure::ServiceBus::Subscription instance.

  • options - Hash. Optional parameters.

Options

Accepted key/value pairs in options parameter are:

  • :skip - Integer. Number of rules to skip.

  • :top - Integer. Number of rules to list.



# File 'lib/azure/service_bus/service_bus_service.rb', line 290

def list_rules(*p)
  topic_name, subscription_name, options = _subscription_args(*p)

  query = {}
  query["$skip"] = options[:skip].to_i.to_s if options[:skip]
  query["$top"] = options[:top].to_i.to_s if options[:top]

  results = resource_list(:rule, topic_name, subscription_name, query)
  results.each{|r| r.topic = topic_name; r.subscription=subscription_name}

  return results
end

#list_subscriptions(topic, options = {}) ⇒ Object

Retrieves the subscriptions in the specified topic.

Attributes

  • topic - Either an Azure::ServiceBus::Topic instance or a string of the topic name

  • options - Hash. Optional parameters.

Options

Accepted key/value pairs in options parameter are:

  • :skip - Integer. Number of subscriptions to skip.

  • :top - Integer. Number of subscriptions to list.



# File 'lib/azure/service_bus/service_bus_service.rb', line 370

def list_subscriptions(topic, options={})
  topic = _name_for(topic)
  query = {}
  query["$skip"] = options[:skip].to_i.to_s if options[:skip]
  query["$top"] = options[:top].to_i.to_s if options[:top]

  results = resource_list(:subscription, topic, query)
  results.each { |s| s.topic = topic }

  return results
end
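
The :skip and :top options make it possible to walk a long listing page by page. The following is a minimal sketch of such a loop. collect_all is a hypothetical helper, the lambda stands in for a call such as list_subscriptions(topic, skip: ..., top: ...), and the sketch assumes that each page responds to length and that a page shorter than the requested size marks the end of the listing.

```ruby
# Hypothetical paging helper. The block stands in for a listing call such as
# list_subscriptions(topic, skip: skip, top: top) and must return an array-like page.
def collect_all(page_size, &fetch_page)
  all = []
  skip = 0
  loop do
    page = fetch_page.call(skip, page_size)
    all.concat(page)
    # Assumption: a short (or empty) page means there is nothing left to fetch.
    break if page.length < page_size
    skip += page_size
  end
  all
end

names = %w[alpha beta gamma delta epsilon]
# Stand-in for the service call: slices a local array instead of calling Service Bus.
fake_list = ->(skip, top) { names[skip, top] || [] }

collected = collect_all(2) { |skip, top| fake_list.call(skip, top) }
puts collected.inspect
```

The page size must be a positive integer; with a size of zero the loop above would never terminate.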

#list_topics(options = {}) ⇒ Object

Retrieves the topics in the service namespace.

Attributes

  • options - Hash. Optional parameters.

Options

Accepted key/value pairs in options parameter are:

  • :skip - Integer. Number of topics to skip.

  • :top - Integer. Number of topics to list.



# File 'lib/azure/service_bus/service_bus_service.rb', line 216

def list_topics(options={})
  query = {}
  query["$skip"] = options[:skip].to_i.to_s if options[:skip]
  query["$top"] = options[:top].to_i.to_s if options[:top]

  resource_list(:topic, query)
end

#peek_lock_queue_message(queue, options = {}) ⇒ Object

Atomically retrieves and locks a message from a queue for processing. The message is guaranteed not to be delivered to other receivers during the lock duration period specified in the queue description. Once the lock expires, the message will be available to other receivers. To complete processing of the message, the receiver should issue a delete command with the lock ID received from this operation. To abandon processing of the message and unlock it for other receivers, an Unlock Message command should be issued, or the lock duration period can be left to expire.

Attributes

  • queue - Either an Azure::ServiceBus::Queue instance or a string of the queue name

  • options - Hash. Optional parameters.

Options

Accepted key/value pairs in options parameter are:

  • :timeout - Integer. Timeout for the REST call. Defaults to DEFAULT_TIMEOUT (60).



# File 'lib/azure/service_bus/service_bus_service.rb', line 508

def peek_lock_queue_message(queue, options={})
  _peek_lock_message(_name_for(queue), options[:timeout] ? options[:timeout] : DEFAULT_TIMEOUT)
end

#peek_lock_subscription_message(topic, subscription, options = {}) ⇒ Object

Atomically retrieves and locks a message from a subscription for processing. The message is guaranteed not to be delivered to other receivers (on the same subscription only) during the lock duration period specified in the subscription description. Once the lock expires, the message will be available to other receivers. To complete processing of the message, the receiver should issue a delete command with the lock ID received from this operation. To abandon processing of the message and unlock it for other receivers, an Unlock Message command should be issued, or the lock duration period can be left to expire.

Attributes

  • topic - String. The name of the topic or a Topic instance

  • subscription - String. The name of the subscription or a Subscription instance

  • options - Hash. Optional parameters.

Options

Accepted key/value pairs in options parameter are:

  • :timeout - Integer. Timeout for the REST call. Defaults to DEFAULT_TIMEOUT (60).



# File 'lib/azure/service_bus/service_bus_service.rb', line 417

def peek_lock_subscription_message(topic, subscription, options={})
  topic = _name_for(topic)
  subscription = _name_for(subscription)

  _peek_lock_message(subscriptions_path(topic, subscription), options[:timeout] ? options[:timeout] : DEFAULT_TIMEOUT)
end
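
The peek-lock flow described above has three outcomes for a locked message: it is deleted after successful processing, it is unlocked so that another receiver can take it, or its lock expires. The sketch below models the first two in memory to make the sequence concrete. SketchStore is a hypothetical class written for this example; it ignores lock expiry, timeouts, and the REST transport, and it is not how the service is implemented.

```ruby
# In-memory model of peek-lock semantics. SketchStore is hypothetical and
# ignores lock expiry, timeouts, and the REST transport.
class SketchStore
  def initialize
    @messages = []  # unlocked messages, oldest first
    @locked = {}    # lock id => message body
    @next_lock = 0
  end

  def send_message(body)
    @messages << body
  end

  # Peek-lock: hide the message from other receivers but keep it stored.
  def peek_lock
    return nil if @messages.empty?
    lock_id = (@next_lock += 1)
    @locked[lock_id] = @messages.shift
    [lock_id, @locked[lock_id]]
  end

  # Delete: processing succeeded, so the message is dropped for good.
  def delete(lock_id)
    !@locked.delete(lock_id).nil?
  end

  # Unlock: processing was abandoned, so the message becomes available again.
  def unlock(lock_id)
    body = @locked.delete(lock_id)
    @messages.unshift(body) if body
    !body.nil?
  end

  def pending
    @messages.length
  end
end

store = SketchStore.new
store.send_message("job-1")

first_lock, first_body = store.peek_lock
hidden_while_locked = store.peek_lock.nil? # no other receiver can see it
store.unlock(first_lock)                   # simulate a failed attempt

retry_lock, retry_body = store.peek_lock
deleted = store.delete(retry_lock)         # the second attempt succeeded
```

In the library, the delete and unlock steps correspond to delete_subscription_message and unlock_subscription_message (or their queue counterparts), called with the message returned by the peek-lock call.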

#read_delete_queue_message(queue, options = {}) ⇒ Object

Reads and deletes a message from a queue as an atomic operation. This operation should be used when a best-effort guarantee is sufficient for an application; that is, using this operation it is possible for messages to be lost if processing fails.

Attributes

  • queue - Either an Azure::ServiceBus::Queue instance or a string of the queue name

  • options - Hash. Optional parameters.

Options

Accepted key/value pairs in options parameter are:

  • :timeout - Integer. Timeout for the REST call. Defaults to DEFAULT_TIMEOUT (60).



# File 'lib/azure/service_bus/service_bus_service.rb', line 540

def read_delete_queue_message(queue, options={})
  _read_delete_message(_name_for(queue), options[:timeout] ? options[:timeout] : DEFAULT_TIMEOUT)
end

#read_delete_subscription_message(topic, subscription, options = {}) ⇒ Object

Reads and deletes a message from a subscription as an atomic operation. This operation should be used when a best-effort guarantee is sufficient for an application; that is, using this operation it is possible for messages to be lost if processing fails.

Attributes

  • topic - The name of the topic or a Topic instance

  • subscription - The name of the subscription or a Subscription instance

  • options - Hash. Optional parameters.

Options

Accepted key/value pairs in options parameter are:

  • :timeout - Integer. Timeout for the REST call. Defaults to DEFAULT_TIMEOUT (60).



# File 'lib/azure/service_bus/service_bus_service.rb', line 454

def read_delete_subscription_message(topic, subscription, options={})
  topic = _name_for(topic)
  subscription = _name_for(subscription)

  _read_delete_message(subscriptions_path(topic, subscription), options[:timeout] ? options[:timeout] : DEFAULT_TIMEOUT)
end

#receive_queue_message(queue, options = {}) ⇒ Object

Public: Receives a queue message.

Attributes

  • queue - String. The queue name.

  • options - Hash. Optional parameters.

Options

Accepted key/value pairs in options parameter are:

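  • :peek_lock - Boolean. When true (the default), the message is retrieved with a peek-lock; when false, it is read and deleted in one step.

  • :timeout - Integer. Timeout for the REST call. Defaults to DEFAULT_TIMEOUT (60).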



# File 'lib/azure/service_bus/service_bus_service.rb', line 569

def receive_queue_message(queue, options={})
  peek_lock = options.fetch(:peek_lock, true)

  options[:timeout] = options[:timeout] ? options[:timeout] : DEFAULT_TIMEOUT
  if peek_lock
    peek_lock_queue_message(queue, options)
  else
    read_delete_queue_message(queue, options)
  end
end
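
receive_queue_message chooses between the peek-lock and read-delete paths with options.fetch(:peek_lock, true) and falls back to DEFAULT_TIMEOUT when no timeout is given. The sketch below reproduces only that decision so it can be run without a connection. receive_plan and SKETCH_DEFAULT_TIMEOUT are hypothetical names introduced for this example.

```ruby
SKETCH_DEFAULT_TIMEOUT = 60 # same value as DEFAULT_TIMEOUT in the constant summary

# Hypothetical helper: reports which receive mode and timeout the method
# above would pick for a given options hash, without touching the network.
def receive_plan(options = {})
  mode = options.fetch(:peek_lock, true) ? :peek_lock : :read_delete
  timeout = options[:timeout] || SKETCH_DEFAULT_TIMEOUT
  [mode, timeout]
end

default_plan  = receive_plan()
explicit_plan = receive_plan(peek_lock: false, timeout: 5)
nil_plan      = receive_plan(peek_lock: nil) # fetch returns the explicit nil, which is falsy

puts default_plan.inspect
puts explicit_plan.inspect
puts nil_plan.inspect
```

Unlike the method above, the sketch does not write the resolved timeout back into the caller's options hash.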

#receive_subscription_message(topic, subscription, options = {}) ⇒ Object

Public: Receives a subscription message.

Attributes

  • topic - String. The topic name.

  • subscription - String. The subscription name.

  • options - Hash. Optional parameters.

Options

Accepted key/value pairs in options parameter are:

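  • :peek_lock - Boolean. When true (the default), the message is retrieved with a peek-lock; when false, it is read and deleted in one step.

  • :timeout - Integer. Timeout for the REST call. Defaults to DEFAULT_TIMEOUT (60).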



# File 'lib/azure/service_bus/service_bus_service.rb', line 593

def receive_subscription_message(topic, subscription, options={})
  peek_lock = options.fetch(:peek_lock, true)

  options[:timeout] = options[:timeout] ? options[:timeout] : DEFAULT_TIMEOUT
  if peek_lock
    peek_lock_subscription_message(topic, subscription, options)
  else
    read_delete_subscription_message(topic, subscription, options)
  end
end

#send_queue_message(queue, message) ⇒ Object

Sends a message into the specified queue. The number of messages that may be present in the queue is limited by the queue's maximum size (see :max_size_in_megabytes under #create_queue). If this message would cause the queue to exceed its quota, a quota exceeded error is returned and the message is rejected.

Attributes

  • queue - Either an Azure::ServiceBus::Queue instance or a string of the queue name

  • message - An Azure::ServiceBus::BrokeredMessage object containing message body and properties, or a string of the message body (a default BrokeredMessage will be created from the string).



# File 'lib/azure/service_bus/service_bus_service.rb', line 483

def send_queue_message(queue, message)
  _send_message(_name_for(queue), message)
end

#send_topic_message(topic, message) ⇒ Object

Enqueues a message into the specified topic. The number of messages that may be present in the topic is limited by the topic's maximum size (see :max_size_in_megabytes under #create_topic). If this message would cause the topic to exceed its quota, a quota exceeded error is returned and the message is rejected.

Attributes

  • topic - Either an Azure::ServiceBus::Topic instance or a string of the topic name

  • message - An Azure::ServiceBus::BrokeredMessage object containing message body and properties, or a string of the message body (a default BrokeredMessage will be created from the string).



# File 'lib/azure/service_bus/service_bus_service.rb', line 392

def send_topic_message(topic, message)
  _send_message(_name_for(topic), message)
end

#unlock_queue_message(message) ⇒ Object

Unlocks a message for processing by other receivers on a given queue. This operation deletes the lock object, causing the message to be unlocked. A message must have first been locked by a receiver before this operation is called.

Attributes

  • message - String. Either the message location URL or a message object.



# File 'lib/azure/service_bus/service_bus_service.rb', line 521

def unlock_queue_message(message)
  _unlock_message(message)
end

#unlock_subscription_message(message) ⇒ Object

Unlocks a message for processing by other receivers on a given subscription. This operation deletes the lock object, causing the message to be unlocked. A message must have first been locked by a receiver before this operation is called.

Attributes

  • message - String. Either the message location URL or a message object.



# File 'lib/azure/service_bus/service_bus_service.rb', line 434

def unlock_subscription_message(message)
  _unlock_message(message)
end