Class: Backburner::Connection
- Inherits: Object
  - Object
  - Backburner::Connection
- Defined in: lib/backburner/connection.rb
Defined Under Namespace
Classes: BadURL
Instance Attribute Summary
- #allq_wrapper ⇒ Object
  Returns the value of attribute allq_wrapper.
- #on_reconnect ⇒ Object
  If a proc is provided, it will be called (and given this connection as an argument) whenever the connection is reconnected.
- #url ⇒ Object
  Returns the value of attribute url.
Instance Method Summary
- #allq_addresses ⇒ Object
  Returns the allq queue addresses.
- #allq_host_and_port(uri_string) ⇒ Object
  Returns a host and port based on the uri_string given.
- #clear(tube) ⇒ Object
- #close ⇒ Object
  Close the connection, if it exists.
- #connect! ⇒ Object
  Connects to an allq queue.
- #connected? ⇒ Boolean
  Determines if the connection to allq is currently open.
- #ensure_connected!(max_retries = 4, retry_delay = 1.0) ⇒ Object
  Attempts to ensure a connection to allq is established, but only if we’re not connected already.
- #get(tube_name) ⇒ Object
- #initialize(url, &on_reconnect) ⇒ Connection (constructor)
  Constructs a Backburner connection. `url` can be a string such as 'allq://127.0.0.1:3001' (#allq_host_and_port requires the allq scheme) or an array of addresses (only the first element of the array is used).
- #method_missing(m, *args, &block) ⇒ Object
  Attempt to ensure we’re connected to allq if the missing method is present in the delegate and we haven’t shut down the connection on purpose.
- #put(tube_name, data, opt) ⇒ Object
- #reconnect! ⇒ Object
  Attempt to reconnect to allq.
- #retryable(options = {}, &block) ⇒ Object
  Yield to a block that will be retried several times if the connection to allq goes down and is able to be re-established.
- #tubes ⇒ Object
Constructor Details
#initialize(url, &on_reconnect) ⇒ Connection
Constructs a Backburner connection. `url` can be a string such as 'allq://127.0.0.1:3001' (#allq_host_and_port requires the allq scheme) or an array of addresses (only the first element of the array is used).

    # File 'lib/backburner/connection.rb', line 19
    def initialize(url, &on_reconnect)
      @url = url
      @allq_wrapper = nil
      @on_reconnect = on_reconnect
      connect!
    end
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(m, *args, &block) ⇒ Object
Attempt to ensure we’re connected to allq if the missing method is present in the delegate and we haven’t shut down the connection on purpose.

    # File 'lib/backburner/connection.rb', line 93
    def method_missing(m, *args, &block)
      ensure_connected! if respond_to_missing?(m, false)
      super
    end
Instance Attribute Details
#allq_wrapper ⇒ Object
Returns the value of attribute allq_wrapper.

    # File 'lib/backburner/connection.rb', line 7
    def allq_wrapper
      @allq_wrapper
    end
#on_reconnect ⇒ Object
If a proc is provided, it will be called (and given this connection as an argument) whenever the connection is reconnected.

    # File 'lib/backburner/connection.rb', line 13
    def on_reconnect
      @on_reconnect
    end
#url ⇒ Object
Returns the value of attribute url.

    # File 'lib/backburner/connection.rb', line 7
    def url
      @url
    end
Instance Method Details
#allq_addresses ⇒ Object
Returns the allq queue addresses.

    # File 'lib/backburner/connection.rb', line 162
    def allq_addresses
      uri = self.url.is_a?(Array) ? self.url.first : self.url
      allq_host_and_port(uri)
    end
#allq_host_and_port(uri_string) ⇒ Object
Returns a host and port based on the uri_string given.

    # File 'lib/backburner/connection.rb', line 172
    def allq_host_and_port(uri_string)
      uri = URI.parse(uri_string)
      raise(BadURL, uri_string) if uri.scheme != 'allq'.freeze
      "#{uri.host}:#{uri.port || 11300}"
    end
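The parsing rule above can be tried in isolation. The following is a minimal sketch, not the library's own code path: `host_and_port` and the top-level `BadURL` class are hypothetical stand-ins that copy the logic shown, so it runs without Backburner loaded. It illustrates that the port falls back to 11300 when the URL omits one, and that a bare host:port string without the allq scheme is rejected.

```ruby
require 'uri'

# Hypothetical stand-in for Backburner::Connection::BadURL.
class BadURL < RuntimeError; end

# Hypothetical helper that copies the logic of #allq_host_and_port.
def host_and_port(uri_string)
  uri = URI.parse(uri_string)
  raise(BadURL, uri_string) if uri.scheme != 'allq'
  "#{uri.host}:#{uri.port || 11300}"
end

with_port    = host_and_port('allq://127.0.0.1:3001')
default_port = host_and_port('allq://localhost')

# A string without the allq scheme never yields a host and port:
# it is rejected either by URI.parse or by the scheme check.
bad_url_rejected = begin
  host_and_port('127.0.0.1:3001')
  false
rescue BadURL, URI::InvalidURIError
  true
end

puts with_port
puts default_port
puts bad_url_rejected
```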
#clear(tube) ⇒ Object

    # File 'lib/backburner/connection.rb', line 26
    def clear(tube)
      @allq_wrapper.clear(tube)
    end
#close ⇒ Object
Close the connection, if it exists.

    # File 'lib/backburner/connection.rb', line 31
    def close
      @allq_wrapper.close if @allq_wrapper
      @allq_wrapper = nil
    end
#connect! ⇒ Object
Connects to an allq queue.

    # File 'lib/backburner/connection.rb', line 100
    def connect!
      @allq_wrapper = Backburner::AllQWrapper.new(allq_addresses)
      @allq_wrapper
    end
#connected? ⇒ Boolean
Determines if the connection to allq is currently open.

    # File 'lib/backburner/connection.rb', line 37
    def connected?
      begin
        !!(@allq_wrapper && @allq_wrapper.connection && @allq_wrapper.connection.connection && !@allq_wrapper.connection.connection.closed?) # Would be nice if beaneater provided a connected? method
      rescue
        false
      end
    end
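The nested check above walks wrapper → connection → socket and treats any missing link as "not connected". A minimal sketch of the same idea using Ruby's safe-navigation operator follows; `FakeSocket`, `FakeTransport`, `FakeWrapper` and `wrapper_connected?` are hypothetical names invented for this illustration and do not exist in Backburner or allq.

```ruby
# Hypothetical stand-ins for the wrapper, its transport and the raw socket.
FakeSocket = Struct.new(:closed) do
  def closed?
    closed
  end
end
FakeTransport = Struct.new(:connection)
FakeWrapper   = Struct.new(:connection)

# Returns true only when every link in the chain exists and the socket is open.
def wrapper_connected?(wrapper)
  socket = wrapper&.connection&.connection
  !socket.nil? && !socket.closed?
rescue StandardError
  false
end

open_wrapper   = FakeWrapper.new(FakeTransport.new(FakeSocket.new(false)))
closed_wrapper = FakeWrapper.new(FakeTransport.new(FakeSocket.new(true)))
empty_wrapper  = FakeWrapper.new(nil)

puts wrapper_connected?(open_wrapper)
puts wrapper_connected?(closed_wrapper)
puts wrapper_connected?(empty_wrapper)
puts wrapper_connected?(nil)
```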
#ensure_connected!(max_retries = 4, retry_delay = 1.0) ⇒ Object
Attempts to ensure a connection to allq is established, but only if we’re not connected already.

    # File 'lib/backburner/connection.rb', line 139
    def ensure_connected!(max_retries = 4, retry_delay = 1.0)
      return self if connected?
      begin
        reconnect!
        return self
      rescue Exception => e
        if max_retries > 0
          max_retries -= 1
          sleep retry_delay
          retry
        else # stop retrying
          raise e
        end
      end
    end
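The retry loop above can be exercised without a real queue. The sketch below assumes a hypothetical `FlakyConnection` class (not part of Backburner) whose `reconnect!` fails a configurable number of times. It follows the same control flow, with one deliberate difference that is labeled in the code: it rescues StandardError rather than Exception, so that interrupts and exits are not retried.

```ruby
# Hypothetical stand-in for a connection whose first few reconnects fail.
class FlakyConnection
  attr_reader :attempts

  def initialize(failures)
    @failures  = failures # number of reconnect attempts that should fail
    @attempts  = 0
    @connected = false
  end

  def connected?
    @connected
  end

  def reconnect!
    @attempts += 1
    raise IOError, 'queue unreachable' if @attempts <= @failures
    @connected = true
  end

  # Same flow as #ensure_connected!, but rescuing StandardError
  # (a change made for this sketch, not the library's behavior).
  def ensure_connected!(max_retries = 4, retry_delay = 1.0)
    return self if connected?
    begin
      reconnect!
      self
    rescue StandardError
      raise if max_retries <= 0 # out of retries: re-raise the last error
      max_retries -= 1
      sleep retry_delay
      retry
    end
  end
end

# Two failures fit inside four retries, so this call recovers.
recovered = FlakyConnection.new(2)
recovered.ensure_connected!(4, 0.01)

# Ten failures exceed one retry, so the last error propagates.
exhausted = FlakyConnection.new(10)
gave_up = begin
  exhausted.ensure_connected!(1, 0.01)
  false
rescue IOError
  true
end
```

With `max_retries = n`, `reconnect!` is attempted at most n + 1 times before the error is re-raised.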
#get(tube_name) ⇒ Object

    # File 'lib/backburner/connection.rb', line 129
    def get(tube_name)
      @allq_wrapper.get(tube_name)
    end
#put(tube_name, data, opt) ⇒ Object

    # File 'lib/backburner/connection.rb', line 105
    def put(tube_name, data, opt)
      pri = (opt[:pri] || 5).to_i
      ttr = (opt[:ttr] || 600).to_i

      options = {
        tube_name: tube_name,
        pri: pri,
        delay: opt[:delay].to_i,
        ttr: ttr
      }

      options[:shard_key] = opt[:shard_key] if opt[:shard_key]
      options[:parent_id] = opt[:parent_id] if opt[:parent_id]
      options[:timeout] = opt[:timeout] if opt[:timeout]
      options[:run_on_timeout] = opt[:run_on_timeout] if opt[:run_on_timeout]
      options[:limit] = opt[:limit] if opt[:limit]
      options[:is_parent] = opt[:is_parent] if opt[:is_parent]
      puts "OPTIONS2 #{options}"
      # Overwrite originals
      opt.merge!(options)
      @allq_wrapper.put(data, options)
    end
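The option handling in the listing above can be isolated from the queue call. The sketch below is an illustration under assumptions: `build_put_options` and `OPTIONAL_KEYS` are hypothetical names, and the code only mirrors the defaults visible in the listing (priority 5, TTR 600, delay 0 when absent). It does not talk to allq.

```ruby
# Hypothetical constant listing the keys that are copied only when truthy.
OPTIONAL_KEYS = %i[shard_key parent_id timeout run_on_timeout limit is_parent].freeze

# Hypothetical helper that mirrors how #put assembles its options hash.
def build_put_options(tube_name, opt)
  options = {
    tube_name: tube_name,
    pri: (opt[:pri] || 5).to_i,
    delay: opt[:delay].to_i, # nil.to_i is 0, so a missing delay means no delay
    ttr: (opt[:ttr] || 600).to_i
  }
  OPTIONAL_KEYS.each do |key|
    options[key] = opt[key] if opt[key]
  end
  options
end

defaults = build_put_options('emails', {})
custom   = build_put_options(
  'emails',
  { pri: '2', delay: 30, shard_key: 'user-42', is_parent: false }
)

p defaults
p custom
```

Because the optional keys are copied only when truthy, an explicit `is_parent: false` is dropped rather than forwarded.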
#reconnect! ⇒ Object
Attempt to reconnect to allq. Note: the connection will not be watching or using the tubes it was before it was reconnected (as it’s actually a completely new connection).

    # File 'lib/backburner/connection.rb', line 53
    def reconnect!
      close
      connect!
      @on_reconnect.call(self) if @on_reconnect.respond_to?(:call)
    end
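The on_reconnect hook is the place to restore any state lost with the old connection. A minimal sketch of the callback flow follows, using a hypothetical `MiniConnection` class with a `generation` counter in place of a real allq wrapper. Neither name exists in Backburner.

```ruby
# Hypothetical stand-in that counts how many underlying connections were made.
class MiniConnection
  attr_reader :generation

  def initialize(&on_reconnect)
    @on_reconnect = on_reconnect
    @generation   = 0
    connect!
  end

  def connect!
    @generation += 1 # each connect! stands for a brand-new connection
  end

  def close; end

  # Same three steps as #reconnect!: close, connect, then notify the hook.
  def reconnect!
    close
    connect!
    @on_reconnect.call(self) if @on_reconnect.respond_to?(:call)
  end
end

seen = []
conn = MiniConnection.new { |c| seen << c.generation }
conn.reconnect!
conn.reconnect!

# Without a block the hook is nil and reconnect! simply skips it.
silent = MiniConnection.new
silent.reconnect!
```

As in the constructor shown earlier, the initial `connect!` does not fire the hook. Only `reconnect!` does.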
#retryable(options = {}, &block) ⇒ Object
Yield to a block that will be retried several times if the connection to allq goes down and is able to be re-established.

    # File 'lib/backburner/connection.rb', line 70
    def retryable(options = {}, &block)
      options = {:max_retries => 4, :on_retry => nil, :retry_delay => 1.0}.merge!(options)
      retry_count = options[:max_retries]
      begin
        yield
      rescue Exception => e
        if retry_count > 0
          reconnect!
          retry_count -= 1
          sleep options[:retry_delay]
          options[:on_retry].call if options[:on_retry].respond_to?(:call)
          retry
        else # stop retrying
          raise e
        end
      end
    end
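A standalone sketch of the retry pattern is shown below. It is a simplified stand-in, not the method itself: this free function has no connection to re-establish, so the `reconnect!` step is omitted, and it rescues StandardError instead of Exception. The option names (`:max_retries`, `:on_retry`, `:retry_delay`) are taken from the listing above.

```ruby
# Simplified stand-in for #retryable (no reconnect! step in this sketch).
def retryable(options = {})
  options = { max_retries: 4, on_retry: nil, retry_delay: 1.0 }.merge(options)
  retry_count = options[:max_retries]
  begin
    yield
  rescue StandardError
    raise if retry_count <= 0 # out of retries: re-raise the last error
    retry_count -= 1
    sleep options[:retry_delay]
    options[:on_retry].call if options[:on_retry].respond_to?(:call)
    retry
  end
end

calls   = 0
retries = 0
result = retryable(max_retries: 3, retry_delay: 0.01, on_retry: -> { retries += 1 }) do
  calls += 1
  raise IOError, 'connection dropped' if calls < 3
  :done
end
```

The block fails twice and succeeds on its third run, so the `:on_retry` callback fires twice and the block's value is returned to the caller.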
#tubes ⇒ Object

    # File 'lib/backburner/connection.rb', line 45
    def tubes
      @allq_wrapper.tube_names if @allq_wrapper
    end