Class: Stomp::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/stomp/connection.rb

Overview

Low level connection which maps commands and supports synchronous receives

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) ⇒ Connection

Stomp URL :

A Stomp URL must begin with 'stomp://' and can be in one of the following forms:

stomp://host:port
stomp://host.domain.tld:port
stomp://user:pass@host:port
stomp://user:pass@host.domain.tld:port


28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/stomp/connection.rb', line 28

# Builds the connection state and connects immediately.
#
# @param login [String] value sent as the :login CONNECT header
# @param passcode [String] value sent as the :passcode CONNECT header
# @param host [String] broker host name
# @param port [Integer] broker port
# @param reliable [Boolean] when true, connect/receive failures are retried
#   instead of being raised
# @param reconnect_delay [Numeric] seconds to sleep between connect attempts
# @param connect_headers [Hash] extra headers for the CONNECT frame
# @return [Connection]
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  @host = host
  @port = port
  @login = login
  @passcode = passcode
  @transmit_semaphore = Mutex.new
  @read_semaphore = Mutex.new
  @socket_semaphore = Mutex.new
  @reliable = reliable
  @reconnect_delay = reconnect_delay
  @connect_headers = connect_headers
  @closed = false
  @subscriptions = {}
  @failure = nil
  # Calling #socket here performs the initial connect.
  socket
end

Class Method Details

.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) ⇒ Object

Syntactic sugar for ‘Connection.new’. See ‘initialize’ for usage.



46
47
48
# File 'lib/stomp/connection.rb', line 46

# Syntactic sugar for Connection.new; every argument is forwarded unchanged.
#
# @param login [String] value sent as the :login CONNECT header
# @param passcode [String] value sent as the :passcode CONNECT header
# @param host [String] broker host name
# @param port [Integer] broker port
# @param reliable [Boolean] retry failures instead of raising
# @param reconnect_delay [Numeric] seconds between connect attempts
# @param connect_headers [Hash] extra headers for the CONNECT frame
# @return [Connection] the newly created connection
def Connection.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  Connection.new(login, passcode, host, port, reliable, reconnect_delay, connect_headers)
end

Instance Method Details

#__old_receive ⇒ Object

Receive a frame, block until the frame is received



161
162
163
164
165
166
167
168
169
170
171
172
173
# File 'lib/stomp/connection.rb', line 161

# Receive a frame, blocking until one has been read.
#
# Every attempt fetches the current socket via #socket and reads from it with
# #_receive. Any error is recorded in @failure. A non-reliable connection
# re-raises the error; a reliable one logs it to $stderr and tries again.
#
# @return [Object] whatever #_receive returns for the frame
# @raise [StandardError] the receive error, when the connection is not reliable
def __old_receive
  # The receive may fail, so we may need to retry.
  while true
    begin
      return _receive(socket)
    rescue => e
      @failure = e
      raise unless @reliable
      # Interpolate: String#+ does not accept an exception object.
      $stderr.print "receive failed: #{e}"
    end
  end
end

#abort(name, headers = {}) ⇒ Object

Abort a transaction by name



110
111
112
113
# File 'lib/stomp/connection.rb', line 110

# Abort a transaction by name.
#
# Sends an ABORT frame whose :transaction header is +name+. The headers are
# merged into a new hash so the caller's hash is left untouched.
#
# @param name [Object] transaction identifier
# @param headers [Hash] additional frame headers
# @return [Object] the result of #transmit
def abort(name, headers = {})
  transmit("ABORT", headers.merge(:transaction => name))
end

#ack(message_id, headers = {}) ⇒ Object

Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe “/queue/a”, :ack => ‘client’ )

Accepts a transaction header ( :transaction => ‘some_transaction_id’ )



98
99
100
101
# File 'lib/stomp/connection.rb', line 98

# Acknowledge a message; used when a subscription specified client
# acknowledgement.
#
# Sends an ACK frame whose 'message-id' header is +message_id+. Accepts a
# transaction header ( :transaction => 'some_transaction_id' ). The headers
# are merged into a new hash so the caller's hash is left untouched.
#
# @param message_id [Object] id of the message being acknowledged
# @param headers [Hash] additional frame headers
# @return [Object] the result of #transmit
def ack(message_id, headers = {})
  transmit("ACK", headers.merge('message-id' => message_id))
