Class: Klomp::Connection
- Inherits:
-
Object
- Object
- Klomp::Connection
- Defined in:
- lib/klomp/connection.rb
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#subscriptions ⇒ Object
readonly
Returns the value of attribute subscriptions.
Instance Method Summary collapse
- #closed? ⇒ Boolean
- #connected? ⇒ Boolean
- #disconnect ⇒ Object
-
#initialize(server, options = {}) ⇒ Connection
constructor
A new instance of Connection.
- #publish(queue, body, headers = {}) ⇒ Object
- #reconnect ⇒ Object
- #subscribe(queue, subscriber = nil, headers = {}, &block) ⇒ Object
- #unsubscribe(queue, headers = {}) ⇒ Object
Constructor Details
#initialize(server, options = {}) ⇒ Connection
Returns a new instance of Connection.
10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/klomp/connection.rb', line 10 def initialize(server, options={}) @options = options if server =~ /^stomp:\/\// uri = URI.parse server host, port = uri.host, uri.port @options['login'] = uri.user if uri.user @options['passcode'] = uri.password if uri.password if uri.query && !uri.query.empty? uri.query.split('&').each {|pair| k, v = pair.split('=', 2); @options[k] = v } end else address = server.split ':' port, host = address.pop.to_i, address.pop @options['host'] ||= address.pop unless address.empty? end @options['server'] = [host, port] @options['host'] ||= host @subscriptions = {} @logger = options['logger'] @select_timeout = options['select_timeout'] || 0.5 connect end |
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
8 9 10 |
# File 'lib/klomp/connection.rb', line 8 def logger @logger end |
#options ⇒ Object (readonly)
Returns the value of attribute options.
8 9 10 |
# File 'lib/klomp/connection.rb', line 8 def options @options end |
#subscriptions ⇒ Object (readonly)
Returns the value of attribute subscriptions.
8 9 10 |
# File 'lib/klomp/connection.rb', line 8 def subscriptions @subscriptions end |
Instance Method Details
#closed? ⇒ Boolean
65 |
# File 'lib/klomp/connection.rb', line 65 def closed?() @closing && @socket.nil? end |
#connected? ⇒ Boolean
64 |
# File 'lib/klomp/connection.rb', line 64 def connected?() @socket end |
#disconnect ⇒ Object
67 68 69 70 71 72 73 74 75 |
# File 'lib/klomp/connection.rb', line 67 def disconnect close! stop_subscriber_thread frame = Frames::Disconnect.new write frame rescue nil @socket.close rescue nil @socket = nil frame end |
#publish(queue, body, headers = {}) ⇒ Object
35 36 37 |
# File 'lib/klomp/connection.rb', line 35 def publish(queue, body, headers = {}) write Frames::Send.new(queue, body, headers) end |
#reconnect ⇒ Object
77 78 79 80 81 82 83 84 85 86 87 88 89 |
# File 'lib/klomp/connection.rb', line 77 def reconnect return if connected? logger.warn "reconnect server=#{options['server'].join(':')}" if logger connect subs = subscriptions.dup subscriptions.clear subs.each do |queue, subscription| subscribe(queue, subscription.subscriber, subscription.headers) end @sentinel = nil ensure @subscriptions = subs if subs && subs.size != @subscriptions.size end |
#subscribe(queue, subscriber = nil, headers = {}, &block) ⇒ Object
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/klomp/connection.rb', line 39 def subscribe(queue, subscriber = nil, headers = {}, &block) if subscriber.is_a?(Hash) headers = subscriber subscriber = nil end raise Klomp::Error, "no subscriber provided" unless subscriber || block raise Klomp::Error, "subscriber does not respond to #call" if subscriber && !subscriber.respond_to?(:call) previous = subscriptions[queue] subscriptions[queue] = Subscription.new(subscriber || block, headers) frame = Frames::Subscribe.new(queue, headers) if previous frame.previous_subscriber = previous.subscriber else write frame end start_subscriber_thread frame end |
#unsubscribe(queue, headers = {}) ⇒ Object
59 60 61 62 |
# File 'lib/klomp/connection.rb', line 59 def unsubscribe(queue, headers = {}) queue = queue.headers['destination'] if Frames::Subscribe === queue write Frames::Unsubscribe.new(queue, headers) if subscriptions.delete queue end |