Class: Qrack::Client
- Inherits:
-
Object
- Object
- Qrack::Client
- Defined in:
- lib/qrack/client.rb
Overview
Client ancestor class
Direct Known Subclasses
Constant Summary collapse
- CONNECT_TIMEOUT =
5.0
- RETRY_DELAY =
10.0
Instance Attribute Summary collapse
-
#__opts__ ⇒ Object
readonly
Temporary hack to make Bunny 0.7 work with port number in AMQP URL.
-
#channel ⇒ Object
Returns the value of attribute channel.
-
#channels ⇒ Object
Returns the value of attribute channels.
-
#connecting ⇒ Object
Returns the value of attribute connecting.
-
#exchanges ⇒ Object
Returns the value of attribute exchanges.
-
#heartbeat ⇒ Object
readonly
Returns the value of attribute heartbeat.
-
#host ⇒ Object
readonly
Returns the value of attribute host.
-
#logfile ⇒ Object
Returns the value of attribute logfile.
-
#logging ⇒ Object
Returns the value of attribute logging.
-
#message_in ⇒ Object
Returns the value of attribute message_in.
-
#message_out ⇒ Object
Returns the value of attribute message_out.
-
#port ⇒ Object
readonly
Returns the value of attribute port.
-
#queues ⇒ Object
Returns the value of attribute queues.
-
#spec ⇒ Object
readonly
Returns the value of attribute spec.
-
#status ⇒ Object
readonly
Returns the value of attribute status.
-
#vhost ⇒ Object
readonly
Returns the value of attribute vhost.
Instance Method Summary collapse
-
#close ⇒ Symbol
(also: #stop)
Returns :not_connected if successful.
- #connected? ⇒ Boolean
- #connecting? ⇒ Boolean
-
#initialize(connection_string_or_opts = Hash.new, opts = Hash.new) ⇒ Client
constructor
A new instance of Client.
- #next_payload(options = {}) ⇒ Object (also: #next_method)
- #read(*args) ⇒ Object
-
#returned_message(opts = {}) ⇒ Hash
Returns {:header => nil, :payload => :no_return, :return_details => nil} if message is not returned before timeout.
- #switch_channel(chann) ⇒ Object
- #write(*args) ⇒ Object
Constructor Details
#initialize(connection_string_or_opts = Hash.new, opts = Hash.new) ⇒ Client
Returns a new instance of Client.
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/qrack/client.rb', line 30

# Sets up a client in the :not_connected state and creates channel 0.
#
# @param connection_string_or_opts [String, Hash] a String is parsed with
#   AMQ::Client::Settings.parse_amqp_url, a Hash is used as given, and any
#   other value is treated as an empty Hash
# @param opts [Hash] extra options merged over the first argument. Keys read
#   here: :host, :user, :pass, :vhost, :logfile, :logging, :ssl, :verify_ssl,
#   :frame_max, :channel_max, :heartbeat, :connect_timeout, :socket_timeout
def initialize(connection_string_or_opts = Hash.new, opts = Hash.new)
  base_opts = case connection_string_or_opts
              when String then AMQ::Client::Settings.parse_amqp_url(connection_string_or_opts)
              when Hash   then connection_string_or_opts
              else Hash.new
              end
  opts = base_opts.merge(opts)

  # Temporary hack to make Bunny 0.7 work with port number in AMQP URL.
  # This is not necessary on Bunny 0.8 as it removes support of AMQP 0.8.
  @__opts__ = opts

  @host    = opts[:host]    || 'localhost'
  @user    = opts[:user]    || 'guest'
  @pass    = opts[:pass]    || 'guest'
  @vhost   = opts[:vhost]   || '/'
  @logfile = opts[:logfile] || nil
  @logging = opts[:logging] || false
  @ssl     = opts[:ssl]     || false
  # Verification stays on unless :verify_ssl is explicitly given as false.
  @verify_ssl = opts[:verify_ssl].nil? || opts[:verify_ssl]
  @status  = :not_connected

  @frame_max   = opts[:frame_max]   || 131072
  @channel_max = opts[:channel_max] || 0
  @heartbeat   = opts[:heartbeat]   || 0

  @connect_timeout    = opts[:connect_timeout] || CONNECT_TIMEOUT
  @read_write_timeout = opts[:socket_timeout]
  # A :socket_timeout of 0 is stored as nil.
  @read_write_timeout = nil if @read_write_timeout == 0
  # Falls back to the connect timeout when no socket timeout is set.
  @disconnect_timeout = @read_write_timeout || @connect_timeout

  @logger = nil
  create_logger if @logging

  @message_in  = false
  @message_out = false
  @connecting  = false

  @channels ||= []
  # Create channel 0
  @channel = create_channel()
  @exchanges ||= {}
  @queues    ||= {}
end
Instance Attribute Details
#__opts__ ⇒ Object (readonly)
Temporary hack to make Bunny 0.7 work with port number in AMQP URL. This is not necessary on Bunny 0.8 as it removes support of AMQP 0.8.
28 29 30 |
# File 'lib/qrack/client.rb', line 28

# Temporary hack to make Bunny 0.7 work with port number in AMQP URL.
# This is not necessary on Bunny 0.8 as it removes support of AMQP 0.8.
#
# @return [Object] the value of @__opts__
def __opts__
  @__opts__
end
#channel ⇒ Object
Returns the value of attribute channel.
24 25 26 |
# File 'lib/qrack/client.rb', line 24

# Reader for the channel attribute.
#
# @return [Object] the value of @channel
def channel
  @channel
end
#channels ⇒ Object
Returns the value of attribute channels.
24 25 26 |
# File 'lib/qrack/client.rb', line 24

# Reader for the channels attribute.
#
# @return [Object] the value of @channels
def channels
  @channels
end
#connecting ⇒ Object
Returns the value of attribute connecting.
24 25 26 |
# File 'lib/qrack/client.rb', line 24

# Reader for the connecting attribute.
#
# @return [Object] the value of @connecting
def connecting
  @connecting
end
#exchanges ⇒ Object
Returns the value of attribute exchanges.
24 25 26 |
# File 'lib/qrack/client.rb', line 24

# Reader for the exchanges attribute.
#
# @return [Object] the value of @exchanges
def exchanges
  @exchanges
end
#heartbeat ⇒ Object (readonly)
Returns the value of attribute heartbeat.
23 24 25 |
# File 'lib/qrack/client.rb', line 23

# Read-only accessor for the heartbeat attribute.
#
# @return [Object] the value of @heartbeat
def heartbeat
  @heartbeat
end
#host ⇒ Object (readonly)
Returns the value of attribute host.
23 24 25 |
# File 'lib/qrack/client.rb', line 23

# Read-only accessor for the host attribute.
#
# @return [Object] the value of @host
def host
  @host
end
#logfile ⇒ Object
Returns the value of attribute logfile.
24 25 26 |
# File 'lib/qrack/client.rb', line 24

# Reader for the logfile attribute.
#
# @return [Object] the value of @logfile
def logfile
  @logfile
end
#logging ⇒ Object
Returns the value of attribute logging.
23 24 25 |
# File 'lib/qrack/client.rb', line 23

# Reader for the logging attribute.
#
# @return [Object] the value of @logging
def logging
  @logging
end
#message_in ⇒ Object
Returns the value of attribute message_in.
24 25 26 |
# File 'lib/qrack/client.rb', line 24

# Reader for the message_in attribute.
#
# @return [Object] the value of @message_in
def message_in
  @message_in
end
#message_out ⇒ Object
Returns the value of attribute message_out.
24 25 26 |
# File 'lib/qrack/client.rb', line 24

# Reader for the message_out attribute.
#
# @return [Object] the value of @message_out
def message_out
  @message_out
end
#port ⇒ Object (readonly)
Returns the value of attribute port.
23 24 25 |
# File 'lib/qrack/client.rb', line 23

# Read-only accessor for the port attribute.
#
# @return [Object] the value of @port
def port
  @port
end
#queues ⇒ Object
Returns the value of attribute queues.
24 25 26 |
# File 'lib/qrack/client.rb', line 24

# Reader for the queues attribute.
#
# @return [Object] the value of @queues
def queues
  @queues
end
#spec ⇒ Object (readonly)
Returns the value of attribute spec.
23 24 25 |
# File 'lib/qrack/client.rb', line 23

# Read-only accessor for the spec attribute.
#
# @return [Object] the value of @spec
def spec
  @spec
end
#status ⇒ Object (readonly)
Returns the value of attribute status.
23 24 25 |
# File 'lib/qrack/client.rb', line 23

# Read-only accessor for the status attribute.
#
# @return [Object] the value of @status
def status
  @status
end
#vhost ⇒ Object (readonly)
Returns the value of attribute vhost.
23 24 25 |
# File 'lib/qrack/client.rb', line 23

# Read-only accessor for the vhost attribute.
#
# @return [Object] the value of @vhost
def vhost
  @vhost
end
Instance Method Details
#close ⇒ Symbol Also known as: stop
Returns :not_connected if successful.
77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/qrack/client.rb', line 77

# Closes every open channel, then the AMQP connection, then the TCP socket.
#
# Each channel close and the connection close run under
# Bunny::Timer.timeout(@disconnect_timeout). The ensure section always runs,
# including on the early return taken when there is no open socket: it
# empties the channel list, recreates channel 0 and calls close_socket.
#
# NOTE(review): `rescue Exception` swallows every error raised during the
# shutdown, including signals and exit requests. It is kept as-is because the
# best-effort close looks deliberate; confirm before narrowing it.
def close
  return if @socket.nil? || @socket.closed?

  # Close all active channels
  channels.each do |c|
    next unless c.open?

    Bunny::Timer.timeout(@disconnect_timeout) { c.close }
  end

  # Close connection to AMQP server
  Bunny::Timer.timeout(@disconnect_timeout) { close_connection }
rescue Exception
  # http://cheezburger.com/Asset/View/4033311488
ensure
  # Clear the channels
  @channels = []

  # Create channel 0
  @channel = create_channel()

  # Close TCP Socket
  close_socket
end
#connected? ⇒ Boolean
103 104 105 |
# File 'lib/qrack/client.rb', line 103

# @return [Boolean] true when status is :connected
def connected?
  status == :connected
end
#connecting? ⇒ Boolean
107 108 109 |
# File 'lib/qrack/client.rb', line 107

# Predicate form of the connecting attribute.
#
# @return [Object] whatever the connecting reader returns
def connecting?
  connecting
end
#next_payload(options = {}) ⇒ Object Also known as: next_method
116 117 118 119 |
# File 'lib/qrack/client.rb', line 116

# Fetches the next frame and returns its payload.
#
# @param options [Hash] passed straight through to next_frame
# @return [Object, nil] the frame's payload, or nil when next_frame
#   returns nothing
def next_payload(options = {})
  frame = next_frame(options)
  frame.payload if frame
end
#read(*args) ⇒ Object
123 124 125 126 127 128 |
# File 'lib/qrack/client.rb', line 123

# Issues a :read through send_command, forwarding all arguments.
#
# @param args [Array] arguments passed on to send_command after :read
# @return [Object] whatever send_command returns
def read(*args)
  send_command(:read, *args)
# Got a SIGINT while waiting; give any traps a chance to run
rescue Errno::EINTR
  # The read is simply reissued; there is no retry limit.
  retry
end
#returned_message(opts = {}) ⇒ Hash
Returns {:header => nil, :payload => :no_return, :return_details => nil} if message is not returned before timeout, or {:header, :return_details, :payload} if message is returned. :return_details is a hash {:reply_code, :reply_text, :exchange, :routing_key}.
136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/qrack/client.rb', line 136

# Waits for a returned message and reassembles it.
#
# @param opts [Hash] :timeout is passed to next_frame; 0.1 is used when it
#   is not given
# @return [Hash] {:header => nil, :payload => :no_return,
#   :return_details => nil} when next_frame raises Qrack::FrameTimeout;
#   otherwise {:header, :payload, :return_details}, where :return_details is
#   the arguments of the first frame's payload
def returned_message(opts = {})
  begin
    frame = next_frame(:timeout => opts[:timeout] || 0.1)
  rescue Qrack::FrameTimeout
    return {:header => nil, :payload => :no_return, :return_details => nil}
  end

  method = frame.payload
  header = next_payload

  # If maximum frame size is smaller than message payload body then message
  # will have a message header and several message bodies
  msg = ''
  msg << next_payload while msg.length < header.size

  # Return the message and related info
  {:header => header, :payload => msg, :return_details => method.arguments}
end
#switch_channel(chann) ⇒ Object
158 159 160 161 162 163 164 165 |
# File 'lib/qrack/client.rb', line 158

# Makes the channel at index chann the current channel.
#
# @param chann [Integer] index into channels
# @return [Integer] chann, once @channel has been switched
# @raise [RuntimeError] when chann is outside 0...channels.size
def switch_channel(chann)
  unless (0...channels.size).include?(chann)
    raise RuntimeError, "Invalid channel number - #{chann}"
  end

  @channel = channels[chann]
  chann
end
#write(*args) ⇒ Object
167 168 169 |
# File 'lib/qrack/client.rb', line 167

# Issues a :write through send_command, forwarding all arguments.
#
# @param args [Array] arguments passed on to send_command after :write
# @return [Object] whatever send_command returns
def write(*args)
  send_command(:write, *args)
end