Class: MessageBuffer

Inherits:
Object
  • Object
show all
Includes:
DotNetServices::HTTPRequests, HTTPProxy
Defined in:
lib/service_bus/message_buffer.rb

Overview

This class contains implementation to

  • Create a message buffer

  • Retrieve message buffer policy for a given message buffer name

  • Delete a message buffer

  • Send message to a message buffer

  • Make a peek lock on a message buffer

  • Delete a locked message in the given message buffer

  • Release lock on a locked message buffer

  • Retrieve a message from the given message buffer

The class includes the DotNetServices::HTTPRequests module, which holds the implementations used to make HTTP requests.

Constant Summary collapse

HTTP_STATUS_CODE_OK =
'200'
HTTP_STATUS_CODE_CREATED =
'201'
HTTP_STATUS_CODE_ACCEPTED =
'202'
HTTP_STATUS_CODE_NORESPONSE =
'204'
HTTP_STATUS_CODE_BADREQUEST =
'400'
HTTP_STATUS_CODE_INTERNAL_SERVER_ERROR =
'500'
HTTP_HEADER_EXPECT =
'100-continue'
ACCEPTED_RESPONSE_CODES =
[HTTP_STATUS_CODE_ACCEPTED, HTTP_STATUS_CODE_CREATED, HTTP_STATUS_CODE_OK]
DEFAULT_LOCK_DURATION =
"120"
DEFAULT_RECEIVE_TIMEOUT =
"60"
DEFAULT_SEND_TIMEOUT =
"60"

Instance Method Summary collapse

Methods included from DotNetServices::HTTPRequests

#delete, #get, #post, #proxy, #put

Methods included from HTTPProxy

#set_http_web_proxy

Constructor Details

#initialize(issuer_name, issuer_key, solution_name, service_name, proxy = {}) ⇒ MessageBuffer

Initializes the Message Buffer object with the following settings

  • issuer_name

  • issuer_key (issuer_secret or Management key)

  • solution_name

  • service_name

  • proxy - OPTIONAL hash with keys

    • ‘http_web_proxy_server’

    • ‘http_web_proxy_port’

    • ‘http_web_proxy_username’

    • ‘http_web_proxy_password’



69
70
71
72
73
74
75
76
77
78
# File 'lib/service_bus/message_buffer.rb', line 69

# Stores the credentials and target names used by later requests, applies the
# optional web proxy settings and records the host names reported by
# DotNetServicesEnvironment.
#
# * issuer_name
# * issuer_key
# * solution_name
# * service_name
# * proxy - OPTIONAL hash, kept in @proxy and handed unchanged to
#   set_http_web_proxy
def initialize(issuer_name, issuer_key, solution_name, service_name, proxy = {})
  @issuer_name, @issuer_key = issuer_name, issuer_key
  @solution_name, @service_name = solution_name, service_name
  @proxy = proxy
  # Proxy configuration runs only after @proxy has been stored.
  set_http_web_proxy(proxy)
  @acm_host_name = DotNetServicesEnvironment.acm_host_name
  @service_bus_host_name = DotNetServicesEnvironment.service_bus_host_name
end

Instance Method Details

#create_message_buffer(msg_buffer_name, msg_buffer_policy_obj) {|token| ... } ⇒ Object

Creates a message buffer. Accepts following params

  • msg_buffer_name

  • msg_buffer_policy_obj (MessageBufferPolicy)

Yields:

  • (token)


83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/service_bus/message_buffer.rb', line 83

