Class: Beanpool::Connections

Inherits:
Object
  • Object
show all
Defined in:
lib/beanpool/connections.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(ip_array, debug) ⇒ Connections

Returns a new instance of Connections.



7
8
9
10
11
12
13
# File 'lib/beanpool/connections.rb', line 7

# Records the address list and the debug flag, starts with empty live and
# troubled tables, and then builds the connections.
#
# @param ip_array [#each] addresses handed one by one to Beaneater.new
# @param debug [Object] when truthy, #notify prints its argument
def initialize(ip_array, debug)
  @debug = debug
  @ip_array = ip_array
  @connections = {}
  @troubled_ips = {}
  build_connections
end

Instance Attribute Details

#connections ⇒ Object (readonly)

Returns the value of attribute connections.



5
6
7
# File 'lib/beanpool/connections.rb', line 5

# Read-only accessor for the table of live connections.
#
# @return [Hash] the value of @connections; entries are stored as
#   address => Beaneater instance by #build_connections and #check_times
def connections
  @connections
end

#ip_array ⇒ Object (readonly)

Returns the value of attribute ip_array.



5
6
7
# File 'lib/beanpool/connections.rb', line 5

# Read-only accessor for the address list given to the constructor.
#
# @return [Object] the value of @ip_array, exactly as passed to #initialize
def ip_array
  @ip_array
end

Instance Method Details

#build_connections ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
# File 'lib/beanpool/connections.rb', line 32

# Opens one Beaneater connection per configured address and stores it in
# @connections under that address.
#
# A failure to connect to an address is reported through #notify and the
# address is skipped; the remaining addresses are still attempted.
#
# @raise [RuntimeError] if an entry of @ip_array is itself an Array
# @return [Object] @ip_array
def build_connections
  @ip_array.each do |address|
    if address.is_a?(Array)
      raise 'Only single IP for beaneater'
    end

    begin
      client = Beaneater.new(address)
      @connections[address] = client
    rescue => error
      notify(error)
      notify("Failed to add #{address}")
    end
  end
end

#check_times ⇒ Object



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/beanpool/connections.rb', line 63

# Retries every troubled address whose recorded time is more than 60 seconds
# in the past.
#
# A successful reconnect stores the new Beaneater instance in @connections
# and drops the address from @troubled_ips. A failed reconnect is reported
# through #notify and the recorded time is reset to now, so the address is
# retried again about a minute later.
#
# Entries are removed with Hash#delete_if rather than by calling #delete on
# the hash from inside its own #each loop.
#
# @return [Hash] @troubled_ips
def check_times
  @troubled_ips.delete_if do |ip, info|
    notify("Checking troubled #{ip}")
    next false unless info[:time] < Time.now - 60

    begin
      @connections[ip] = Beaneater.new(ip)
      notify("Re-added to live: #{ip}")
      true
    rescue => ex
      notify(ex)
      # Keep retrying every minute
      info[:time] = Time.now
      false
    end
  end
end

#close ⇒ Object



44
45
46
47
48
# File 'lib/beanpool/connections.rb', line 44

# Calls #close on every connection currently held in @connections.
#
# @return [Hash] @connections
def close
  @connections.each_value(&:close)
end

#connection_sample ⇒ Object



57
58
59
60
61
# File 'lib/beanpool/connections.rb', line 57

# Runs #check_times first, then picks one key of @connections at random.
#
# @return [Object, nil] a randomly chosen key, or nil when @connections is empty
def connection_sample
  check_times
  @connections.keys.sample
end

#get_job_from_tube(timeout = nil, tube_name = 'default') ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/beanpool/connections.rb', line 80

# Reserves a job from the named tube on one randomly chosen connection.
#
# A Beaneater::TimedOutError yields nil without further action. Any other
# StandardError raised while reserving is reported through #notify, the
# address is passed to #put_ip_in_timeout_and_reload, and nil is returned.
#
# @param timeout [Object, nil] passed straight to the tube's #reserve
# @param tube_name [String] tube to reserve from
# @return [Object, nil] whatever #reserve returned, or nil on timeout or error
def get_job_from_tube(timeout = nil, tube_name = 'default')
  ip_id = connection_sample
  client = @connections[ip_id]
  begin
    client.tubes[tube_name].reserve(timeout)
  rescue Beaneater::TimedOutError
    nil
  rescue => error
    notify(error)
    notify("Exception IP: #{ip_id}")
    put_ip_in_timeout_and_reload(ip_id)
    nil
  end
end

#keystring_hash(hash) ⇒ Object