end

#begin(name, headers = {}) ⇒ Object

Begin a transaction, requires a name for the transaction



89
90
91
92
# File 'lib/stomp/connection.rb', line 89

# Begin a transaction; requires a name for the transaction.
#
# Sends a BEGIN frame whose :transaction header is +name+. The headers are
# merged into a new hash so the caller's hash is left untouched.
#
# @param name [Object] transaction identifier
# @param headers [Hash] additional frame headers
# @return [Object] the result of #transmit
def begin(name, headers = {})
  transmit("BEGIN", headers.merge(:transaction => name))
end

#closed? ⇒ Boolean

Is this connection closed?

Returns:

  • (Boolean)


84
85
86
# File 'lib/stomp/connection.rb', line 84

# Is this connection closed?
#
# @return [Boolean] the current value of the @closed flag
def closed?
  @closed
end

#commit(name, headers = {}) ⇒ Object

Commit a transaction by name



104
105
106
107
# File 'lib/stomp/connection.rb', line 104

# Commit a transaction by name.
#
# Sends a COMMIT frame whose :transaction header is +name+. The headers are
# merged into a new hash so the caller's hash is left untouched.
#
# @param name [Object] transaction identifier
# @param headers [Hash] additional frame headers
# @return [Object] the result of #transmit
def commit(name, headers = {})
  transmit("COMMIT", headers.merge(:transaction => name))
end

#disconnect(headers = {}) ⇒ Object

Close this connection



146
147
148
149
# File 'lib/stomp/connection.rb', line 146

# Close this connection.
#
# Transmits a DISCONNECT frame carrying +headers+ and then marks the
# connection as closed.
#
# @param headers [Hash] headers passed along with the DISCONNECT frame
# @return [true]
def disconnect(headers = {})
  transmit("DISCONNECT", headers)
  # Only reached when transmit returned normally.
  @closed = true
end

#open? ⇒ Boolean

Is this connection open?

Returns:

  • (Boolean)


79
80
81
# File 'lib/stomp/connection.rb', line 79

# Is this connection open?
#
# @return [Boolean] the negation of the @closed flag
def open?
  !@closed
end

#poll ⇒ Object

Return a pending message if one is available, otherwise return nil



153
154
155
156
157
158
# File 'lib/stomp/connection.rb', line 153

# Return a pending message if one is available, otherwise return nil.
#
# The check and the read both happen while holding @read_semaphore.
#
# @return [Object, nil] the result of #receive, or nil when there is no
#   socket or the socket reports that it is not ready
def poll
  @read_semaphore.synchronize do
    if @socket.nil? || !@socket.ready?
      nil
    else
      receive
    end
  end
end

#receive ⇒ Object



175
176
177
178
179
180
181
182
183
# File 'lib/stomp/connection.rb', line 175

# Receive a frame via #__old_receive, with one extra attempt on a reliable
# connection.
#
# When the first read yields nil and the connection is reliable, a notice is
# written to $stderr, the cached socket is dropped, and the read is attempted
# once more.
#
# @return [Object, nil] the received frame; may be nil
def receive
  frame = __old_receive
  return frame unless frame.nil? && @reliable

  $stderr.print "connection.receive returning EOF as nil - resetting connection.\n"
  @socket = nil
  __old_receive
end

#send(destination, message, headers = {}) ⇒ Object

Send message to destination

Accepts a transaction header ( :transaction => ‘some_transaction_id’ )



140
141
142
143
# File 'lib/stomp/connection.rb', line 140

# Send +message+ to +destination+.
#
# Sends a SEND frame whose :destination header is +destination+ and whose
# body is +message+. Accepts a transaction header
# ( :transaction => 'some_transaction_id' ). The headers are merged into a
# new hash so the caller's hash is left untouched.
#
# @param destination [Object] value for the :destination header
# @param message [Object] frame body handed to #transmit
# @param headers [Hash] additional frame headers
# @return [Object] the result of #transmit
def send(destination, message, headers = {})
  transmit("SEND", headers.merge(:destination => destination), message)