# Creates a message buffer by PUT-ing the policy to the buffer URI.
#
# * msg_buffer_name       - name passed to msg_buffer_uris to resolve the URIs
# * msg_buffer_policy_obj - object responding to message_buffer_policy; its
#   result is sent as the request body
#
# Yields the authorization token to the block, if one is given, before the
# request is made.
#
# Returns the response body when the response code is one of
# ACCEPTED_RESPONSE_CODES.
#
# Raises RuntimeError when the policy object is nil, when no buffer URI is
# resolved, or when the request fails or answers with an unexpected code. In
# the last case the message starts with
# "Unexpected exception occured in create_message_buffer" and is followed by
# the underlying reason.
def create_message_buffer(msg_buffer_name, msg_buffer_policy_obj)
  raise 'Message Buffer Policy cannot be nil' unless msg_buffer_policy_obj
  msg_buffer_policy = msg_buffer_policy_obj.message_buffer_policy
  request_uri, msg_buffer_uri = msg_buffer_uris(msg_buffer_name)
  raise 'Message buffer URI not found' unless msg_buffer_uri
  token = acquire_authorization_token(request_uri, msg_buffer_uri)
  yield(token) if block_given?
  begin
    header = {
      'Content-type' => MessageBufferConstants.content_type_property_for_atom_xml,
      'Authorization' => authorization(token),
      'Expect' => HTTP_HEADER_EXPECT
    }
    response = put(msg_buffer_uri, msg_buffer_policy, { :header => header })
    unless ACCEPTED_RESPONSE_CODES.include?(response.code)
      raise "Unexpected response code #{response.code}"
    end
    response.body
  rescue => e
    # Keep the historical message as a prefix, but append the real reason so
    # that an unexpected status code or transport error is not hidden.
    raise "Unexpected exception occured in create_message_buffer: #{e.message}"
  end
end

#delete_locked_message(message_uri, lock_id) {|token| ... } ⇒ Object

Deletes a locked message. Expects following arguments obtained as part of the response of peek_lock-

  • Message Buffer Uri

  • Lock Id

Yields:

  • (token)


214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# File 'lib/service_bus/message_buffer.rb', line 214

# Deletes a locked message by issuing a DELETE to the message URI with the
# lock id appended as a query parameter.
#
# * message_uri - message location obtained from peek_lock
# * lock_id     - lock id obtained from peek_lock
#
# Yields the authorization token to the block, if one is given.
# Returns the response body on HTTP_STATUS_CODE_OK; raises RuntimeError with
# the response code otherwise.
def delete_locked_message(message_uri, lock_id)
  param_name = MessageBufferConstants.lock_id_parameter
  # request_uri is not a local here; it is resolved outside this method.
  token = acquire_authorization_token(request_uri, message_uri)
  yield(token) if block_given?
  target = message_uri + "?" + param_name + "=" + lock_id
  request_options = {
    :header => {
      'Content-type' => MessageBufferConstants.content_type_property_for_atom_xml,
      'Expect' => HTTP_HEADER_EXPECT,
      # The token is URL-unescaped before being placed in the header.
      'Authorization' => "#{MessageBufferConstants.shared_secret_header} #{CGI.unescape(token)}",
      'Accept' => "*/*"
    }
  }
  response = delete(target, request_options)
  raise "Unexpected response code #{response.code}" unless response.code == HTTP_STATUS_CODE_OK
  response.body
end

#delete_message_buffer(msg_buffer_name) {|token| ... } ⇒ Object

Deletes message buffer. Expects -

  • Message Buffer Name

Yields:

  • (token)


137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/service_bus/message_buffer.rb', line 137

# Deletes the message buffer with the given name by issuing a DELETE to the
# buffer URI resolved through msg_buffer_uris.
#
# * msg_buffer_name - Message Buffer Name
#
# Yields the authorization token to the block, if one is given.
# Returns the response body on HTTP_STATUS_CODE_OK; raises RuntimeError with
# the response code otherwise.
def delete_message_buffer(msg_buffer_name)
  request_uri, buffer_uri = msg_buffer_uris(msg_buffer_name)
  token = acquire_authorization_token(request_uri, buffer_uri)
  yield(token) if block_given?
  request_options = {
    :header => {
      'Content-type' => MessageBufferConstants.content_type_property_for_atom_xml,
      'Expect' => HTTP_HEADER_EXPECT,
      'Authorization' => authorization(token)
    }
  }
  response = delete(buffer_uri, request_options)
  raise "Unexpected response code #{response.code}" unless response.code == HTTP_STATUS_CODE_OK
  response.body
end

#peek_lock(msg_buffer_name, timeout = DEFAULT_RECEIVE_TIMEOUT, lock_duration = DEFAULT_LOCK_DURATION) {|token| ... } ⇒ Object

Makes a peek lock on given message buffer. Expects -

  • Message Buffer Name

  • Timeout (Default = DEFAULT_RECEIVE_TIMEOUT)

  • Lock Duration(Default = DEFAULT_LOCK_DURATION)

Yields:

  • (token)


183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/service_bus/message_buffer.rb', line 183

