Class: Klomp::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/klomp/connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, options = {}) ⇒ Connection

Returns a new instance of Connection.



10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/klomp/connection.rb', line 10

# Builds the connection settings for one server and connects immediately.
#
# +server+ is either a URI of the form
# "stomp://[login[:passcode]@]host:port[?key=value&...]" or a plain
# "host:port" / "virtual-host:host:port" string.
#
# The caller's +options+ Hash is copied (shallowly) before any derived
# settings are stored, so a Hash reused for several connections does not
# carry 'server', 'host', 'login', etc. from one connection to the next.
#
# @param server [String] server address in one of the forms above
# @param options [Hash] String-keyed settings; 'logger' and
#   'select_timeout' (default 0.5) are read here, 'login', 'passcode',
#   'host' and 'server' may be filled in from +server+
# @return [Connection]
def initialize(server, options={})
  @options = options.dup

  if server =~ %r{\Astomp://}
    uri                  = URI.parse server
    host, port           = uri.host, uri.port
    @options['login']    = uri.user if uri.user
    @options['passcode'] = uri.password if uri.password
    if uri.query && !uri.query.empty?
      # Every query parameter becomes an option; values stay Strings.
      uri.query.split('&').each do |pair|
        key, value = pair.split('=', 2)
        @options[key] = value
      end
    end
  else
    # Parsed right-to-left: the last segment is the port, the one before it
    # the host, and an optional leading segment the virtual host.
    address            = server.split ':'
    port, host         = address.pop.to_i, address.pop
    @options['host'] ||= address.pop unless address.empty?
  end

  @options['server']   = [host, port]
  @options['host']   ||= host
  @subscriptions = {}
  @logger = @options['logger']
  @select_timeout = @options['select_timeout'] || 0.5
  connect
end

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



8
9
10
# File 'lib/klomp/connection.rb', line 8

# Reader for @logger, which the constructor takes from the 'logger' option
# (nil when that option was not given).
def logger() @logger end

#options ⇒ Object (readonly)

Returns the value of attribute options.



8
9
10
# File 'lib/klomp/connection.rb', line 8

# Reader for @options, the String-keyed settings Hash built by the
# constructor (includes 'server' and 'host').
def options() @options end

#subscriptions ⇒ Object (readonly)

Returns the value of attribute subscriptions.



8
9
10
# File 'lib/klomp/connection.rb', line 8

# Reader for @subscriptions, the Hash of current subscriptions keyed by queue.
def subscriptions() @subscriptions end

Instance Method Details

#closed? ⇒ Boolean

Returns:

  • (Boolean)


65
# File 'lib/klomp/connection.rb', line 65

# Truthy only when the @closing flag is set AND no socket is held.
# Returns @closing itself (nil/false) when the flag is not set.
# NOTE(review): @closing is presumably set by close! — not visible here.
def closed?
  return @closing unless @closing
  @socket.nil?
end

#connected? ⇒ Boolean

Returns:

  • (Boolean)


64
# File 'lib/klomp/connection.rb', line 64

# Truthy while a socket is held. Note that the value returned is @socket
# itself (or nil), not a strict true/false.
def connected?
  @socket
end

#disconnect ⇒ Object



67
68
69
70
71
72
73
74
75
# File 'lib/klomp/connection.rb', line 67

# Shuts the connection down: calls close!, stops the subscriber thread,
# then sends a DISCONNECT frame and closes the socket on a best-effort
# basis. Errors from those last two steps are deliberately ignored.
#
# @return [Frames::Disconnect] the frame that was (attempted to be) sent
def disconnect
  close!
  stop_subscriber_thread
  farewell = Frames::Disconnect.new

  begin
    write farewell
  rescue StandardError
    # best effort: the peer may already be gone
  end

  begin
    @socket.close
  rescue StandardError
    # best effort: also covers @socket being nil
  end

  @socket = nil
  farewell
end

#publish(queue, body, headers = {}) ⇒ Object



35
36
37
# File 'lib/klomp/connection.rb', line 35

# Wraps +body+ in a SEND frame addressed to +queue+ and writes it.
#
# @param queue destination passed to Frames::Send
# @param body message payload passed to Frames::Send
# @param headers [Hash] extra headers passed to Frames::Send
# @return whatever #write returns for the frame
def publish(queue, body, headers = {})
  frame = Frames::Send.new(queue, body, headers)
  write(frame)
end

#reconnect ⇒ Object



77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/klomp/connection.rb', line 77

# Re-establishes the connection and replays every known subscription.
# Does nothing when already connected.
#
# The subscription table is emptied before replaying so that #subscribe
# sees each queue as new and writes a SUBSCRIBE frame for it. If anything
# fails part-way, the ensure clause puts the saved table back so no
# subscription is forgotten.
#
# @return [nil]
def reconnect
  return if connected?

  if logger
    logger.warn "reconnect server=#{options['server'].join(':')}"
  end
  connect

  saved = subscriptions.dup
  subscriptions.clear
  saved.each_pair do |queue, sub|
    subscribe(queue, sub.subscriber, sub.headers)
  end
  # NOTE(review): @sentinel is managed elsewhere in the class; it is reset
  # here only after a fully successful replay.
  @sentinel = nil
ensure
  # saved is nil when we returned early or connect raised.
  if saved && saved.size != @subscriptions.size
    @subscriptions = saved
  end
end

#subscribe(queue, subscriber = nil, headers = {}, &block) ⇒ Object

Raises:

  • (Klomp::Error)



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/klomp/connection.rb', line 39

# Registers a handler for +queue+ and, for a queue not yet subscribed,
# writes a SUBSCRIBE frame. For an already-subscribed queue only the
# handler is swapped; no frame is written and the replaced handler is
# exposed on the returned frame as +previous_subscriber+.
#
# @param queue destination to subscribe to
# @param subscriber [#call, Hash, nil] handler object; a Hash here is taken
#   as +headers+, in which case the block must supply the handler
# @param headers [Hash] headers for the SUBSCRIBE frame
# @yield used as the handler when no subscriber object is given
# @return [Frames::Subscribe] the frame built for this call
# @raise [Klomp::Error] if neither subscriber nor block is given, or the
#   subscriber does not respond to #call
def subscribe(queue, subscriber = nil, headers = {}, &block)
  # Support the subscribe(queue, headers) { ... } calling form.
  headers, subscriber = subscriber, nil if subscriber.is_a?(Hash)

  handler = subscriber || block
  raise Klomp::Error, "no subscriber provided" unless handler
  if subscriber && !subscriber.respond_to?(:call)
    raise Klomp::Error, "subscriber does not respond to #call"
  end

  existing = subscriptions[queue]
  subscriptions[queue] = Subscription.new(handler, headers)

  Frames::Subscribe.new(queue, headers).tap do |frame|
    if existing
      frame.previous_subscriber = existing.subscriber
    else
      write frame
    end
    start_subscriber_thread
  end
end

#unsubscribe(queue, headers = {}) ⇒ Object



59
60
61
62
# File 'lib/klomp/connection.rb', line 59

# Drops the subscription for +queue+ and, if one existed, writes an
# UNSUBSCRIBE frame for it.
#
# @param queue destination key, or the Frames::Subscribe frame returned by
#   #subscribe (its 'destination' header is used)
# @param headers [Hash] headers for the UNSUBSCRIBE frame
# @return the result of #write, or nil when nothing was subscribed
def unsubscribe(queue, headers = {})
  destination =
    case queue
    when Frames::Subscribe then queue.headers['destination']
    else queue
    end

  return unless subscriptions.delete(destination)

  write Frames::Unsubscribe.new(destination, headers)
end