Class: Backburner::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/backburner/connection.rb

Defined Under Namespace

Classes: BadURL

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url, &on_reconnect) ⇒ Connection

Constructs a backburner connection ‘url` can be a string i.e ’127.0.0.1:3001’ or an array of addresses (however, only the first element in the array will be used)



19
20
21
22
23
24
# File 'lib/backburner/connection.rb', line 19

def initialize(url, &on_reconnect)
  @url = url
  @allq_wrapper = nil
  @on_reconnect = on_reconnect
  connect!
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(m, *args, &block) ⇒ Object

Attempt to ensure we’re connected to allq if the missing method is present in the delegate and we haven’t shut down the connection on purpose

Raises:

  • (Beaneater::NotConnected)

    If allq fails to connect after multiple attempts.



93
94
95
96
# File 'lib/backburner/connection.rb', line 93

def method_missing(m, *args, &block)
  ensure_connected! if respond_to_missing?(m, false)
  super
end

Instance Attribute Details

#allq_wrapperObject

Returns the value of attribute allq_wrapper.



7
8
9
# File 'lib/backburner/connection.rb', line 7

def allq_wrapper
  @allq_wrapper
end

#on_reconnectObject

If a proc is provided, it will be called (and given this connection as an argument) whenever the connection is reconnected.

Examples:

connection.on_reconnect = lambda { |conn| puts 'reconnected!' }


13
14
15
# File 'lib/backburner/connection.rb', line 13

def on_reconnect
  @on_reconnect
end

#urlObject

Returns the value of attribute url.



7
8
9
# File 'lib/backburner/connection.rb', line 7

def url
  @url
end

Instance Method Details

#allq_addressesObject

Returns the allq queue addresses

Examples:

allq_addresses => ["127.0.0.1:11300"]


162
163
164
165
# File 'lib/backburner/connection.rb', line 162

def allq_addresses
  uri = self.url.is_a?(Array) ? self.url.first : self.url
  allq_host_and_port(uri)
end

#allq_host_and_port(uri_string) ⇒ Object

Returns a host and port based on the uri_string given

Examples:

allq_host_and_port("allq://127.0.0.1") => "127.0.0.1:11300"

Raises:



172
173
174
175
176
# File 'lib/backburner/connection.rb', line 172

def allq_host_and_port(uri_string)
  uri = URI.parse(uri_string)
  raise(BadURL, uri_string) if uri.scheme != 'allq'.freeze
  "#{uri.host}:#{uri.port || 11300}"
end

#clear(tube) ⇒ Object



26
27
28
# File 'lib/backburner/connection.rb', line 26

def clear(tube)
  @allq_wrapper.clear(tube)
end

#closeObject

Close the connection, if it exists



31
32
33
34
# File 'lib/backburner/connection.rb', line 31

def close
  @allq_wrapper.close if @allq_wrapper
  @allq_wrapper = nil
end

#connect!Object

Connects to a allq queue

Raises:

  • Beaneater::NotConnected if the connection cannot be established



100
101
102
103
# File 'lib/backburner/connection.rb', line 100

def connect!
  @allq_wrapper = Backburner::AllQWrapper.new(allq_addresses)
  @allq_wrapper
end

#connected?Boolean

Determines if the connection to allq is currently open

Returns:

  • (Boolean)


37
38
39
40
41
42
43
# File 'lib/backburner/connection.rb', line 37

def connected?
  begin
    !!(@allq_wrapper && @allq_wrapper.connection && @allq_wrapper.connection.connection && !@allq_wrapper.connection.connection.closed?) # Would be nice if beaneater provided a connected? method
  rescue
    false
  end
end

#ensure_connected!(max_retries = 4, retry_delay = 1.0) ⇒ Object

Attempts to ensure a connection to allq is established but only if we’re not connected already

Parameters:

  • max_retries (defaults to: 4)

    Integer The maximum number of times to attempt connecting. Defaults to 4

  • retry_delay (defaults to: 1.0)

    Float The time to wait between retrying to connect. Defaults to 1.0

Returns:

  • Connection This Connection is returned if the connection to allq is open or was able to be reconnected

Raises:

  • (Beaneater::NotConnected)

    If allq fails to connect after multiple attempts.



139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/backburner/connection.rb', line 139

def ensure_connected!(max_retries = 4, retry_delay = 1.0)
  return self if connected?

  begin
    reconnect!
    return self

  rescue Exception => e
    if max_retries > 0
      max_retries -= 1
      sleep retry_delay
      retry
    else # stop retrying
      raise e
    end
  end
end

#get(tube_name) ⇒ Object



129
130
131
# File 'lib/backburner/connection.rb', line 129

def get(tube_name)
  @allq_wrapper.get(tube_name)
end

#put(tube_name, data, opt) ⇒ Object



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/backburner/connection.rb', line 105

def put(tube_name, data, opt)
  pri = (opt[:pri] || 5).to_i
  ttr = (opt[:ttr] || 600).to_i

  options = {
    tube_name: tube_name,
    pri: pri,
    delay: opt[:delay].to_i,
    ttr: ttr
  }

  options[:shard_key] = opt[:shard_key] if opt[:shard_key]
  options[:parent_id] = opt[:parent_id] if opt[:parent_id]
  options[:timeout] = opt[:timeout] if opt[:timeout]
  options[:run_on_timeout] = opt[:run_on_timeout] if opt[:run_on_timeout]
  options[:limit] = opt[:limit] if opt[:limit]
  options[:is_parent] = opt[:is_parent] if opt[:is_parent]

puts "OPTIONS2 #{options}"
  # Overwrite originals
  opt.merge!(options)
  @allq_wrapper.put(data, options)
end

#reconnect!Object

Attempt to reconnect to allq. Note: the connection will not be watching or using the tubes it was before it was reconnected (as it’s actually a completely new connection)

Raises:

  • (Beaneater::NotConnected)

    If allq fails to connect



53
54
55
56
57
# File 'lib/backburner/connection.rb', line 53

def reconnect!
  close
  connect!
  @on_reconnect.call(self) if @on_reconnect.respond_to?(:call)
end

#retryable(options = {}, &block) ⇒ Object

Yield to a block that will be retried several times if the connection to allq goes down and is able to be re-established.

Parameters:

  • options (defaults to: {})

    Hash Options. Valid options are: :max_retries Integer The maximum number of times the block will be yielded to.

    Defaults to 4
    

    :on_retry Proc An optional proc that will be called for each retry. Will be

    called after the connection is re-established and :retry_delay
    has passed but before the block is yielded to again
    

    :retry_delay Float The amount to sleep before retrying. Defaults to 1.0

Raises:

  • Beaneater::NotConnected If a connection is unable to be re-established



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/backburner/connection.rb', line 70

def retryable(options = {}, &block)
  options = {:max_retries => 4, :on_retry => nil, :retry_delay => 1.0}.merge!(options)
  retry_count = options[:max_retries]

  begin
    yield

  rescue Exception => e
    if retry_count > 0
      reconnect!
      retry_count -= 1
      sleep options[:retry_delay]
      options[:on_retry].call if options[:on_retry].respond_to?(:call)
      retry
    else # stop retrying
      raise e
    end
  end
end

#tubesObject



45
46
47
# File 'lib/backburner/connection.rb', line 45

def tubes
  @allq_wrapper.tube_names if @allq_wrapper
end