Class: Mqlight::BlockingClient
- Inherits:
-
Object
- Object
- Mqlight::BlockingClient
- Includes:
- Logging
- Defined in:
- lib/mqlight/blocking_client.rb
Overview
this class uses timeouts in milliseconds with zero meaning: “don’t wait at all” and nil meaning “wait forever - don’t time out”.
The MQ Light client. This can be used to exchange messages between the MQ AMQP Channel or MQ Light server. This version of the client blocks the calling thread while carrying out messaging operations.
Instance Attribute Summary collapse
-
#id ⇒ String
readonly
The client id, which can either be explicitly specified when the client is created or automatically generated.
Instance Method Summary collapse
-
#initialize(service, options = {}) {|state, reason| ... } ⇒ BlockingClient
constructor
Creates a new instance of the client.
-
#receive(topic_pattern, options = {}) ⇒ Delivery?
Receive a message from a destination, as identified by the topic pattern used to subscribe to the destination.
-
#retrying? ⇒ Boolean
True indicating if the client is in the retrying status.
-
#send(topic, data, options = {}) ⇒ BlockingClient
Sends a message to the specified topic, blocking the calling thread while the send operation takes place (or until the timeout value, as specified via the timeout option is exceeded).
-
#service ⇒ nil, String
Either the URL of the service that the client is currently connect to, or nil if the client is not currently connected to a service.
-
#start(_options = {}) ⇒ BlockingClient
Requests that the client transition into started state.
-
#started? ⇒ Boolean
True indicating if the client is in the started status.
-
#starting? ⇒ Boolean
True indicating if the client is in the starting status.
-
#state ⇒ Symbol
The current state of the client.
-
#stop(options = {}) ⇒ Object
Requests that the client transition into stopped state.
-
#stopped? ⇒ Boolean
True indicating if the client is in the stopped status.
-
#subscribe(topic_pattern, options = {}) ⇒ Object
Subscribes to receive messages from a destination, identified by the topic pattern argument.
-
#to_s ⇒ String
Client Id.
-
#unsubscribe(topic_pattern, options = {}) ⇒ Object
Unsubscribes from a destination.
Methods included from Logging
Constructor Details
#initialize(service, options = {}) {|state, reason| ... } ⇒ BlockingClient
Creates a new instance of the client. The client will be created in starting state. The constructor will make a connection attempt to the server and report failures (such as “not authorised”) as exceptions. This means that in the golden path case the constructor will return an instance of the BlockingClient that is in started state. A code block, yielded to by the constructor can be used to register a listener that receives notifications when the associated client changes state.
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 |
# File 'lib/mqlight/blocking_client.rb', line 127 def initialize(service, = {}, &state_callback) logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } parms = Hash[method(__method__).parameters.map do |parm| [parm[1], eval(parm[1].to_s)] end] logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s } @id = .fetch(:id, nil) @user = .fetch(:user, nil) @password = .fetch(:password, nil) # Validate id fail ArgumentError, 'Client identifier must be a String.' unless @id.is_a?(String) || @id.nil? set_defaults # Create the variables to share between the threads. @thread_vars = Mqlight::ThreadVars.new(@id) # Validate id some more fail ArgumentError, "Client identifier '#{@id}' is longer than the "\ 'maximum ID length of 256.' if @id.length > 256 # currently client ids are restricted, reject any invalid ones invalid_client_id_pattern = %r{[^A-Za-z0-9%\/\._]+} invalid_client_id_pattern.match(@id) do |m| fail ArgumentError, "Client Identifier '#{@id}' contains invalid "\ "char: #{m[0]}" end # Validate username and password fail ArgumentError, 'Both user and password properties must '\ 'be specified together.' if (@user && !@password) || (!@user && @password) if @user && @password fail ArgumentError, 'Both user and password must be Strings.' unless (@user.is_a? String) && (@password.is_a? String) end # pre-validate service param is a well-formed URI Util.validate_services(service, @user, @password) @thread_vars.state_callback = state_callback # Setup queue for sharing with proton thread @proton_queue = Queue.new @proton_queue_mutex = Mutex.new @proton_queue_resource = ConditionVariable.new args = { options: , id: @id, user: @user, password: @password, service: service, thread_vars: @thread_vars, } @command = Mqlight::Command.new(args) @connection = Mqlight::Connection.new(args) logger.data(@id, 'Client created. Starting...') do self.class.to_s + '#' + __method__.to_s end start logger.exit(@id) { self.class.to_s + '#' + __method__.to_s } rescue StandardError => e logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s } raise e end |
Instance Attribute Details
#id ⇒ String (readonly)
Returns the client id, which can either be explicitly specified when the client is created or automatically generated.
39 40 41 |
# File 'lib/mqlight/blocking_client.rb', line 39 def id @id end |
Instance Method Details
#receive(topic_pattern, options = {}) ⇒ Delivery?
Receive a message from a destination, as identified by the topic pattern used to subscribe to the destination.
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 |
# File 'lib/mqlight/blocking_client.rb', line 528 def receive(topic_pattern, = {}) logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } parms = Hash[method(__method__).parameters.map do |parm| [parm[1], eval(parm[1].to_s)] end] logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s } fail Mqlight::StoppedError, 'Not started.' if stopped? # Validate topic_pattern fail ArgumentError, 'topic_pattern must be a String.' unless topic_pattern.is_a? String # Validate options fail ArgumentError, 'options must be a Hash.' unless .is_a?(Hash) || .nil? timeout = .fetch(:timeout, nil) if .is_a? Hash unless timeout.nil? fail ArgumentError, 'timeout must be nil or an unsigned Integer' unless timeout.is_a? Integer fail RangeError, 'timeout must be an unsigned Integer' if timeout < 0 timeout /= 1000.0 # minimum timeout is 10 milliseconds. This is a mimimum practical. timeout = 0.010 if timeout == 0 end share = .fetch(:share, nil) fail ArgumentError, 'share must be a String or nil.' unless share.is_a?(String) || share.nil? if share.is_a? String fail ArgumentError, 'share is invalid because it contains a colon (:) character' if share.include? ':' end logger.data(@id, 'Checking for a matching destination') do self.class.to_s + '#' + __method__.to_s end destination = @thread_vars.destinations.find do |dest| dest.match?(topic_pattern, share) end # Has a matching destination has been found? if destination.nil? fail Mqlight::UnsubscribedError, 'You must be subscribed with '\ "topic_pattern #{topic_pattern} to receive messages from it." \ if share.nil? fail Mqlight::UnsubscribedError, 'You must be subscribed with '\ "topic_pattern #{topic_pattern} and share #{share} to receive"\ 'messages from it.' end @command.push_request(action: 'receive', timeout: timeout, destination: destination) # Get the message or nil for timeout to return = @thread_vars.reply_queue.pop # If the reply is an exception and that exception is # exception = timeout set message to nil to indicate timeout no message # otherwise raise the exception if .is_a? Mqlight::ExceptionContainer if .exception.is_a? Mqlight::TimeoutError = nil else fail .exception end end logger.exit(@id) { self.class.to_s + '#' + __method__.to_s } rescue StandardError => e logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s } raise e end |
#retrying? ⇒ Boolean
Returns true indicating if the client is in the retrying status.
702 703 704 |
# File 'lib/mqlight/blocking_client.rb', line 702 def @thread_vars.state == :retrying end |
#send(topic, data, options = {}) ⇒ BlockingClient
Sends a message to the specified topic, blocking the calling thread while the send operation takes place (or until the timeout value, as specified via the timeout option is exceeded).
-
For “at most once” quality of service messages (qos option set to 0), the calling thread will block until the client is both successfully network connected and the message has been buffered by the client. This method may or may not block until the data has been flushed to the underlying network, at the discretion of the client implementation, which balances throughput against buffering large amounts of data.
-
For “at least once” quality of service messages (qos option set to 1), the calling thread will block until the client is both successfully network connected and has received confirmation from the server that the server has received a copy of the message.
355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 |
# File 'lib/mqlight/blocking_client.rb', line 355 def send(topic, data, = {}) logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } parms = Hash[method(__method__).parameters.map do |parm| [parm[1], eval(parm[1].to_s)] end] logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s } fail Mqlight::StoppedError, 'Not started.' if stopped? fail ArgumentError, 'topic must be a String' unless topic.is_a? String fail Mqlight::UnsupportedError, "#{data.class.name.split('::').last} "\ 'is not yet supported as a message data type' unless data.is_a? String if .is_a? Hash qos = .fetch(:qos, nil) ttl = .fetch(:ttl, nil) timeout = .fetch(:timeout, nil) else fail ArgumentError, 'options must be a Hash.' unless .nil? end qos ||= QOS_AT_MOST_ONCE @thread_vars.proton.settle_mode = qos unless ttl.nil? fail ArgumentError, "options:ttl value '" + ttl.to_s + "' is invalid, must be an unsigned non-zero integer number" \ unless ttl.is_a?(Integer) && ttl > 0 ttl = 4_294_967_295 if ttl > 4_294_967_295 end if timeout fail ArgumentError, 'timeout must be nil or a unsigned Integer' if (!timeout.is_a? Integer) || (timeout < 0) timeout /= 1000.0 end # Setup the message msg = Qpid::Proton::Message.new # URI escape anything apart from path separators (/) and all known # unreserved characters msg.address = @thread_vars.service.address + '/' + topic msg.ttl = ttl if ttl msg.body = data if data.encoding == Encoding::BINARY msg.content_type = 'application/octet-stream' else begin JSON.parse(data) msg.content_type = 'application/json' rescue JSON::ParserError msg.content_type = 'text/plain' end end msg.pre_encode # Clear the return queue @thread_vars.reply_queue.clear begin @command.push_request(action: 'send', params: msg.impl, qos: qos, timeout: timeout) # Collect the reply reply = @thread_vars.reply_queue.pop fail reply unless reply.nil? logger.exit(@id, self) { self.class.to_s + '#' + __method__.to_s } self rescue StandardError => e logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s } raise e end end |
#service ⇒ nil, String
Returns either the URL of the service that the client is currently connect to, or nil if the client is not currently connected to a service.
676 677 678 |
# File 'lib/mqlight/blocking_client.rb', line 676 def service @thread_vars.service.service if started? end |
#start(_options = {}) ⇒ BlockingClient
Requests that the client transition into started state. This method will block the calling thread until the client has either:
-
Attained started state (effectively being a no-op if the client is already in started state)
-
Attained stopped state (most likely due to another thread calling the stop method before the client manages to attain started state).
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 |
# File 'lib/mqlight/blocking_client.rb', line 224 def start( = {}) logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } parms = Hash[method(__method__).parameters.map do |parm| [parm[1], eval(parm[1].to_s)] end] logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s } logger.exit(@id) { self.class.to_s + '#' + __method__.to_s } \ unless stopped? return unless stopped? # TODO: missing exit trace @thread_vars.change_state(:starting) # Try each service in turn logger.data(@id, 'Trying each service in turn') do self.class.to_s + '#' + __method__.to_s end # New connection; increment count @thread_vars.reconnected # Start the command thread @command.start_thread # Proton handle thread. @connection.start_thread @callback_thread = Thread.new do Thread.current['name'] = 'callback_thread' callback_loop until stopped? && @thread_vars.callback_queue.empty? end logger.data(@id, 'Waiting for state change') do self.class.to_s + '#' + __method__.to_s end # Block until the state changes sleep(0.1) until || started? || stopped? fail @thread_vars.last_state_error if stopped? logger.exit(@id, self) { self.class.to_s + '#' + __method__.to_s } self rescue StandardError => e logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s } raise e end |
#started? ⇒ Boolean
Returns true indicating if the client is in the started status.
692 693 694 |
# File 'lib/mqlight/blocking_client.rb', line 692 def started? @thread_vars.state == :started end |
#starting? ⇒ Boolean
Returns true indicating if the client is in the starting status.
707 708 709 |
# File 'lib/mqlight/blocking_client.rb', line 707 def starting? @thread_vars.state == :starting end |
#state ⇒ Symbol
Returns the current state of the client. This will be one of: :starting, :started, :stopping, :stopped, :retrying, or :restarted.
682 683 684 |
# File 'lib/mqlight/blocking_client.rb', line 682 def state @thread_vars.state end |
#stop(options = {}) ⇒ Object
Requests that the client transition into stopped state. This method will block the calling thread until the client has attained stopped state.
281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 |
# File 'lib/mqlight/blocking_client.rb', line 281 def stop( = {}) logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } parms = Hash[method(__method__).parameters.map do |parm| [parm[1], eval(parm[1].to_s)] end] logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s } unless stopped? if started? @thread_vars.change_state(:stopping) @thread_vars.proton.stop end @thread_vars.change_state(:stopped) @thread_vars.subscriptions_clear @connection.wakeup @connection.stop_thread @command.join end logger.exit(@id) { self.class.to_s + '#' + __method__.to_s } rescue StandardError => e logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s } raise e end |
#stopped? ⇒ Boolean
Returns true indicating if the client is in the stopped status.
697 698 699 |
# File 'lib/mqlight/blocking_client.rb', line 697 def stopped? @thread_vars.state == :stopped end |
#subscribe(topic_pattern, options = {}) ⇒ Object
Subscribes to receive messages from a destination, identified by the topic pattern argument. The receive(…) method can then be used to retrieve messages, held at the server, for the destination. The client cannot be in stopped or stopping state when this method is called, otherwise a StoppedError will be raised.
474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 |
# File 'lib/mqlight/blocking_client.rb', line 474 def subscribe(topic_pattern, = {}) logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } parms = Hash[method(__method__).parameters.map do |parm| [parm[1], eval(parm[1].to_s)] end] logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s } fail Mqlight::StoppedError, 'Not started.' if stopped? destination = Mqlight::Destination.new(@thread_vars.service, topic_pattern, ) @thread_vars.proton.settle_mode = destination.qos timeout = .nil? ? nil : .fetch(:timeout, nil) @command.push_request(action: 'subscribe', params: destination, timeout: timeout) # Collect status and throw exception is present reply = @thread_vars.reply_queue.pop fail reply unless reply.nil? logger.exit(@id, self) { self.class.to_s + '#' + __method__.to_s } self rescue StandardError => e logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s } raise e end |
#to_s ⇒ String
Returns client Id.
687 688 689 |
# File 'lib/mqlight/blocking_client.rb', line 687 def to_s "#{@id}" end |
#unsubscribe(topic_pattern, options = {}) ⇒ Object
Unsubscribes from a destination. The client will no longer be able to receive messages from the destination. If another thread is using the receive() methods to retrieve messages from the destination that is being unsubscribed from then the receive() method will return immediately raising an UnsubscribedError.
626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 |
# File 'lib/mqlight/blocking_client.rb', line 626 def unsubscribe(topic_pattern, = {}) logger.entry(@id) { self.class.to_s + '#' + __method__.to_s } parms = Hash[method(__method__).parameters.map do |parm| [parm[1], eval(parm[1].to_s)] end] logger.parms(@id, parms) { self.class.to_s + '#' + __method__.to_s } fail Mqlight::StoppedError, 'Not started' unless started? fail ArgumentError, 'topic_pattern must be a String' unless topic_pattern.is_a? String @topic_pattern = topic_pattern share = [:share] fail ArgumentError, 'share must be a String or nil.' unless share.is_a?(String) || share.nil? if share.is_a? String fail ArgumentError, 'share is invalid because it contains a colon (:) character' if share.include? ':' end ttl = [:ttl] fail ArgumentError, 'ttl value can only be 0' unless ttl.nil? || ttl == 0 logger.data(@id, 'Checking for a matching destination') do self.class.to_s + '#' + __method__.to_s end destination = @thread_vars.destinations.find do |dest| dest.match? topic_pattern, share end fail Mqlight::UnsubscribedError, 'client is not subscribed to this address and share' if destination.nil? && !share.nil? fail Mqlight::UnsubscribedError, 'client is not subscribed to this address' if destination.nil? @command.push_request(action: 'unsubscribe', params: destination, ttl: ttl) @thread_vars.destinations.delete(destination) logger.exit(@id, self) { self.class.to_s + '#' + __method__.to_s } self rescue StandardError => e logger.throw(@id, e) { self.class.to_s + '#' + __method__.to_s } raise e end |