Class: MessageBuffer
- Inherits: Object
  - Object
    - MessageBuffer
- Includes: DotNetServices::HTTPRequests, HTTPProxy
- Defined in: lib/service_bus/message_buffer.rb
Overview
This class implements the following operations:
- Create a message buffer
- Retrieve the message buffer policy for a given message buffer name
- Delete a message buffer
- Send a message to a message buffer
- Make a peek lock on a message buffer
- Delete a locked message in the given message buffer
- Release the lock on a locked message
- Retrieve a message from the given message buffer
The class includes the DotNetServices::HTTPRequests module, which provides the methods used to make HTTP requests.
Constant Summary
- HTTP_STATUS_CODE_OK = '200'
- HTTP_STATUS_CODE_CREATED = '201'
- HTTP_STATUS_CODE_ACCEPTED = '202'
- HTTP_STATUS_CODE_NORESPONSE = '204'
- HTTP_STATUS_CODE_BADREQUEST = '400'
- HTTP_STATUS_CODE_INTERNAL_SERVER_ERROR = '500'
- HTTP_HEADER_EXPECT = '100-continue'
- ACCEPTED_RESPONSE_CODES = [HTTP_STATUS_CODE_ACCEPTED, HTTP_STATUS_CODE_CREATED, HTTP_STATUS_CODE_OK]
- DEFAULT_LOCK_DURATION = "120"
- DEFAULT_RECEIVE_TIMEOUT = "60"
- DEFAULT_SEND_TIMEOUT = "60"
Instance Method Summary
- #create_message_buffer(msg_buffer_name, msg_buffer_policy_obj) {|token| ... } ⇒ Object
  Creates a message buffer.
- #delete_locked_message(message_uri, lock_id) {|token| ... } ⇒ Object
  Deletes a locked message.
- #delete_message_buffer(msg_buffer_name) {|token| ... } ⇒ Object
  Deletes a message buffer.
- #initialize(issuer_name, issuer_key, solution_name, service_name, proxy = {}) ⇒ MessageBuffer (constructor)
  Initializes the Message Buffer object with issuer_name, issuer_key (issuer secret or management key), solution_name, service_name, and an optional proxy hash with the keys 'http_web_proxy_server', 'http_web_proxy_port', 'http_web_proxy_username', and 'http_web_proxy_password'.
- #peek_lock(msg_buffer_name, timeout = DEFAULT_RECEIVE_TIMEOUT, lock_duration = DEFAULT_LOCK_DURATION) {|token| ... } ⇒ Object
  Makes a peek lock on the given message buffer.
- #policy(msg_buffer_name) {|token| ... } ⇒ Object
  Fetches the policy of the given message buffer.
- #release_lock(lock_uri) {|token| ... } ⇒ Object
  Releases a lock.
- #retrieve_message(msg_buffer_name, timeout = DEFAULT_RECEIVE_TIMEOUT) {|token| ... } ⇒ Object
  Retrieves the message in the message buffer.
- #send_message(msg_buffer_name, msg_str, timeout = DEFAULT_SEND_TIMEOUT) {|token| ... } ⇒ Object
  Sends the given message to the given message buffer.
Methods included from DotNetServices::HTTPRequests
#delete, #get, #post, #proxy, #put
Methods included from HTTPProxy
Constructor Details
#initialize(issuer_name, issuer_key, solution_name, service_name, proxy = {}) ⇒ MessageBuffer
Initializes the Message Buffer object with the following settings:
- issuer_name
- issuer_key (issuer secret or management key)
- solution_name
- service_name
- proxy - OPTIONAL hash with keys
  - 'http_web_proxy_server'
  - 'http_web_proxy_port'
  - 'http_web_proxy_username'
  - 'http_web_proxy_password'
# File 'lib/service_bus/message_buffer.rb', line 69
def initialize(issuer_name, issuer_key, solution_name, service_name, proxy = {})
  @issuer_name = issuer_name
  @issuer_key = issuer_key
  @solution_name = solution_name
  @service_name = service_name
  @proxy = proxy
  set_http_web_proxy(proxy)
  @acm_host_name = DotNetServicesEnvironment.acm_host_name
  @service_bus_host_name = DotNetServicesEnvironment.service_bus_host_name
end
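As an illustration of the optional proxy argument, the following sketch assembles a hash with the four documented keys. The helper `build_proxy` is hypothetical and not part of the library. Dropping unset entries and passing the port as a string are assumptions, since this page does not say how `set_http_web_proxy` treats missing keys or which value types it expects.

```ruby
# Hypothetical helper (not part of the library) that assembles the optional
# proxy hash from the four keys documented for the constructor.
# Entries whose value is nil are dropped; that choice is an assumption.
def build_proxy(server:, port:, username: nil, password: nil)
  {
    'http_web_proxy_server'   => server,
    'http_web_proxy_port'     => port,
    'http_web_proxy_username' => username,
    'http_web_proxy_password' => password
  }.compact
end

proxy = build_proxy(server: 'proxy.example.com', port: '8080')
p proxy

# With the library loaded, the hash would be passed as the fifth argument.
# All argument values below are placeholders:
# buffer = MessageBuffer.new('owner', 'issuer-key', 'my-solution', 'my-service', proxy)
```

Because `proxy` defaults to `{}`, callers that do not sit behind a proxy can omit the fifth argument entirely.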
Instance Method Details
#create_message_buffer(msg_buffer_name, msg_buffer_policy_obj) {|token| ... } ⇒ Object
Creates a message buffer. Accepts the following parameters:
- msg_buffer_name
- msg_buffer_policy_obj (MessageBufferPolicy)
# File 'lib/service_bus/message_buffer.rb', line 83
def create_message_buffer(msg_buffer_name, msg_buffer_policy_obj)
  raise 'Message Buffer Policy cannot be nil' unless msg_buffer_policy_obj
  msg_buffer_policy = msg_buffer_policy_obj.
  request_uri, msg_buffer_uri = msg_buffer_uris(msg_buffer_name)
  raise 'Message buffer URI not found' unless msg_buffer_uri
  token = (request_uri, msg_buffer_uri)
  yield(token) if block_given?
  begin
     = {}
    header = {
      'Content-type' => MessageBufferConstants.content_type_property_for_atom_xml,
      'Authorization' => (token),
      'Expect' => HTTP_HEADER_EXPECT
    }
    .store(:header, header)
    response = put(msg_buffer_uri, msg_buffer_policy, )
    if ACCEPTED_RESPONSE_CODES.include?(response.code)
      return response.body
    else
      raise("Unexpected response code #{response.code}")
    end
  rescue
    raise "Unexpected exception occured in create_message_buffer"
  end
end
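One consequence of the bare `rescue` in the listing above is that the "Unexpected response code" error is itself caught and replaced by the generic message. The sketch below reproduces only that control flow with a stand-in method (`create_like` is hypothetical and makes no HTTP request) to show what a caller would see and how the original error can still be reached through `Exception#cause`.

```ruby
# Stand-in that mirrors the begin/rescue shape of create_message_buffer.
# `code` plays the role of response.code; no HTTP request is made.
def create_like(code)
  accepted = %w[202 201 200]
  begin
    return 'body' if accepted.include?(code)
    raise "Unexpected response code #{code}"
  rescue
    # The bare rescue catches the RuntimeError raised above and replaces it.
    raise 'Unexpected exception occured in create_message_buffer'
  end
end

begin
  create_like('500')
rescue => e
  puts e.message        # generic message from the rescue clause
  puts e.cause.message  # original error, which Ruby keeps as the cause
end
```

Callers that need the status code for diagnostics can therefore inspect `e.cause` rather than parsing the top-level message.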
#delete_locked_message(message_uri, lock_id) {|token| ... } ⇒ Object
Deletes a locked message. Expects the following arguments, both obtained from the response of peek_lock:
- Message URI (message_uri)
- Lock ID (lock_id)
# File 'lib/service_bus/message_buffer.rb', line 214
def delete_locked_message(message_uri, lock_id)
  lock_id_parameter = MessageBufferConstants.lock_id_parameter
  token = (request_uri, )
  yield(token) if block_given?
  delete_uri =  + "?" + lock_id_parameter + "=" + lock_id
   = {}
  header = {
    'Content-type' => MessageBufferConstants.content_type_property_for_atom_xml,
    'Expect' => HTTP_HEADER_EXPECT,
    'Authorization' => "#{MessageBufferConstants.shared_secret_header} #{CGI::unescape(token)}",
    'Accept' => "*/*"
  }
  .store(:header,header)
  response = delete(delete_uri, )
  if response.code == HTTP_STATUS_CODE_OK
    return response.body
  else
    raise "Unexpected response code #{response.code}"
  end
end
#delete_message_buffer(msg_buffer_name) {|token| ... } ⇒ Object
Deletes a message buffer. Expects:
- Message Buffer Name
# File 'lib/service_bus/message_buffer.rb', line 137
def delete_message_buffer(msg_buffer_name)
  request_uri, msg_buffer_uri = msg_buffer_uris(msg_buffer_name)
  token = (request_uri, msg_buffer_uri)
  yield(token) if block_given?
   = {}
  header = {
    'Content-type' => MessageBufferConstants.content_type_property_for_atom_xml,
    'Expect' => HTTP_HEADER_EXPECT,
    'Authorization' => (token)
  }
  .store(:header, header)
  response = delete(msg_buffer_uri, )
  if response.code == HTTP_STATUS_CODE_OK
    return response.body
  else
    raise "Unexpected response code #{response.code}"
  end
end
#peek_lock(msg_buffer_name, timeout = DEFAULT_RECEIVE_TIMEOUT, lock_duration = DEFAULT_LOCK_DURATION) {|token| ... } ⇒ Object
Makes a peek lock on the given message buffer. Expects:
- Message Buffer Name
- Timeout (Default = DEFAULT_RECEIVE_TIMEOUT)
- Lock Duration (Default = DEFAULT_LOCK_DURATION)
# File 'lib/service_bus/message_buffer.rb', line 183
def peek_lock(msg_buffer_name, timeout = DEFAULT_RECEIVE_TIMEOUT, lock_duration = DEFAULT_LOCK_DURATION)
  retrieve_path = MessageBufferConstants.
  timeout_parameter = MessageBufferConstants.timeout_parameter
  lock_duration_parameter = MessageBufferConstants.lock_duration_parameter
  msg_buffer_uri = (msg_buffer_name, retrieve_path)
  token = (request_uri, msg_buffer_uri)
  yield(token) if block_given?
  retrieve_url = msg_buffer_uri + "?" + timeout_parameter + "=" + timeout + "&" + lock_duration_parameter + "=" + lock_duration
   = {}
  header = {
    'Content-type' => MessageBufferConstants.content_type_property_for_text,
    'Expect' => HTTP_HEADER_EXPECT,
    'Authorization' => (token) ,
    'Accept' => "*/*"
  }
  .store(:header,header)
  response = post(retrieve_url, "", )
  if ACCEPTED_RESPONSE_CODES.include?(response.code)
    return LockedMessageInfo.new(
      response['X-MS-LOCK-ID'],
      response['X-MS-LOCK-LOCATION'],
      response['X-MS-MESSAGE-LOCATION']
    )
  else
    raise "Unexpected response code #{response.code}"
  end
end
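The listing above builds the request URL by string concatenation, which is why the defaults are the strings "60" and "120". As a rough sketch of the same query construction, the example below uses `URI.encode_www_form` from the standard library, which converts each value with `to_s`. The parameter names `timeout` and `lockduration` and the helper `peek_lock_url` are assumptions for illustration; the real names come from MessageBufferConstants and are not shown on this page.

```ruby
require 'uri'

# Hypothetical parameter names; the real values are defined in
# MessageBufferConstants and are not shown on this page.
TIMEOUT_PARAM       = 'timeout'
LOCK_DURATION_PARAM = 'lockduration'

# Builds a peek-lock style URL. URI.encode_www_form calls to_s on each
# value, so integer and string arguments both work.
def peek_lock_url(base_uri, timeout = '60', lock_duration = '120')
  query = URI.encode_www_form(TIMEOUT_PARAM => timeout,
                              LOCK_DURATION_PARAM => lock_duration)
  "#{base_uri}?#{query}"
end

puts peek_lock_url('https://example.invalid/buffer/head', 30, 90)
```

When calling the documented `peek_lock` directly, passing the timeout and lock duration as strings matches how the defaults are declared.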
#policy(msg_buffer_name) {|token| ... } ⇒ Object
Fetches the policy of the given message buffer. Expects:
- Message Buffer Name
# File 'lib/service_bus/message_buffer.rb', line 112
def policy(msg_buffer_name)
  request_uri, msg_buffer_uri = msg_buffer_uris(msg_buffer_name)
  token = (request_uri, msg_buffer_uri)
  yield(token) if block_given?
   = {}
  header = {
    'Content-type' => MessageBufferConstants.content_type_property_for_atom_xml,
    'Expect' => HTTP_HEADER_EXPECT,
    'Authorization' => (token)
  }
  .store(:header, header)
  response = get(msg_buffer_uri, )
  if response.code == HTTP_STATUS_CODE_OK
    body = response.body
    if body.match(/<entry xmlns=/)
      return body
    else
      raise 'Unexpected response body while trying to get the message buffer policy'
    end
  end
end
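Unlike the other methods, the listing above has no `else` branch for a non-200 status, so in that case the method falls through and returns `nil` instead of raising. The sketch below mirrors only that branch structure with a stand-in method (`policy_like` is hypothetical and makes no HTTP request).

```ruby
# Stand-in mirroring the branch structure of #policy; `code` and `body`
# play the role of response.code and response.body.
def policy_like(code, body)
  if code == '200'
    if body.match(/<entry xmlns=/)
      return body
    else
      raise 'Unexpected response body while trying to get the message buffer policy'
    end
  end
  # No else branch: any other status falls through and the method returns nil.
end

p policy_like('404', '')
p policy_like('200', '<entry xmlns="http://www.w3.org/2005/Atom"/>')
```

A caller would therefore want to check the result for `nil` before treating it as policy XML.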
#release_lock(lock_uri) {|token| ... } ⇒ Object
Releases a lock. Expects the following argument, obtained from the response of peek_lock:
- Lock URI
# File 'lib/service_bus/message_buffer.rb', line 237
def release_lock(lock_uri)
  token = (request_uri, lock_uri)
  yield(token) if block_given?
   = {}
  header = {
    'Content-type' => MessageBufferConstants.content_type_property_for_text,
    'Expect' => HTTP_HEADER_EXPECT,
    'Authorization' => (token) ,
    'Accept' => "*/*"
  }
  .store(:header,header)
  response = delete(lock_uri,)
  if response.code == HTTP_STATUS_CODE_OK
    return response.body
  else
    raise "Unexpected response code #{response.code}"
  end
end
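The three lock-related methods are typically used together: peek-lock a message, process it, then either delete it or release the lock. The sketch below shows one possible arrangement using stand-ins only. `FakeBuffer`, `LockedInfo`, its field names, and `process_one` are all hypothetical; the accessors of the real LockedMessageInfo are not shown on this page.

```ruby
# Stand-ins for illustration only. The field names on LockedInfo are
# hypothetical; the real LockedMessageInfo accessors are not shown here.
LockedInfo = Struct.new(:lock_id, :lock_uri, :message_uri)

# Records which calls were made instead of talking to a service.
class FakeBuffer
  attr_reader :calls

  def initialize
    @calls = []
  end

  def peek_lock(name)
    @calls << :peek_lock
    LockedInfo.new('lock-1', "#{name}/locks/1", "#{name}/messages/1")
  end

  def delete_locked_message(message_uri, lock_id)
    @calls << :delete_locked_message
  end

  def release_lock(lock_uri)
    @calls << :release_lock
  end
end

# Peek-lock a message, run the block, then delete the message on success
# or release the lock (and re-raise) if the block fails.
def process_one(buffer, name)
  info = buffer.peek_lock(name)
  begin
    yield info
    buffer.delete_locked_message(info.message_uri, info.lock_id)
  rescue StandardError
    buffer.release_lock(info.lock_uri)
    raise
  end
end

ok = FakeBuffer.new
process_one(ok, 'orders') { |info| info.lock_id }
p ok.calls
```

Releasing the lock on failure makes the message available again without waiting for the lock duration to expire.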
#retrieve_message(msg_buffer_name, timeout = DEFAULT_RECEIVE_TIMEOUT) {|token| ... } ⇒ Object
Retrieves the message in the message buffer. Expects:
- Message Buffer Name
- Timeout (Default = DEFAULT_RECEIVE_TIMEOUT)
# File 'lib/service_bus/message_buffer.rb', line 259
def retrieve_message(msg_buffer_name, timeout = DEFAULT_RECEIVE_TIMEOUT)
  retrieve_path = MessageBufferConstants.
  timeout_parameter = MessageBufferConstants.timeout_parameter
  msg_buffer_uri = (msg_buffer_name, retrieve_path)
  token = (request_uri, msg_buffer_uri)
  yield(token) if block_given?
  retrieve_url = msg_buffer_uri.to_s + "?" + timeout_parameter.to_s + "=" + timeout.to_s
   = {}
  header = {
    'Content-type' => MessageBufferConstants.content_type_property_for_text,
    'Authorization' => "#{MessageBufferConstants.shared_secret_header} #{CGI.unescape(token)}"
  }
  .store(:header,header)
  response = delete(retrieve_url,)
  if response.code == HTTP_STATUS_CODE_OK
    return response.body
  else
    raise "Unexpected response code #{response.code}"
  end
end
#send_message(msg_buffer_name, msg_str, timeout = DEFAULT_SEND_TIMEOUT) {|token| ... } ⇒ Object
Sends the given message to the given message buffer. Expects:
- Message Buffer Name
- Message body
- Timeout (Default = DEFAULT_SEND_TIMEOUT)
# File 'lib/service_bus/message_buffer.rb', line 160
def send_message(msg_buffer_name, msg_str, timeout = DEFAULT_SEND_TIMEOUT)
  send_path = MessageBufferConstants.
  timeout_parameter = MessageBufferConstants.timeout_parameter
  msg_buffer_uri = (msg_buffer_name, send_path)
   = (msg_str)
  token = (request_uri, msg_buffer_uri)
  yield(token) if block_given?
  send_uri = msg_buffer_uri + "?" + timeout_parameter + "=" + timeout
   = {}
  header = {
    'Content-type' => MessageBufferConstants.content_type_property_for_text,
    'Expect' => HTTP_HEADER_EXPECT,
    'Authorization' => (token)
  }
  .store(:header, header)
  response = post(send_uri, , )
  ACCEPTED_RESPONSE_CODES.include?(response.code) || raise("Unexpected response code #{response.code}")
end
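Two details of the listing above are easy to miss: the last expression makes the method return `true` on success rather than a response body, and the URI is assembled with `String#+`, so a non-string timeout raises a TypeError. The sketch below illustrates both with a stand-in (`send_like` is hypothetical and makes no HTTP request).

```ruby
# Stand-in mirroring the last line of #send_message; `code` plays the
# role of response.code.
def send_like(code)
  accepted = %w[202 201 200]
  accepted.include?(code) || raise("Unexpected response code #{code}")
end

p send_like('201')

# The listing builds the URI with String#+, so a non-string timeout fails:
begin
  'timeout=' + 30
rescue TypeError => e
  puts "TypeError: #{e.message}"
end

# Converting the value first avoids the error.
puts 'timeout=' + 30.to_s
```

Passing the timeout as a string, as the default `DEFAULT_SEND_TIMEOUT = "60"` does, sidesteps the conversion issue.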