# Makes a peek lock on the given message buffer by POST-ing an empty body to
# the buffer's retrieve path.
#
# * msg_buffer_name - Message Buffer Name
# * timeout         - value of the timeout query parameter
#   (Default = DEFAULT_RECEIVE_TIMEOUT); converted with to_s, so Strings and
#   Integers are both accepted
# * lock_duration   - value of the lock duration query parameter
#   (Default = DEFAULT_LOCK_DURATION); converted with to_s as well
#
# Yields the authorization token to the block, if one is given.
#
# Returns a LockedMessageInfo built from the X-MS-LOCK-ID, X-MS-LOCK-LOCATION
# and X-MS-MESSAGE-LOCATION response headers when the response code is one of
# ACCEPTED_RESPONSE_CODES; raises RuntimeError with the response code
# otherwise.
def peek_lock(msg_buffer_name, timeout = DEFAULT_RECEIVE_TIMEOUT, lock_duration = DEFAULT_LOCK_DURATION)
  retrieve_path = MessageBufferConstants.path_for_retrieve_message
  timeout_parameter = MessageBufferConstants.timeout_parameter
  lock_duration_parameter = MessageBufferConstants.lock_duration_parameter
  msg_buffer_uri = message_uri(msg_buffer_name, retrieve_path)
  # request_uri is not a local here; it is resolved outside this method.
  token = acquire_authorization_token(request_uri, msg_buffer_uri)
  yield(token) if block_given?
  # Interpolation stringifies every piece (as retrieve_message does), so an
  # Integer timeout or lock duration no longer raises TypeError.
  retrieve_url = "#{msg_buffer_uri}?#{timeout_parameter}=#{timeout}&#{lock_duration_parameter}=#{lock_duration}"
  header = {
    'Content-type' => MessageBufferConstants.content_type_property_for_text,
    'Expect' => HTTP_HEADER_EXPECT,
    'Authorization' => authorization(token),
    'Accept' => "*/*"
  }
  response = post(retrieve_url, "", { :header => header })
  unless ACCEPTED_RESPONSE_CODES.include?(response.code)
    raise "Unexpected response code #{response.code}"
  end
  LockedMessageInfo.new(
    response['X-MS-LOCK-ID'],
    response['X-MS-LOCK-LOCATION'],
    response['X-MS-MESSAGE-LOCATION']
  )
end

#policy(msg_buffer_name) {|token| ... } ⇒ Object

Fetches policy of the given message buffer. Expects -

  • Message Buffer Name

Yields:

  • (token)


112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/service_bus/message_buffer.rb', line 112

# Fetches the policy of the given message buffer with a GET on the buffer URI
# resolved through msg_buffer_uris.
#
# * msg_buffer_name - Message Buffer Name
#
# Yields the authorization token to the block, if one is given.
#
# Returns the response body when the response code is HTTP_STATUS_CODE_OK and
# the body contains "<entry xmlns=". Returns nil for any other response code.
# Raises RuntimeError when the code is OK but the body lacks that marker.
def policy(msg_buffer_name)
  request_uri, buffer_uri = msg_buffer_uris(msg_buffer_name)
  token = acquire_authorization_token(request_uri, buffer_uri)
  yield(token) if block_given?
  request_options = {
    :header => {
      'Content-type' => MessageBufferConstants.content_type_property_for_atom_xml,
      'Expect' => HTTP_HEADER_EXPECT,
      'Authorization' => authorization(token)
    }
  }
  response = get(buffer_uri, request_options)
  # A non-OK status is not treated as an error here; the caller gets nil.
  return nil unless response.code == HTTP_STATUS_CODE_OK
  payload = response.body
  unless payload.match(/<entry xmlns=/)
    raise 'Unexpected response body while trying to get the message buffer policy'
  end
  payload
end

#release_lock(lock_uri) {|token| ... } ⇒ Object

Releases lock. Expects following arguments obtained as part of the response of peek_lock-

  • Lock Uri

Yields:

  • (token)


237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# File 'lib/service_bus/message_buffer.rb', line 237

# Releases a lock by issuing a DELETE to the given lock URI.
#
# * lock_uri - Lock Uri obtained from the response of peek_lock
#
# Yields the authorization token to the block, if one is given.
# Returns the response body on HTTP_STATUS_CODE_OK; raises RuntimeError with
# the response code otherwise.
def release_lock(lock_uri)
  # request_uri is not a local here; it is resolved outside this method.
  token = acquire_authorization_token(request_uri, lock_uri)
  yield(token) if block_given?
  request_options = {
    :header => {
      'Content-type' => MessageBufferConstants.content_type_property_for_text,
      'Expect' => HTTP_HEADER_EXPECT,
      'Authorization' => authorization(token),
      'Accept' => "*/*"
    }
  }
  response = delete(lock_uri, request_options)
  raise "Unexpected response code #{response.code}" unless response.code == HTTP_STATUS_CODE_OK
  response.body
