Class: BackgrounDRb::RailsWorkerProxy
- Inherits:
-
Object
- Object
- BackgrounDRb::RailsWorkerProxy
- Defined in:
- lib/backgroundrb/rails_worker_proxy.rb
Overview
A Worker proxy, which uses method_missing
for delegating method calls to the workers
Instance Attribute Summary collapse
-
#data ⇒ Object
Returns the value of attribute data.
-
#middle_man ⇒ Object
Returns the value of attribute middle_man.
-
#worker_key ⇒ Object
Returns the value of attribute worker_key.
-
#worker_method ⇒ Object
Returns the value of attribute worker_method.
-
#worker_name ⇒ Object
Returns the value of attribute worker_name.
Instance Method Summary collapse
-
#ask_result(job_key) ⇒ Object
get results back from the cache.
-
#choose_connection(host_info) ⇒ Object
choose a worker.
-
#compact(options = { }) ⇒ Object
helper method to compact a hash and for getting rid of nil parameters.
-
#delete ⇒ Object
delete a worker.
-
#dequeue_task(options = {}) ⇒ Object
remove tasks from the worker pool.
-
#enqueue_task(options = {}) ⇒ Object
enqueue tasks to the worker pool.
-
#gen_key(options) ⇒ Object
generate worker key.
-
#initialize(p_worker_name, p_worker_key = nil, p_middle_man = nil) ⇒ RailsWorkerProxy
constructor
create new worker proxy.
-
#invoke_on_connection(connection, method_name, options = {}) ⇒ Object
choose a backgroundrb server connection and invoke worker method on it.
- #method_missing(method_id, *args) ⇒ Object
- #process_result(t_result) ⇒ Object
-
#reset_memcache_result(job_key, value) ⇒ Object
reset result within memcache for given key.
- #return_result(result) ⇒ Object
-
#return_result_from_memcache(options = {}) ⇒ Object
return result from memcache.
-
#run_method(host_info, method_name, worker_options = {}) ⇒ Object
invoke method on worker.
-
#worker_info ⇒ Object
return runtime information about worker.
Constructor Details
#initialize(p_worker_name, p_worker_key = nil, p_middle_man = nil) ⇒ RailsWorkerProxy
create new worker proxy
7 8 9 10 11 12 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 7 def initialize(p_worker_name,p_worker_key = nil,p_middle_man = nil) @worker_name = p_worker_name @middle_man = p_middle_man @worker_key = p_worker_key @tried_connections = [] end |
Dynamic Method Handling
This class handles dynamic methods through the method_missing method
#method_missing(method_id, *args) ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 14 def method_missing(method_id,*args) worker_method = method_id.to_s arguments = args.first arg,job_key,host_info,scheduled_at,priority = arguments && arguments.values_at(:arg,:job_key,:host,:scheduled_at, :priority) # allow both arg and args arg ||= arguments && arguments[:args] new_schedule = (scheduled_at && scheduled_at.respond_to?(:utc)) ? scheduled_at.utc : Time.now.utc if worker_method =~ /^async_(\w+)/ method_name = $1 = compact(:worker => worker_name,:worker_key => worker_key, :worker_method => method_name,:job_key => job_key, :arg => arg) run_method(host_info,:ask_work,) elsif worker_method =~ /^enq_(\w+)/i raise NoJobKey.new("Must specify a job key with enqueued tasks") if job_key.blank? method_name = $1 marshalled_args = Marshal.dump(arg) enqueue_task(compact(:worker_name => worker_name.to_s,:worker_key => worker_key.to_s, :worker_method => method_name.to_s,:job_key => job_key.to_s, :priority => priority, :args => marshalled_args,:timeout => arguments ? arguments[:timeout] : nil,:scheduled_at => new_schedule)) elsif worker_method =~ /^deq_(\w+)/i raise NoJobKey.new("Must specify a job key to dequeue tasks") if job_key.blank? method_name = $1 dequeue_task(compact(:worker_name => worker_name.to_s,:worker_key => worker_key.to_s, :worker_method => method_name.to_s,:job_key => job_key.to_s)) else = compact(:worker => worker_name,:worker_key => worker_key, :worker_method => worker_method,:job_key => job_key,:arg => arg) run_method(host_info,:send_request,) end end |
Instance Attribute Details
#data ⇒ Object
Returns the value of attribute data.
4 5 6 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 4 def data @data end |
#middle_man ⇒ Object
Returns the value of attribute middle_man.
4 5 6 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 4 def middle_man @middle_man end |
#worker_key ⇒ Object
Returns the value of attribute worker_key.
4 5 6 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 4 def worker_key @worker_key end |
#worker_method ⇒ Object
Returns the value of attribute worker_method.
4 5 6 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 4 def worker_method @worker_method end |
#worker_name ⇒ Object
Returns the value of attribute worker_name.
4 5 6 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 4 def worker_name @worker_name end |
Instance Method Details
#ask_result(job_key) ⇒ Object
get results back from the cache. Cache can be in-memory worker cache or memcache based cache
114 115 116 117 118 119 120 121 122 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 114 def ask_result job_key = compact(:worker => worker_name,:worker_key => worker_key,:job_key => job_key) if BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache' return_result_from_memcache() else result = middle_man.backend_connections.map { |conn| conn.ask_result() } return_result(result) end end |
#choose_connection(host_info) ⇒ Object
choose a worker
165 166 167 168 169 170 171 172 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 165 def choose_connection host_info case host_info when :all; middle_man.backend_connections when :local; middle_man.find_local when String; middle_man.find_connection(host_info) else; middle_man.choose_server end end |
#compact(options = { }) ⇒ Object
helper method to compact a hash and for getting rid of nil parameters
175 176 177 178 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 175 def compact( = { }) .delete_if { |key,value| value.nil? } end |
#delete ⇒ Object
delete a worker
157 158 159 160 161 162 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 157 def delete middle_man.backend_connections.each do |connection| connection.delete_worker(compact(:worker => worker_name, :worker_key => worker_key)) end return worker_key end |
#dequeue_task(options = {}) ⇒ Object
remove tasks from the worker pool
55 56 57 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 55 def dequeue_task = {} BdrbJobQueue.remove_job() end |
#enqueue_task(options = {}) ⇒ Object
enqueue tasks to the worker pool
50 51 52 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 50 def enqueue_task = {} BdrbJobQueue.insert_job() end |
#gen_key(options) ⇒ Object
generate worker key
132 133 134 135 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 132 def gen_key key = [[:worker],[:worker_key],[:job_key]].compact.join('_') key end |
#invoke_on_connection(connection, method_name, options = {}) ⇒ Object
choose a backgroundrb server connection and invoke worker method on it.
107 108 109 110 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 107 def invoke_on_connection connection,method_name, = {} raise NoServerAvailable.new("No BackgrounDRb is found running") unless connection connection.send(method_name,) end |
#process_result(t_result) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 87 def process_result t_result case t_result when Hash if(t_result[:result] == true && t_result[:type] = :response) if(t_result[:result_flag] == "ok") return t_result[:data] else raise RemoteWorkerError.new("Error while executing worker method") end elsif(t_result[:result_flag] == "ok") "ok" elsif(t_result[:result_flag] == "error") raise RemoteWorkerError.new("Error while executing worker method") end when Array t_result end end |
#reset_memcache_result(job_key, value) ⇒ Object
reset result within memcache for given key
143 144 145 146 147 148 149 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 143 def reset_memcache_result(job_key,value) = compact(:worker => worker_name,:worker_key => worker_key,\ :job_key => job_key) key = gen_key() middle_man.cache[key] = value value end |
#return_result(result) ⇒ Object
151 152 153 154 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 151 def return_result result result = Array(result) result.size <= 1 ? result[0] : result end |
#return_result_from_memcache(options = {}) ⇒ Object
return result from memcache
138 139 140 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 138 def return_result_from_memcache = {} middle_man.cache[gen_key()] end |
#run_method(host_info, method_name, worker_options = {}) ⇒ Object
invoke method on worker
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 60 def run_method host_info,method_name, = {} result = [] connection = choose_connection(host_info) raise NoServerAvailable.new("No BackgrounDRb server is found running") if connection.blank? if host_info == :local or host_info.is_a?(String) result << invoke_on_connection(connection,method_name,) elsif host_info == :all succeeded = false begin connection.each { |conn| result << invoke_on_connection(conn,method_name,) } succeeded = true rescue BdrbConnError; end raise NoServerAvailable.new("No BackgrounDRb server is found running") unless succeeded else @tried_connections = [connection.server_info] begin result << invoke_on_connection(connection,method_name,) rescue BdrbConnError => e connection = middle_man.find_next_except_these(@tried_connections) @tried_connections << connection.server_info retry end end #return nil if method_name == :ask_work process_result(return_result(result)) end |
#worker_info ⇒ Object
return runtime information about worker
125 126 127 128 129 |
# File 'lib/backgroundrb/rails_worker_proxy.rb', line 125 def worker_info t_connections = middle_man.backend_connections result = t_connections.map { |conn| conn.worker_info(compact(:worker => worker_name,:worker_key => worker_key)) } return_result(result) end |