120
121
122
123
124
125
126
# File 'lib/beanpool/connections.rb', line 120

# Builds a copy of +hash+ whose keys have all been converted with #to_s.
# When two keys stringify to the same text, the later one wins.
#
# @param hash [#keys, #[]] source mapping; it is not modified
# @return [Hash] new hash with String keys and the original values
def keystring_hash(hash)
  hash.keys.each_with_object({}) do |key, stringified|
    stringified[key.to_s] = hash[key]
  end
end

#notify(object) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/beanpool/connections.rb', line 15

# Prints a diagnostic to standard output when @debug is truthy; otherwise
# does nothing.
#
# For an Exception it prints an "EXCEPTION: <message>" line followed by the
# message and the backtrace, one frame per line, with frames matching
# /\.rvm/ left out. Hashes, Arrays and Strings are handed to puts as they
# are. Any other kind of object is ignored.
#
# @param object [Exception, Hash, Array, String, Object] what to report
# @return [nil]
def notify(object)
  return unless @debug

  case object
  when Exception
    # Filter into a new array: the array returned by #backtrace belongs to
    # the exception, so editing it in place would corrupt the trace and make
    # a second report of the same exception repeat the message line.
    # Array() covers exceptions that were never raised (backtrace is nil).
    frames = Array(object.backtrace).reject { |line| line =~ /\.rvm/ }
    report = [object.message.to_s, *frames].join("\n")
    puts "EXCEPTION: #{object.message}"
    puts report
  when Hash, Array, String
    puts object
  end
end

#put_ip_in_timeout_and_reload(ip) ⇒ Object



50
51
52
53
54
55
# File 'lib/beanpool/connections.rb', line 50

# Moves an address from the live table to the troubled table, stamped with
# the current time, and reports it through #notify.
#
# Nothing happens when only one connection (or none) is live, so the last
# connection is never taken away.
#
# @param ip [Object] key of @connections to quarantine
# @return [Object, nil] nil when nothing was done, otherwise #notify's result
def put_ip_in_timeout_and_reload(ip)
  return unless @connections.size > 1

  @troubled_ips[ip] = { time: Time.now }
  # The guard above already guarantees at least two live connections, so
  # the removal needs no second size check.
  @connections.delete(ip)
  notify("Added #{ip} to troubled")
end

#put_job_to_tube(body, options) ⇒ Object



96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# File 'lib/beanpool/connections.rb', line 96

# Puts a job onto a tube through one randomly chosen connection.
#
# Recognised option keys (Strings or Symbols): pri (default 32000),
# ttr (default 60), delay (converted with #to_i), tube_name (default
# 'default') and reset_use_tube.
#
# If the first put raises a StandardError, it is reported through #notify,
# the address is passed to #put_ip_in_timeout_and_reload, and the put is
# tried once more on a freshly sampled connection; an error from that second
# attempt propagates to the caller.
#
# @param body [Object] job payload handed to the tube's #put
# @param options [#keys, #[]] job options as described above
# @return [Object, nil] the result of tubes.use when reset_use_tube is
#   given, otherwise nil
def put_job_to_tube(body, options)
  settings = keystring_hash(options)
  tube_name = settings['tube_name'] || 'default'
  reset_use_tube = settings['reset_use_tube']
  job_options = {
    pri: settings['pri'] || 32000,
    delay: settings['delay'].to_i,
    ttr: settings['ttr'] || 60
  }

  ip_id = connection_sample
  connection = @connections[ip_id]
  begin
    notify("BEANPOOL: Putting to #{tube_name}")
    connection.tubes[tube_name].put(body, **job_options)
  rescue => error
    notify(error)
    put_ip_in_timeout_and_reload(ip_id)
    # Single retry on whichever connection is sampled next.
    connection = @connections[connection_sample]
    connection.tubes[tube_name].put(body, **job_options)
  end
  # Force default tube reset if requested.
  connection.tubes.use(reset_use_tube) if connection && reset_use_tube
end

#stats(tube_name, stat_name) ⇒ Object



128
129
130
131
132
133
134
135
# File 'lib/beanpool/connections.rb', line 128

# Adds up one statistic of the named tube across every live connection.
#
# Each connection's value is looked up under stat_name.to_sym and converted
# with #to_i before being added.
#
# @param tube_name [String] tube whose stats are read
# @param stat_name [#to_sym] name of the statistic
# @return [Integer] the total, 0 when there are no connections
def stats(tube_name, stat_name)
  @connections.values.reduce(0) do |total, client|
    tube_stats = client.tubes[tube_name].stats
    total + tube_stats[stat_name.to_sym].to_i
  end
end