end

#retrieve_message(msg_buffer_name, timeout = DEFAULT_RECEIVE_TIMEOUT) {|token| ... } ⇒ Object

Retrieve the message in the message buffer. Expects -

  • Message Buffer Name

  • Timeout (Default = DEFAULT_RECEIVE_TIMEOUT)

Yields:

  • (token)


259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
# File 'lib/service_bus/message_buffer.rb', line 259

# Retrieves a message from the given message buffer by issuing a DELETE to
# the buffer's retrieve path, with the timeout as a query parameter.
#
# * msg_buffer_name - Message Buffer Name
# * timeout         - value of the timeout query parameter
#   (Default = DEFAULT_RECEIVE_TIMEOUT); converted with to_s
#
# Yields the authorization token to the block, if one is given.
# Returns the response body on HTTP_STATUS_CODE_OK; raises RuntimeError with
# the response code otherwise.
def retrieve_message(msg_buffer_name, timeout = DEFAULT_RECEIVE_TIMEOUT)
  path = MessageBufferConstants.path_for_retrieve_message
  timeout_key = MessageBufferConstants.timeout_parameter
  buffer_uri = message_uri(msg_buffer_name, path)
  # request_uri is not a local here; it is resolved outside this method.
  token = acquire_authorization_token(request_uri, buffer_uri)
  yield(token) if block_given?
  target = "#{buffer_uri}?#{timeout_key}=#{timeout}"
  request_options = {
    :header => {
      'Content-type' => MessageBufferConstants.content_type_property_for_text,
      # The token is URL-unescaped before being placed in the header.
      'Authorization' => "#{MessageBufferConstants.shared_secret_header} #{CGI.unescape(token)}"
    }
  }
  response = delete(target, request_options)
  raise "Unexpected response code #{response.code}" unless response.code == HTTP_STATUS_CODE_OK
  response.body
end

#send_message(msg_buffer_name, msg_str, timeout = DEFAULT_SEND_TIMEOUT) {|token| ... } ⇒ Object

Sends given message to given message buffer. Expects -

  • Message Buffer Name

  • Message body

  • timeout(Default = DEFAULT_SEND_TIMEOUT)

Yields:

  • (token)


160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
# File 'lib/service_bus/message_buffer.rb', line 160

# Sends the given message to the given message buffer by POST-ing the
# formatted message to the buffer's send path.
#
# * msg_buffer_name - Message Buffer Name
# * msg_str         - Message body; passed through formatted_message before
#   being sent
# * timeout         - value of the timeout query parameter
#   (Default = DEFAULT_SEND_TIMEOUT); converted with to_s, so Strings and
#   Integers are both accepted
#
# Yields the authorization token to the block, if one is given.
#
# Returns true when the response code is one of ACCEPTED_RESPONSE_CODES;
# raises RuntimeError with the response code otherwise.
def send_message(msg_buffer_name, msg_str, timeout = DEFAULT_SEND_TIMEOUT)
  send_path = MessageBufferConstants.path_for_send_message
  timeout_parameter = MessageBufferConstants.timeout_parameter
  msg_buffer_uri = message_uri(msg_buffer_name, send_path)
  message = formatted_message(msg_str)
  # request_uri is not a local here; it is resolved outside this method.
  token = acquire_authorization_token(request_uri, msg_buffer_uri)
  yield(token) if block_given?
  # Interpolation stringifies every piece (as retrieve_message does), so an
  # Integer timeout no longer raises TypeError.
  send_uri = "#{msg_buffer_uri}?#{timeout_parameter}=#{timeout}"
  header = {
    'Content-type' => MessageBufferConstants.content_type_property_for_text,
    'Expect' => HTTP_HEADER_EXPECT,
    'Authorization' => authorization(token)
  }
  response = post(send_uri, message, { :header => header })
  unless ACCEPTED_RESPONSE_CODES.include?(response.code)
    raise "Unexpected response code #{response.code}"
  end
  true
end