Class: Stomp::Client

Inherits:
Object
Defined in:
lib/stomp/client.rb

Overview

Typical Stomp client class. It uses a listener thread to receive frames from the server; any thread can send.

All receives happen in that one listener thread, so avoid heavy processing in it if message volume is high.
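One way to keep the listener thread responsive is to have the callback do nothing but enqueue the message, and let a separate worker do the slow work. The sketch below illustrates that hand-off with Ruby's standard Queue. It is only an illustration: no broker is involved, the on_message lambda stands in for a subscribe block, and plain strings stand in for Stomp messages.

```ruby
# Hand-off pattern: the listener callback only enqueues; a worker thread
# does the slow processing. No Stomp connection is involved here.
work_queue = Queue.new
processed  = []

# Worker thread: drains the queue until it sees the :stop sentinel.
worker = Thread.new do
  while (item = work_queue.pop) != :stop
    processed << item.upcase # stand-in for expensive processing
  end
end

# Stand-in (assumption) for the block you would pass to Client#subscribe.
on_message = lambda { |msg| work_queue << msg }

# Simulate the listener thread delivering three messages.
%w[a b c].each { |body| on_message.call(body) }

# Tell the worker to finish, then wait for it.
work_queue << :stop
worker.join
```

With a real client, the body of on_message would be the whole subscribe block, so the listener thread returns to receiving almost immediately.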

Constructor Details

#initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {}) ⇒ Client

A new Client object can be initialized using two forms:

Standard positional parameters:

login     (String,  default : '')
passcode  (String,  default : '')
host      (String,  default : 'localhost')
port      (Integer, default : 61613)
reliable  (Boolean, default : false)

e.g. c = Client.new('login', 'passcode', 'localhost', 61613, true)

Stomp URL:

A Stomp URL must begin with 'stomp://' and can be in one of the following forms:

stomp://host:port
stomp://host.domain.tld:port
stomp://login:passcode@host:port
stomp://login:passcode@host.domain.tld:port
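
The URL forms above can be tried out without a broker. The following is a minimal sketch that reuses the two regular expressions from the constructor source; parse_stomp_url is a hypothetical helper, not part of Stomp::Client, and trying the credentialed pattern first is a choice made in this sketch because the patterns are unanchored.

```ruby
# Patterns copied from the constructor source shown on this page.
WITH_LOGIN = /stomp:\/\/([\w\.]+):(\w+)@([\w\.]+):(\d+)/
HOST_ONLY  = /stomp:\/\/([\w\.]+):(\d+)/

# Hypothetical helper (not part of Stomp::Client): returns a hash of the
# URL parts, or nil when the string is not a recognised Stomp URL.
def parse_stomp_url(url)
  if (m = WITH_LOGIN.match(url))
    { login: m[1], passcode: m[2], host: m[3], port: m[4].to_i }
  elsif (m = HOST_ONLY.match(url))
    { login: '', passcode: '', host: m[1], port: m[2].to_i }
  end
end

with_login = parse_stomp_url('stomp://login:passcode@host.domain.tld:61613')
host_only  = parse_stomp_url('stomp://localhost:61613')
not_a_url  = parse_stomp_url('localhost')
```

A string that matches neither pattern yields nil here; in the constructor itself, such a string falls through to the positional-parameter branch.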

Raises:

  • (ArgumentError)


# File 'lib/stomp/client.rb', line 31

def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})

  # Parse stomp:// URL's or set positional params
  case login
  when /stomp:\/\/([\w\.]+):(\d+)/ # e.g. stomp://host:port
    # grabs the matching positions out of the regex which are stored as
    # $1 (host), $2 (port), etc
    @login = ''
    @passcode = ''
    @host = $1
    @port = $2.to_i
    @reliable = false
  when /stomp:\/\/([\w\.]+):(\w+)@([\w\.]+):(\d+)/ # e.g. stomp://login:passcode@host:port
    @login = $1
    @passcode = $2
    @host = $3
    @port = $4.to_i
    @reliable = false
  else
    @login = login
    @passcode = passcode
    @host = host
    @port = port.to_i
    @reliable = reliable
  end

  raise ArgumentError if @host.nil? || @host.empty?
  raise ArgumentError if @port.nil? || @port == '' || @port < 1 || @port > 65535
  raise ArgumentError unless @reliable.is_a?(TrueClass) || @reliable.is_a?(FalseClass)

  @id_mutex = Mutex.new
  @ids = 1
  @connection = Connection.new(@login, @passcode, @host, @port, @reliable, reconnect_delay, connect_headers)
  @listeners = {}
  @receipt_listeners = {}
  @running = true
  @replay_messages_by_txn = {}

  @listener_thread = Thread.start do
    while @running
      message = @connection.receive
      case
      when message.nil?
        break
      when message.command == 'MESSAGE'
        if listener = @listeners[message.headers['destination']]
          listener.call(message)
        end
      when message.command == 'RECEIPT'
        if listener = @receipt_listeners[message.headers['receipt-id']]
          listener.call(message)
        end
      end
    end
  end

end

Instance Attribute Details

#host ⇒ Object (readonly)

Returns the value of attribute host.



# File 'lib/stomp/client.rb', line 10

def host
  @host
end

#login ⇒ Object (readonly)

Returns the value of attribute login.



# File 'lib/stomp/client.rb', line 10

def login
  @login
end

#passcode ⇒ Object (readonly)

Returns the value of attribute passcode.



# File 'lib/stomp/client.rb', line 10

def passcode
  @passcode
end

#port ⇒ Object (readonly)

Returns the value of attribute port.



# File 'lib/stomp/client.rb', line 10

def port
  @port
end

#reliable ⇒ Object (readonly)

Returns the value of attribute reliable.



# File 'lib/stomp/client.rb', line 10

def reliable
  @reliable
end

#running ⇒ Object (readonly)

Returns the value of attribute running.



# File 'lib/stomp/client.rb', line 10

def running
  @running
end

Class Method Details

.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false) ⇒ Object

Syntactic sugar for 'Client.new'. See 'initialize' for usage.



# File 'lib/stomp/client.rb', line 90

def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
  Client.new(login, passcode, host, port, reliable)
end

Instance Method Details

#abort(name, headers = {}) ⇒ Object

Abort a transaction by name



# File 'lib/stomp/client.rb', line 106

def abort(name, headers = {})
  @connection.abort(name, headers)

  # lets replay any ack'd messages in this transaction
  replay_list = @replay_messages_by_txn[name]
  if replay_list
    replay_list.each do |message|
      if listener = @listeners[message.headers['destination']]
        listener.call(message)
      end
    end
  end
end

#acknowledge(message, headers = {}) ⇒ Object

Acknowledge a message. Used when a subscription has specified client acknowledgement (connection.subscribe "/queue/a", :ack => 'client').

Accepts a transaction header ( :transaction => ‘some_transaction_id’ )



# File 'lib/stomp/client.rb', line 147

def acknowledge(message, headers = {})
  txn_id = headers[:transaction]
  if txn_id
    # lets keep around messages ack'd in this transaction in case we rollback
    replay_list = @replay_messages_by_txn[txn_id]
    if replay_list.nil?
      replay_list = []
      @replay_messages_by_txn[txn_id] = replay_list
    end
    replay_list << message
  end
  if block_given?
    headers['receipt'] = register_receipt_listener lambda {|r| yield r}
  end
  @connection.ack message.headers['message-id'], headers
end
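
The replay bookkeeping shared by #abort and #acknowledge can be pictured with a small in-memory model. This is a sketch only: ReplayBook is a hypothetical class invented for illustration, strings stand in for messages, and nothing is sent to a broker.

```ruby
# Minimal in-memory model of the replay bookkeeping shown in #acknowledge
# and #abort. ReplayBook is a hypothetical name; no broker is involved.
class ReplayBook
  def initialize
    @replay_messages_by_txn = {}
  end

  # Remember a message that was acknowledged inside transaction txn_id.
  def acknowledge(message, txn_id)
    (@replay_messages_by_txn[txn_id] ||= []) << message
  end

  # On abort, hand every remembered message back to the given block,
  # in the order the messages were acknowledged.
  def abort(txn_id)
    (@replay_messages_by_txn[txn_id] || []).each { |message| yield message }
  end
end

book = ReplayBook.new
book.acknowledge('m1', 'tx1')
book.acknowledge('m2', 'tx1')

# The block plays the role of the subscription's listener.
redelivered = []
book.abort('tx1') { |message| redelivered << message }
```

In the real client the replayed messages go to the listener registered for each message's destination header rather than to a block passed to abort.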

#begin(name, headers = {}) ⇒ Object

Begin a transaction by name



# File 'lib/stomp/client.rb', line 101

def begin(name, headers = {})
  @connection.begin(name, headers)
end

#close ⇒ Object

Close out resources in use by this client



# File 'lib/stomp/client.rb', line 188

def close
  @connection.disconnect
  @running = false
end

#closed? ⇒ Boolean

Is this client closed?

Returns:

  • (Boolean)


# File 'lib/stomp/client.rb', line 183

def closed?
  @connection.closed?
end

#commit(name, headers = {}) ⇒ Object

Commit a transaction by name



# File 'lib/stomp/client.rb', line 121

def commit(name, headers = {})
  txn_id = headers[:transaction]
  @replay_messages_by_txn.delete(txn_id)
  @connection.commit(name, headers)
end

#join ⇒ Object

Join the listener thread for this client. Generally used to wait for a quit signal.



# File 'lib/stomp/client.rb', line 96

def join
  @listener_thread.join
end

#open? ⇒ Boolean

Is this client open?

Returns:

  • (Boolean)


# File 'lib/stomp/client.rb', line 178

def open?
  @connection.open?
end

#send(destination, message, headers = {}) ⇒ Object

Send message to destination

If a block is given, a receipt is requested and passed to the block when it arrives.

Accepts a transaction header ( :transaction => ‘some_transaction_id’ )



# File 'lib/stomp/client.rb', line 170

def send(destination, message, headers = {})
  if block_given?
    headers['receipt'] = register_receipt_listener lambda {|r| yield r}
  end
  @connection.send(destination, message, headers)
end

#subscribe(destination, headers = {}) ⇒ Object

Subscribe to a destination. A block must be given; it is used as the callback listener.

Accepts a transaction header ( :transaction => ‘some_transaction_id’ )



# File 'lib/stomp/client.rb', line 131

def subscribe(destination, headers = {})
  raise "No listener given" unless block_given?
  @listeners[destination] = lambda {|msg| yield msg}
  @connection.subscribe(destination, headers)
end
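
Because listeners are stored in a hash keyed by destination, subscribing twice to the same destination replaces the earlier callback, and a message for a destination with no listener is skipped. The sketch below models just that table and the dispatch step. ListenerTable and its dispatch method are hypothetical names for illustration, and the destinations and bodies are made up.

```ruby
# Hypothetical stand-in for the @listeners hash and the MESSAGE branch of
# the listener thread. No Stomp connection is involved.
class ListenerTable
  def initialize
    @listeners = {}
  end

  # Mirrors #subscribe: a block is mandatory and is stored per destination.
  def subscribe(destination, &block)
    raise "No listener given" unless block
    @listeners[destination] = block
  end

  # Mirrors the dispatch step: call the listener if one is registered.
  def dispatch(destination, body)
    listener = @listeners[destination]
    listener.call(body) if listener
  end
end

table = ListenerTable.new
log = []
table.subscribe('/queue/a') { |body| log << "first: #{body}" }
table.subscribe('/queue/a') { |body| log << "second: #{body}" } # replaces the first
table.dispatch('/queue/a', 'hello')
table.dispatch('/queue/b', 'ignored') # no listener registered, so skipped
```

If several independent handlers are needed for one destination, a single block that fans out to them is one option under this design.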

#unsubscribe(name, headers = {}) ⇒ Object

Unsubscribe from a channel



# File 'lib/stomp/client.rb', line 138

def unsubscribe(name, headers = {})
  @connection.unsubscribe(name, headers)
  @listeners[name] = nil
end