Class: Qrack::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/qrack/client.rb

Overview

Client ancestor class

Direct Known Subclasses

Bunny::Client, Bunny::Client09

Constant Summary collapse

CONNECT_TIMEOUT =
5.0
RETRY_DELAY =
10.0

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection_string_or_opts = Hash.new, opts = Hash.new) ⇒ Client

Returns a new instance of Client.



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/qrack/client.rb', line 30

def initialize(connection_string_or_opts = Hash.new, opts = Hash.new)
  opts = case connection_string_or_opts
  when String then
    AMQ::Client::Settings.parse_amqp_url(connection_string_or_opts)
  when Hash then
    connection_string_or_opts
  else
    Hash.new
  end.merge(opts)

  # Temporary hack to make Bunny 0.7 work with port number in AMQP URL.
  # This is not necessary on Bunny 0.8 as it removes support of AMQP 0.8.
  @__opts__ = opts


  @host   = opts[:host] || 'localhost'
  @user   = opts[:user]  || 'guest'
  @pass   = opts[:pass]  || 'guest'
  @vhost  = opts[:vhost] || '/'
  @logfile = opts[:logfile] || nil
  @logging = opts[:logging] || false
  @ssl = opts[:ssl] || false
  @verify_ssl = opts[:verify_ssl].nil? || opts[:verify_ssl]
  @status = :not_connected
  @frame_max = opts[:frame_max] || 131072
  @channel_max = opts[:channel_max] || 0
  @heartbeat = opts[:heartbeat] || 0
  @connect_timeout = opts[:connect_timeout] || CONNECT_TIMEOUT
  @read_write_timeout = opts[:socket_timeout]
  @read_write_timeout = nil if @read_write_timeout == 0
  @disconnect_timeout = @read_write_timeout || @connect_timeout
  @logger = nil
  create_logger if @logging
  @message_in = false
  @message_out = false
  @connecting = false
  @channels ||= []
  # Create channel 0
  @channel = create_channel()
  @exchanges ||= {}
  @queues ||= {}
end

Instance Attribute Details

#__opts__Object (readonly)

Temporary hack to make Bunny 0.7 work with port number in AMQP URL. This is not necessary on Bunny 0.8 as it removes support of AMQP 0.8.



28
29
30
# File 'lib/qrack/client.rb', line 28

def __opts__
  @__opts__
end

#channelObject

Returns the value of attribute channel.



24
25
26
# File 'lib/qrack/client.rb', line 24

def channel
  @channel
end

#channelsObject

Returns the value of attribute channels.



24
25
26
# File 'lib/qrack/client.rb', line 24

def channels
  @channels
end

#connectingObject

Returns the value of attribute connecting.



24
25
26
# File 'lib/qrack/client.rb', line 24

def connecting
  @connecting
end

#exchangesObject

Returns the value of attribute exchanges.



24
25
26
# File 'lib/qrack/client.rb', line 24

def exchanges
  @exchanges
end

#heartbeatObject (readonly)

Returns the value of attribute heartbeat.



23
24
25
# File 'lib/qrack/client.rb', line 23

def heartbeat
  @heartbeat
end

#hostObject (readonly)

Returns the value of attribute host.



23
24
25
# File 'lib/qrack/client.rb', line 23

def host
  @host
end

#logfileObject

Returns the value of attribute logfile.



24
25
26
# File 'lib/qrack/client.rb', line 24

def logfile
  @logfile
end

#loggingObject

Returns the value of attribute logging.



23
24
25
# File 'lib/qrack/client.rb', line 23

def logging
  @logging
end

#message_inObject

Returns the value of attribute message_in.



24
25
26
# File 'lib/qrack/client.rb', line 24

def message_in
  @message_in
end

#message_outObject

Returns the value of attribute message_out.



24
25
26
# File 'lib/qrack/client.rb', line 24

def message_out
  @message_out
end

#portObject (readonly)

Returns the value of attribute port.



23
24
25
# File 'lib/qrack/client.rb', line 23

