Class: BackgrounDRb::ClusterConnection
- Inherits:
-
Object
- Object
- BackgrounDRb::ClusterConnection
- Includes:
- ClientHelper
- Defined in:
- lib/backgroundrb/bdrb_cluster_connection.rb
Instance Attribute Summary collapse
-
#backend_connections ⇒ Object
Returns the value of attribute backend_connections.
-
#bdrb_servers ⇒ Object
Returns the value of attribute bdrb_servers.
-
#cache ⇒ Object
Returns the value of attribute cache.
-
#config ⇒ Object
Returns the value of attribute config.
-
#disconnected_connections ⇒ Object
Returns the value of attribute disconnected_connections.
Instance Method Summary collapse
-
#all_worker_info ⇒ Object
Send worker information of all currently running workers from all configured bdrb servers.
-
#choose_server ⇒ Object
choose a server in round robin manner.
-
#discover_server_periodically ⇒ Object
Every 10 requests or every 10 seconds, tries to reconnect to bdrb servers that were down.
-
#establish_connections ⇒ Object
initialize all backend server connections.
-
#find_connection(host_info) ⇒ Object
Find a connection by host name and port.
-
#find_local ⇒ Object
find the local configured connection.
-
#find_next_except_these(connections) ⇒ Object
Find live connections except those mentioned in array, because they are already dead.
-
#initialize ⇒ ClusterConnection
constructor
initialize cluster connection.
-
#initialize_memcache ⇒ Object
initialize memcache if client is storing results in memcache.
-
#new_worker(options = {}) ⇒ Object
The worker is started on every backend connection; at least one must succeed.
-
#time_to_discover? ⇒ Boolean
Check if, we should try to discover new bdrb servers.
-
#update_stats ⇒ Object
Update the stats and discover new nodes if they came up.
-
#worker(worker_name, worker_key = nil) ⇒ Object
return the worker proxy.
Methods included from ClientHelper
Constructor Details
#initialize ⇒ ClusterConnection
initialize cluster connection
9 10 11 12 13 14 15 16 17 18 19 20 |
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 9
# Starts with empty server/connection bookkeeping, sets up the memcache
# client when results are stored in memcache, then builds the backend
# connections and seeds the round-robin index list from them.
def initialize
  @request_count = 0
  @last_polled_time = Time.now
  @disconnected_connections = {}
  @backend_connections = []
  @bdrb_servers = []
  if BackgrounDRb::BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache'
    initialize_memcache
  end
  establish_connections
  # One index per backend connection, consumed front-to-back by choose_server.
  @round_robin = @backend_connections.each_index.to_a
end
Instance Attribute Details
#backend_connections ⇒ Object
Returns the value of attribute backend_connections.
5 6 7 |
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 5
# Returns the value of attribute backend_connections.
def backend_connections
  @backend_connections
end
#bdrb_servers ⇒ Object
Returns the value of attribute bdrb_servers.
5 6 7 |
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 5
# Returns the value of attribute bdrb_servers.
def bdrb_servers
  @bdrb_servers
end
#cache ⇒ Object
Returns the value of attribute cache.
5 6 7 |
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 5
# Returns the value of attribute cache.
def cache
  @cache
end
#config ⇒ Object
Returns the value of attribute config.
5 6 7 |
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 5
# Returns the value of attribute config.
def config
  @config
end
#disconnected_connections ⇒ Object
Returns the value of attribute disconnected_connections.
6 7 8 |
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 6
# Returns the value of attribute disconnected_connections.
def disconnected_connections
  @disconnected_connections
end
Instance Method Details
#all_worker_info ⇒ Object
Send worker information of all currently running workers from all configured bdrb servers
119 120 121 122 123 124 125 126 |
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 119
# Collects worker information from every backend connection.
#
# Returns a hash keyed by each connection's server_info. A connection whose
# all_worker_info call raises a StandardError is recorded with a nil value
# instead of aborting the whole collection.
def all_worker_info
  update_stats
  @backend_connections.each_with_object({}) do |backend, collected|
    collected[backend.server_info] =
      begin
        backend.all_worker_info
      rescue StandardError
        nil
      end
  end