end

#socket ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/stomp/connection.rb', line 50

# Returns the connected socket, (re)connecting first when needed.
#
# A connect attempt is made when no socket is cached in @socket or when an
# earlier operation recorded an error in @failure. Each attempt opens a TCP
# socket to @host:@port, sends a CONNECT frame built from a copy of
# @connect_headers plus the stored login and passcode, stores the reply in
# @connect, and re-sends a SUBSCRIBE frame for every entry in @subscriptions.
#
# When an attempt fails on a non-reliable connection the error is re-raised.
# On a reliable connection the failure is logged to $stderr and another
# attempt is made after @reconnect_delay seconds.
#
# @return [TCPSocket] the connected socket (also cached in @socket)
# @raise [StandardError] the connect/handshake error, when not reliable
def socket
  # Need to look into why wrapping this method body in
  # @read_semaphore.synchronize does not work.
  current = @socket
  while current.nil? || !@failure.nil?
    @failure = nil
    candidate = nil
    begin
      candidate = TCPSocket.open(@host, @port)
      headers = @connect_headers.clone
      headers[:login] = @login
      headers[:passcode] = @passcode
      _transmit(candidate, "CONNECT", headers)
      @connect = _receive(candidate)
      # Replay any subscriptions on the fresh connection.
      @subscriptions.each { |_id, sub_headers| _transmit(candidate, "SUBSCRIBE", sub_headers) }
      current = candidate
    rescue => e
      @failure = e
      current = nil
      # Do not leak a socket that connected but failed the handshake.
      begin
        candidate.close unless candidate.nil?
      rescue StandardError
        # Best effort only; the connect error is the one worth reporting.
      end
      raise e unless @reliable
      # Interpolate: String#+ does not accept an exception object.
      $stderr.print "connect failed: #{e} will retry in #{@reconnect_delay}\n"
      sleep(@reconnect_delay)
    end
  end
  @socket = current
  current
end

#subscribe(name, headers = {}, subId = nil) ⇒ Object

Subscribe to a destination, must specify a name



116
117
118
119
120
121
122
123
124
125
# File 'lib/stomp/connection.rb', line 116

# Subscribe to a destination; a name must be specified.
#
# Sends a SUBSCRIBE frame whose :destination header is +name+. On a reliable
# connection the headers are also remembered in @subscriptions, keyed by
# +subId+ (or +name+ when +subId+ is nil). The headers are merged into a new
# hash, so the caller's hash is not modified and the remembered entry does
# not change if the caller later alters its own hash.
#
# @param name [Object] value for the :destination header
# @param headers [Hash] additional frame headers
# @param subId [Object, nil] key under which the subscription is remembered
# @return [Hash, nil] the remembered headers on a reliable connection,
#   otherwise nil
def subscribe(name, headers = {}, subId = nil)
  frame_headers = headers.merge(:destination => name)
  transmit("SUBSCRIBE", frame_headers)

  # Store the sub so that we can replay if we reconnect.
  if @reliable
    subId = name if subId.nil?
    @subscriptions[subId] = frame_headers
  end
end

#unsubscribe(name, headers = {}, subId = nil) ⇒ Object

Unsubscribe from a destination, must specify a name



128
129
130
131
132
133
134
135
# File 'lib/stomp/connection.rb', line 128

# Unsubscribe from a destination; a name must be specified.
#
# Sends an UNSUBSCRIBE frame whose :destination header is +name+. On a
# reliable connection the remembered subscription keyed by +subId+ (or
# +name+ when +subId+ is nil) is removed. The headers are merged into a new
# hash so the caller's hash is left untouched.
#
# @param name [Object] value for the :destination header
# @param headers [Hash] additional frame headers
# @param subId [Object, nil] key of the remembered subscription
# @return [Hash, nil] the removed entry on a reliable connection (nil when
#   nothing was stored under that key), otherwise nil
def unsubscribe(name, headers = {}, subId = nil)
  transmit("UNSUBSCRIBE", headers.merge(:destination => name))
  if @reliable
    subId = name if subId.nil?
    @subscriptions.delete(subId)
  end
end