Class: Bunny::Client09
- Inherits:
-
Qrack::Client
- Object
- Qrack::Client
- Bunny::Client09
- Defined in:
- lib/bunny/client09.rb
Overview
The Client class provides the major Bunny API methods.
Constant Summary
Constants inherited from Qrack::Client
Qrack::Client::CONNECT_TIMEOUT, Qrack::Client::RETRY_DELAY
Instance Attribute Summary
Attributes inherited from Qrack::Client
#__opts__, #channel, #channels, #connecting, #exchanges, #heartbeat, #host, #logfile, #logging, #message_in, #message_out, #port, #queues, #spec, #status, #vhost
Instance Method Summary collapse
-
#check_response(received_method, expected_method, err_msg, err_class = Bunny::ProtocolError) ⇒ Object
Checks response from AMQP methods and takes appropriate action.
- #close_connection ⇒ Object
- #create_channel ⇒ Object
-
#exchange(name, opts = {}) ⇒ Bunny::Exchange09
Declares an exchange to the broker/server.
- #init_connection ⇒ Object
-
#initialize(connection_string_or_opts = Hash.new, opts = Hash.new) ⇒ Client09
constructor
Sets up a Bunny::Client object ready for connection to a broker.
- #next_frame(opts = {}) ⇒ Object
- #open_connection ⇒ Object
-
#qos(opts = {}) ⇒ Symbol
Requests a specific quality of service.
-
#queue(name = nil, opts = {}) ⇒ Bunny::Queue09
Declares a queue to the broker/server.
-
#recover(opts = {}) ⇒ Object
Asks the broker to redeliver all unacknowledged messages on a specified channel.
- #send_frame(*args) ⇒ Object
- #send_heartbeat ⇒ Object
-
#start_session ⇒ Symbol
(also: #start)
Opens a communication channel and starts a connection.
-
#tx_commit ⇒ Symbol
This method commits all messages published and acknowledged in the current transaction.
-
#tx_rollback ⇒ Symbol
This method abandons all messages published and acknowledged in the current transaction.
-
#tx_select ⇒ Symbol
This method sets the channel to use standard transactions.
Methods inherited from Qrack::Client
#close, #connected?, #connecting?, #next_payload, #read, #returned_message, #switch_channel, #write
Constructor Details
#initialize(connection_string_or_opts = Hash.new, opts = Hash.new) ⇒ Client09
Sets up a Bunny::Client object ready for connection to a broker.
Qrack::Client#status is set to :not_connected.
33 34 35 36 37 |
# File 'lib/bunny/client09.rb', line 33 def initialize(connection_string_or_opts = Hash.new, opts = Hash.new) super @spec = '0-9-1' @port = self.__opts__[:port] || (self.__opts__[:ssl] ? Qrack::Protocol09::SSL_PORT : Qrack::Protocol09::PORT) end |
Instance Method Details
#check_response(received_method, expected_method, err_msg, err_class = Bunny::ProtocolError) ⇒ Object
Checks response from AMQP methods and takes appropriate action
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
# File 'lib/bunny/client09.rb', line 40 def check_response(received_method, expected_method, err_msg, err_class = Bunny::ProtocolError) case when received_method.is_a?(Qrack::Protocol09::Connection::Close) # Clean up the socket close_socket raise Bunny::ForcedConnectionCloseError, "Error Reply Code: #{received_method.reply_code}\nError Reply Text: #{received_method.reply_text}" when received_method.is_a?(Qrack::Protocol09::Channel::Close) # Clean up the channel channel.active = false raise Bunny::ForcedChannelCloseError, "Error Reply Code: #{received_method.reply_code}\nError Reply Text: #{received_method.reply_text}" when !received_method.is_a?(expected_method) raise err_class, err_msg else :response_ok end end |
#close_connection ⇒ Object
62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/bunny/client09.rb', line 62 def close_connection # Set client channel to zero switch_channel(0) send_frame(Qrack::Protocol09::Connection::Close.new(:reply_code => 200, :reply_text => 'Goodbye', :class_id => 0, :method_id => 0)) method = next_method check_response(method, Qrack::Protocol09::Connection::CloseOk, "Error closing connection") end |
#create_channel ⇒ Object
74 75 76 77 78 79 80 |
# File 'lib/bunny/client09.rb', line 74 def create_channel channels.each do |c| return c if (!c.open? and c.number != 0) end # If no channel to re-use instantiate new one Bunny::Channel09.new(self) end |
#exchange(name, opts = {}) ⇒ Bunny::Exchange09
Declares an exchange to the broker/server. If the exchange does not exist, a new one is created using the arguments passed in. If the exchange already exists, a reference to it is created, provided that the arguments passed in do not conflict with the existing attributes of the exchange. If an error occurs a Bunny_::_ProtocolError is raised.
107 108 109 |
# File 'lib/bunny/client09.rb', line 107 def exchange(name, opts = {}) exchanges[name] || Bunny::Exchange09.new(self, name, opts) end |
#init_connection ⇒ Object
111 112 113 114 115 116 117 118 119 |
# File 'lib/bunny/client09.rb', line 111 def init_connection write(Qrack::Protocol09::HEADER) write([0, Qrack::Protocol09::VERSION_MAJOR, Qrack::Protocol09::VERSION_MINOR, Qrack::Protocol09::REVISION].pack('C4')) frame = next_frame if frame.nil? or !frame.payload.is_a?(Qrack::Protocol09::Connection::Start) raise Bunny::ProtocolError, 'Connection initiation failed' end end |
#next_frame(opts = {}) ⇒ Object
121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 |
# File 'lib/bunny/client09.rb', line 121 def next_frame(opts = {}) frame = nil case when channel.frame_buffer.size > 0 frame = channel.frame_buffer.shift when (timeout = opts[:timeout]) && timeout > 0 Bunny::Timer::timeout(timeout, Qrack::FrameTimeout) do frame = Qrack::Transport09::Frame.parse(buffer) end else frame = Qrack::Transport09::Frame.parse(buffer) end @logger.info("received") { frame } if @logging raise Bunny::ConnectionError, 'No connection to server' if (frame.nil? and !connecting?) # Monitor server activity and discard heartbeats = true case when frame.is_a?(Qrack::Transport09::Heartbeat) next_frame(opts) when frame.nil? frame when ((frame.channel != channel.number) and (frame.channel != 0)) channel.frame_buffer << frame next_frame(opts) else frame end end |
#open_connection ⇒ Object
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 |
# File 'lib/bunny/client09.rb', line 156 def open_connection client_props = { :platform => 'Ruby', :product => 'Bunny', :information => 'http://github.com/ruby-amqp/bunny', :version => VERSION } start_opts = { :client_properties => client_props, :mechanism => 'PLAIN', :response => "\0" + @user + "\0" + @pass, :locale => 'en_US' } send_frame(Qrack::Protocol09::Connection::StartOk.new(start_opts)) frame = next_frame raise Bunny::ProtocolError, "Connection failed - user: #{@user}" if frame.nil? method = frame.payload if method.is_a?(Qrack::Protocol09::Connection::Tune) send_frame(Qrack::Protocol09::Connection::TuneOk.new(:channel_max => @channel_max, :frame_max => @frame_max, :heartbeat => @heartbeat)) end send_frame(Qrack::Protocol09::Connection::Open.new(:virtual_host => @vhost, :reserved_1 => 0, :reserved_2 => false)) raise Bunny::ProtocolError, 'Cannot open connection' unless next_method.is_a?(Qrack::Protocol09::Connection::OpenOk) end |
#qos(opts = {}) ⇒ Symbol
Requests a specific quality of service. The QoS can be specified for the current channel or for all channels on the connection. The particular properties and semantics of a QoS method always depend on the content class semantics. Though the QoS method could in principle apply to both peers, it is currently meaningful only for the server.
206 207 208 209 210 211 212 213 214 215 |
# File 'lib/bunny/client09.rb', line 206 def qos(opts = {}) send_frame(Qrack::Protocol09::Basic::Qos.new({ :prefetch_size => 0, :prefetch_count => 1, :global => false }.merge(opts))) method = next_method check_response(method, Qrack::Protocol09::Basic::QosOk, "Error specifying Quality of Service") # return confirmation :qos_ok end |
#queue(name = nil, opts = {}) ⇒ Bunny::Queue09
Declares a queue to the broker/server. If the queue does not exist, a new one is created using the arguments passed in. If the queue already exists, a reference to it is created, provided that the arguments passed in do not conflict with the existing attributes of the queue. If an error occurs a ProtocolError is raised.
246 247 248 249 250 251 252 253 254 |
# File 'lib/bunny/client09.rb', line 246 def queue(name = nil, opts = {}) if name.is_a?(Hash) opts = name name = nil end # Queue is responsible for placing itself in the list of queues queues[name] || Bunny::Queue09.new(self, name, opts) end |
#recover(opts = {}) ⇒ Object
Asks the broker to redeliver all unacknowledged messages on a specified channel. Zero or more messages may be redelivered.
263 264 265 |
# File 'lib/bunny/client09.rb', line 263 def recover(opts = {}) send_frame(Qrack::Protocol09::Basic::Recover.new({ :requeue => false }.merge(opts))) end |
#send_frame(*args) ⇒ Object
267 268 269 270 271 272 273 274 275 276 277 278 279 280 |
# File 'lib/bunny/client09.rb', line 267 def send_frame(*args) args.each do |data| data = data.to_frame(channel.number) unless data.is_a?(Qrack::Transport09::Frame) data.channel = channel.number @logger.info("send") { data } if @logging write(data.to_s) # Monitor client activity for heartbeat purposes = true end nil end |
#send_heartbeat ⇒ Object
282 283 284 285 286 287 288 289 |
# File 'lib/bunny/client09.rb', line 282 def send_heartbeat # Create a new heartbeat frame hb = Qrack::Transport09::Heartbeat.new('') # Channel 0 must be used switch_channel(0) if @channel.number > 0 # Send the heartbeat to server send_frame(hb) end |
#start_session ⇒ Symbol Also known as: start
Opens a communication channel and starts a connection. If an error occurs, a
ProtocolError is raised. If successful, Qrack::Client#status is set to :connected.
295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 |
# File 'lib/bunny/client09.rb', line 295 def start_session @connecting = true # Create/get socket socket # Initiate connection init_connection # Open connection open_connection # Open another channel because channel zero is used for specific purposes c = create_channel() c.open @connecting = false # return status @status = :connected end |
#tx_commit ⇒ Symbol
This method commits all messages published and acknowledged in the current transaction. A new transaction starts immediately after a commit.
324 325 326 327 328 329 330 331 332 333 |
# File 'lib/bunny/client09.rb', line 324 def tx_commit send_frame(Qrack::Protocol09::Tx::Commit.new()) method = next_method check_response(method, Qrack::Protocol09::Tx::CommitOk, "Error commiting transaction") # return confirmation :commit_ok end |
#tx_rollback ⇒ Symbol
This method abandons all messages published and acknowledged in the current transaction. A new transaction starts immediately after a rollback.
340 341 342 343 344 345 346 347 348 349 |
# File 'lib/bunny/client09.rb', line 340 def tx_rollback send_frame(Qrack::Protocol09::Tx::Rollback.new()) method = next_method check_response(method, Qrack::Protocol09::Tx::RollbackOk, "Error rolling back transaction") # return confirmation :rollback_ok end |
#tx_select ⇒ Symbol
This method sets the channel to use standard transactions. The client must use this method at least once on a channel before using the Commit or Rollback methods.
356 357 358 359 360 361 362 363 364 365 |
# File 'lib/bunny/client09.rb', line 356 def tx_select send_frame(Qrack::Protocol09::Tx::Select.new()) method = next_method check_response(method, Qrack::Protocol09::Tx::SelectOk, "Error initiating transactions for current channel") # return confirmation :select_ok end |