end
#choose_server ⇒ Object
choose a server in round robin manner.
145 146 147 148 149 150 151 152 153 154 |
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 145
# Picks the next backend connection in round-robin order.
#
# The index list is refilled once it runs out. When there are no backend
# connections at all, a reconnect pass is attempted first; if that finds
# nothing, NoServerAvailable is raised.
def choose_server
  @round_robin = (0...@backend_connections.length).to_a if @round_robin.empty?

  if [@round_robin, @backend_connections].all?(&:empty?)
    discover_server_periodically
    if [@round_robin, @backend_connections].all?(&:empty?)
      raise NoServerAvailable.new("No BackgrounDRb server is found running")
    end
  end

  next_index = @round_robin.shift
  @backend_connections[next_index]
end
#discover_server_periodically ⇒ Object
Every 10 requests or every 10 seconds, tries to reconnect to bdrb servers that were down.
56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 56
# Tries to re-establish every disconnected connection. A connection that
# reports a truthy connection_status is appended to the backend list, closed
# again, and dropped from the disconnected set.
#
# Recovered keys are collected and removed in an ensure block, so an exception
# raised part-way through the scan cannot leave nil placeholders in
# @disconnected_connections (which would make every later scan fail on
# nil.establish_connection) or a stale round-robin index list.
#
# Returns the rebuilt round-robin index list.
def discover_server_periodically
  recovered_keys = []
  begin
    @disconnected_connections.each do |key, connection|
      connection.establish_connection
      next unless connection.connection_status

      @backend_connections << connection
      # Record before close_connection so the entry is dropped even if closing raises.
      recovered_keys << key
      connection.close_connection
    end
  ensure
    recovered_keys.each { |key| @disconnected_connections.delete(key) }
    @round_robin = (0...@backend_connections.length).to_a
  end
  @round_robin
end
#establish_connections ⇒ Object
initialize all backend server connections
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 38
# Builds the server list and one Connection per distinct "ip:port".
#
# Servers come from the optional comma-separated BDRB_CONFIG[:client] string
# ("host:port,host:port") followed by the server configured under
# BDRB_CONFIG[:backgroundrb]. Whitespace around entries is ignored and blank
# entries (e.g. from a doubled or trailing comma) are skipped, so they no
# longer produce a server with a nil host and port 0.
#
# Returns @bdrb_servers.
def establish_connections
  server_struct = Struct.new(:ip, :port)

  if (client_servers = BackgrounDRb::BDRB_CONFIG[:client])
    client_servers.split(',').each do |conn_string|
      ip, port = conn_string.split(':').map(&:strip)
      next if ip.nil? || ip.empty?

      # A missing port becomes 0, as nil.to_i did in the original parsing.
      @bdrb_servers << server_struct.new(ip, port.to_i)
    end
  end

  local_config = BackgrounDRb::BDRB_CONFIG[:backgroundrb]
  @bdrb_servers << server_struct.new(local_config[:ip], local_config[:port].to_i)

  @bdrb_servers.each do |server|
    server_key = "#{server.ip}:#{server.port}"
    # The same server may be listed more than once; connect to it only once.
    next if @backend_connections.detect { |conn| conn.server_info == server_key }

    @backend_connections << Connection.new(server.ip, server.port, self)
  end
end
#find_connection(host_info) ⇒ Object
Find a connection by host name and port
84 85 86 87 88 |
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 84
# Returns the first backend connection whose server_info equals host_info.
# Raises NoServerAvailable when no backend connection matches.
def find_connection(host_info)
  @backend_connections.each do |candidate|
    return candidate if candidate.server_info == host_info
  end
  raise NoServerAvailable.new("BackgrounDRb server is not found running on #{host_info}")
end
#find_local ⇒ Object
find the local configured connection
91 92 93 |
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 91
# Looks up the connection for the server configured under
# BDRB_CONFIG[:backgroundrb], identified as "ip:port".
def find_local
  local_config = BackgrounDRb::BDRB_CONFIG[:backgroundrb]
  find_connection("#{local_config[:ip]}:#{local_config[:port]}")
