Class: Beaneater::Pool

Inherits:
Object
  • Object
show all
Defined in:
lib/beaneater/pool.rb

Overview

Represents collection of beanstalkd connections.

Constant Summary collapse

MAX_RETRIES =

Default number of retries to send a command to a connection

3

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(addresses = nil) ⇒ Pool

Initialize new connection

Examples:

Beaneater::Pool.new(['localhost:11300', '127.0.0.1:11300'])

ENV['BEANSTALKD_URL'] = 'localhost:11300,127.0.0.1:11300'
@bp = Beaneater::Pool.new
@bp.connections.first.host # => 'localhost'
@bp.connections.last.host # => '127.0.0.1'

Parameters:

  • addresses (Array<String>) (defaults to: nil)

    Array of beanstalkd server addresses


23
24
25
26
# File 'lib/beaneater/pool.rb', line 23

def initialize(addresses=nil)
  addresses = addresses || host_from_env || Beaneater.configuration.beanstalkd_url
  @connections = Array(addresses).map { |a| Connection.new(a) }
end

Instance Attribute Details

#connectionsObject

Returns the value of attribute connections


10
11
12
# File 'lib/beaneater/pool.rb', line 10

def connections
  @connections
end

Instance Method Details

#closeObject

Closes all connections within the pool.

Examples:

@pool.close

120
121
122
123
124
125
# File 'lib/beaneater/pool.rb', line 120

def close
  while @connections.any?
    conn = @connections.pop
    conn.close
  end
end

#host_from_envArray (protected)

The hosts provided by BEANSTALKD_URL environment variable, if available.

Examples:

ENV['BEANSTALKD_URL'] = "localhost:1212,localhost:2424"
 # => ['localhost:1212', 'localhost:2424']

Returns:

  • (Array)

    Set of beanstalkd host addresses


161
162
163
# File 'lib/beaneater/pool.rb', line 161

def host_from_env
  ENV['BEANSTALKD_URL'].respond_to?(:length) && ENV['BEANSTALKD_URL'].length > 0 && ENV['BEANSTALKD_URL'].split(',').map(&:strip)
end

#jobsBeaneater::Jobs

Returns Beaneater::Jobs object for accessing job related functions.

Returns:


40
41
42
# File 'lib/beaneater/pool.rb', line 40

def jobs
  @jobs ||= Jobs.new(self)
end

#safe_transmit(&block) ⇒ Object (protected)

Transmit command to beanstalk connections safely handling failed connections

Examples:

safe_transmit { conn.transmit('foo') }
 # => "result of foo command from beanstalk"

Parameters:

  • block (Proc)

    The command to execute.

Returns:

  • (Object)

    Result of the block passed

Raises:


138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/beaneater/pool.rb', line 138

def safe_transmit(&block)
  retries = 1
  begin
    yield
  rescue DrainingError, EOFError, Errno::ECONNRESET, Errno::EPIPE => ex
    # TODO remove faulty connections from pool?
    # https://github.com/kr/beanstalk-client-ruby/blob/master/lib/beanstalk-client/connection.rb#L405-410
    if retries < MAX_RETRIES
      retries += 1
      retry
    else # finished retrying, fail out
      ex.is_a?(DrainingError) ? raise(ex) : raise(NotConnected, "Could not connect!")
    end
  end
end

#statsBeaneater::Stats

Returns Beaneater::Stats object for accessing beanstalk stats.

Returns:


32
33
34
# File 'lib/beaneater/pool.rb', line 32

def stats
  @stats ||= Stats.new(self)
end

#transmit_to_all(command, options = {}, &block) ⇒ Array<Hash{String => String, Number}>

Sends command to every beanstalkd server set in the pool.

Examples:

@pool.transmit_to_all("stats")

Parameters:

  • command (String)

    Beanstalkd command

  • options (Hash{String => String, Boolean}) (defaults to: {})

    socket connections options

  • block (Proc)

    Block passed to socket connection during transmit

Returns:

  • (Array<Hash{String => String, Number}>)

    Beanstalkd command response from each instance


61
62
63
64
65
66
67
68
69
70
71
72
73
# File 'lib/beaneater/pool.rb', line 61

def transmit_to_all(command, options={}, &block)
  res_exception = nil
  res = connections.map { |conn|
    begin
      safe_transmit { conn.transmit(command, options, &block) }
    rescue UnexpectedResponse => ex # not the correct status
      res_exception = ex
      nil
    end
  }.compact
  raise res_exception if res.none? && res_exception
  res
end

#transmit_to_rand(command, options = {}, &block) ⇒ Array<Hash{String => String, Number}>

Sends command to a random beanstalkd server in the pool.

Examples:

@pool.transmit_to_rand("stats", :match => /\n/)

Parameters:

  • command (String)

    Beanstalkd command

  • options (Hash{String => String,Boolean}) (defaults to: {})

    socket connections options

  • block (Proc)

    Block passed in socket connection object

Returns:

  • (Array<Hash{String => String, Number}>)

    Beanstalkd command response from the instance


108
109
110
111
112
113
# File 'lib/beaneater/pool.rb', line 108

def transmit_to_rand(command, options={}, &block)
  safe_transmit do
    conn = connections.respond_to?(:sample) ? connections.sample : connections.choice
    conn.transmit(command, options, &block)
  end
end

#transmit_until_res(command, options = {}, &block) ⇒ Array<Hash{String => String, Number}>

Send command to each beanstalkd servers until getting response expected

Examples:

@pool.transmit_until_res('peek-ready', :status => "FOUND", &block)

Parameters:

  • command (String)

    Beanstalkd command

  • options (Hash{String => String, Boolean}) (defaults to: {})

    socket connections options

  • block (Proc)

    Block passed in socket connection object

Returns:

  • (Array<Hash{String => String, Number}>)

    Beanstalkd command response from the instance


84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/beaneater/pool.rb', line 84

def transmit_until_res(command, options={}, &block)
  status_expected  = options.delete(:status)
  res_exception = nil
  connections.each do |conn|
    begin
      res = safe_transmit { conn.transmit(command, options, &block) }
      return res if res[:status] == status_expected
    rescue UnexpectedResponse => ex # not the correct status
      res_exception = ex
      next
    end
  end
  raise res_exception if res_exception
end

#tubesBeaneater::Tubes

Returns Beaneater::Tubes object for accessing tube related functions.

Returns:


48
49
50
# File 'lib/beaneater/pool.rb', line 48

def tubes
  @tubes ||= Tubes.new(self)
end