def port
  @port
end

#queuesObject

Returns the value of attribute queues.



24
25
26
# File 'lib/qrack/client.rb', line 24

def queues
  @queues
end

#specObject (readonly)

Returns the value of attribute spec.



23
24
25
# File 'lib/qrack/client.rb', line 23

def spec
  @spec
end

#statusObject (readonly)

Returns the value of attribute status.



23
24
25
# File 'lib/qrack/client.rb', line 23

def status
  @status
end

#vhostObject (readonly)

Returns the value of attribute vhost.



23
24
25
# File 'lib/qrack/client.rb', line 23

def vhost
  @vhost
end

Instance Method Details

#closeSymbol Also known as: stop

Returns :not_connected if successful.

Returns:

  • (Symbol)

    :not_connected if successful.



77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# File 'lib/qrack/client.rb', line 77

def close
  return if @socket.nil? || @socket.closed?

  # Close all active channels
  channels.each do |c|
    Bunny::Timer::timeout(@disconnect_timeout) { c.close } if c.open?
  end

  # Close connection to AMQP server
  Bunny::Timer::timeout(@disconnect_timeout) { close_connection }

rescue Exception
  # http://cheezburger.com/Asset/View/4033311488
ensure
  # Clear the channels
  @channels = []

  # Create channel 0
  @channel = create_channel()

  # Close TCP Socket
  close_socket
end

#connected?Boolean

Returns:

  • (Boolean)


103
104
105
# File 'lib/qrack/client.rb', line 103

def connected?
  status == :connected
end

#connecting?Boolean

Returns:

  • (Boolean)


107
108
109
# File 'lib/qrack/client.rb', line 107

def connecting?
  connecting
end

#next_payload(options = {}) ⇒ Object Also known as: next_method



116
117
118
119
# File 'lib/qrack/client.rb', line 116

def next_payload(options = {})
  res = next_frame(options)
  res.payload if res
end

#read(*args) ⇒ Object



123
124
125
126
127
128
# File 'lib/qrack/client.rb', line 123

def read(*args)
  send_command(:read, *args)
  # Got a SIGINT while waiting; give any traps a chance to run
rescue Errno::EINTR
  retry
end

#returned_message(opts = {}) ⇒ Hash

Returns {:header => nil, :payload => :no_return, :return_details => nil} if message is not returned before timeout. {:header, :return_details, :payload} if message is returned. :return_details is a hash {:reply_code, :reply_text, :exchange, :routing_key}.

Parameters:

  • opts (Hash) (defaults to: {})

    Options.

Options Hash (opts):

  • :timeout (Numeric) — default: 0.1

    The method will wait for a return message until this timeout interval is reached.

Returns:

  • (Hash)

    {:header => nil, :payload => :no_return, :return_details => nil} if message is not returned before timeout. {:header, :return_details, :payload} if message is returned. :return_details is a hash {:reply_code, :reply_text, :exchange, :routing_key}.



136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/qrack/client.rb', line 136

def returned_message(opts = {})

  begin
    frame = next_frame(:timeout => opts[:timeout] || 0.1)
  rescue Qrack::FrameTimeout
    return {:header => nil, :payload => :no_return, :return_details => nil}
  end

  method = frame.payload
  header = next_payload

  # If maximum frame size is smaller than message payload body then message
  # will have a message header and several message bodies
  msg = ''
  while msg.length < header.size
    msg << next_payload
  end

  # Return the message and related info
  {:header => header, :payload => msg, :return_details => method.arguments}
end

#switch_channel(chann) ⇒ Object



158
159
160
161
162
163
164
165
# File 'lib/qrack/client.rb', line 158

def switch_channel(chann)
  if (0...channels.size).include? chann
    @channel = channels[chann]
    chann
  else
    raise RuntimeError, "Invalid channel number - #{chann}"
  end
end

#write(*args) ⇒ Object



167
168
169
# File 'lib/qrack/client.rb', line 167

def write(*args)
  send_command(:write, *args)
end