end
#find_next_except_these(connections) ⇒ Object
Find live connections except those mentioned in array, because they are already dead.
71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 71
# Moves every backend connection whose server_info appears in +connections+
# into the disconnected set, rebuilds the round-robin index list, and returns
# the first remaining backend connection.
# Raises NoServerAvailable when nothing remains.
def find_next_except_these(connections)
  dead, alive = @backend_connections.partition do |backend|
    connections.include?(backend.server_info)
  end

  # Mutate in place so existing references to the array see the change.
  @backend_connections.replace(alive)
  @round_robin = (0...@backend_connections.length).to_a
  dead.each { |backend| @disconnected_connections[backend.server_info] = backend }

  survivor = @backend_connections.first
  raise NoServerAvailable.new("No BackgrounDRb server is found running") unless survivor
  survivor
end
#initialize_memcache ⇒ Object
initialize memcache if client is storing results in memcache
23 24 25 26 27 28 29 30 31 32 33 34 35 |
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 23
# Creates the memcache client stored in @cache.
#
# The server list is read from the comma-separated BDRB_CONFIG[:memcache]
# string; whitespace around each entry is ignored.
def initialize_memcache
  # Loaded lazily: the library is only needed when this method is called.
  require 'memcache'
  memcache_options = {
    :c_threshold => 10_000,
    :compression => true,
    :debug => false,
    :namespace => 'backgroundrb_result_hash',
    :readonly => false,
    :urlencode => false
  }
  @cache = MemCache.new(memcache_options)
  @cache.servers = BackgrounDRb::BDRB_CONFIG[:memcache].split(',').map(&:strip)
end
#new_worker(options = {}) ⇒ Object
The worker is started on every backend connection; at least one must succeed
129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 129
# Asks every backend connection to start a worker with the given options.
#
# A connection that raises BdrbConnError is skipped; the call succeeds as long
# as at least one connection accepted the request.
#
# options - Hash forwarded unchanged to each connection's new_worker.
#
# Returns options[:worker_key].
# Raises NoServerAvailable when no connection accepted the request.
def new_worker(options = {})
  update_stats
  succeeded = false
  @backend_connections.each do |connection|
    begin
      connection.new_worker(options)
      succeeded = true
    rescue BdrbConnError
      # Best effort: an unreachable server must not stop the remaining ones.
    end
  end
  raise NoServerAvailable.new("No BackgrounDRb server is found running") unless succeeded
  options[:worker_key]
end
#time_to_discover? ⇒ Boolean
Check if, we should try to discover new bdrb servers
108 109 110 111 112 113 114 115 |
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 108
# Decides whether a reconnect pass is due: on every 10th request, or once more
# than 10 seconds have passed since the last poll. When due, the last-polled
# timestamp is reset to now.
#
# Uses plain Time arithmetic (Time + seconds) rather than 10.seconds, so the
# check does not depend on ActiveSupport's Numeric extensions.
#
# Returns true when discovery should run, false otherwise.
def time_to_discover?
  tenth_request = (@request_count % 10).zero?
  return false unless tenth_request || Time.now > @last_polled_time + 10

  @last_polled_time = Time.now
  true
end
#update_stats ⇒ Object
Update the stats and discover new nodes if they came up.
102 103 104 105 |
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 102
# Counts the request and runs a reconnect pass when one is due and at least
# one connection is currently marked as disconnected.
def update_stats
  @request_count += 1
  # time_to_discover? is always asked first, before checking for dead connections.
  return unless time_to_discover?
  return if @disconnected_connections.empty?

  discover_server_periodically
end
#worker(worker_name, worker_key = nil) ⇒ Object
return the worker proxy
96 97 98 99 |
# File 'lib/backgroundrb/bdrb_cluster_connection.rb', line 96
# Updates the request stats, then returns a RailsWorkerProxy built from the
# worker name, the optional worker key, and this cluster connection.
def worker(worker_name, worker_key = nil)
  update_stats
  proxy = RailsWorkerProxy.new(worker_name, worker_key, self)
  proxy
end