Class: Pmux::Handler
- Inherits:
-
Object
- Object
- Pmux::Handler
- Defined in:
- lib/pmux/handler.rb
Instance Attribute Summary collapse
-
#options ⇒ Object
readonly
Returns the value of attribute options.
-
#server ⇒ Object
Returns the value of attribute server.
Instance Method Summary collapse
- #close_download_channel(node_addr) ⇒ Object
-
#exec_streaming_task(task) ⇒ Object
execute a task and return the result.
- #get_num_cpu ⇒ Object
- #get_properties ⇒ Object
- #get_status ⇒ Object
- #hello ⇒ Object
- #init_job(job_id) ⇒ Object
- #init_scan(addrs) ⇒ Object
-
#initialize(server = nil, options = {}) ⇒ Handler
constructor
A new instance of Handler.
- #ls(dirs, args) ⇒ Object
- #notify_reduce(params) ⇒ Object
- #quit ⇒ Object
- #scan_once ⇒ Object
Constructor Details
#initialize(server = nil, options = {}) ⇒ Handler
Returns a new instance of Handler.
8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/pmux/handler.rb', line 8 def initialize server=nil, ={} @server = server @options = @ipaddr = [:ipaddr] @wtq = [] @wq = [] @ases = {} @msession = nil @seqid = 0 end |
Instance Attribute Details
#options ⇒ Object (readonly)
Returns the value of attribute options.
5 6 7 |
# File 'lib/pmux/handler.rb', line 5 def @options end |
#server ⇒ Object
Returns the value of attribute server.
6 7 8 |
# File 'lib/pmux/handler.rb', line 6 def server @server end |
Instance Method Details
#close_download_channel(node_addr) ⇒ Object
168 169 170 171 |
# File 'lib/pmux/handler.rb', line 168 def close_download_channel node_addr @msession.close_channel node_addr if @msession @msession.class.to_s end |
#exec_streaming_task(task) ⇒ Object
execute a task and return the result
46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/pmux/handler.rb', line 46 def exec_streaming_task task start_time = Time.now as = MR::AsyncResult.new if (task_keys = task['task_keys']) error_ids = [] fusion_id = task['task_id'] fiber = Fiber.new { for task_id, file in task_keys ntask = task.merge 'fusion_id'=>fusion_id, 'task_id'=>task_id, 'path'=>file result = do_one_task ntask, fiber end result.update :task_id=>fusion_id, :task_keys=>task_keys, :welapse=>(Time.now - start_time) as.result result } else fiber = Fiber.new { result = do_one_task(task, fiber) result[:welapse] = Time.now - start_time as.result result } end fiber.resume as end |
#get_num_cpu ⇒ Object
35 36 37 38 39 40 41 42 43 |
# File 'lib/pmux/handler.rb', line 35 def get_num_cpu cpuinfo_path = '/proc/cpuinfo' if File.exist? cpuinfo_path lines = File.readlines(cpuinfo_path).grep(/^processor/) lines.size else 2 end end |
#get_properties ⇒ Object
181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/pmux/handler.rb', line 181 def get_properties { 'hostname' => Socket.gethostname, 'program_name' => [:program_name], 'root_dir' => [:root_dir], 'tmp_dir' => [:tmp_dir], 'VERSION' => VERSION, 'RUBY_VERSION' => RUBY_VERSION, 'num_cpu' => get_num_cpu, } end |
#get_status ⇒ Object
174 175 176 177 178 179 |
# File 'lib/pmux/handler.rb', line 174 def get_status [ ['ruby_version', RUBY_VERSION, :string], ['hoge', 1, :gauge], ] end |
#hello ⇒ Object
193 194 195 |
# File 'lib/pmux/handler.rb', line 193 def hello 'hello' end |
#init_job(job_id) ⇒ Object
20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
# File 'lib/pmux/handler.rb', line 20 def init_job job_id path = "#{[:tmp_dir]}/#{job_id}" Dir.mkdir path [:job_dir] = path Log.init "#{path}/worker.log", :log_level=>'debug' num_cpu = get_num_cpu #fork_worker num_cpu, options { 'job_dir' => path, 'num_cpu' => num_cpu, } end |
#init_scan(addrs) ⇒ Object
155 156 157 158 159 160 161 162 |
# File 'lib/pmux/handler.rb', line 155 def init_scan addrs log_path = "#{[:log_dir]}/diffuser.log" Log.init log_path, :log_level=>'debug' @adapter = StorageAdapter.create 'pmuxfs', addrs @fs_dir = [:fs_dir] @adapter.set_fs_dir @fs_dir @fs_dir end |
#ls(dirs, args) ⇒ Object
197 198 199 200 201 202 203 204 205 206 207 208 209 210 |
# File 'lib/pmux/handler.rb', line 197 def ls dirs, args res = [] for dir in dirs for arg in args Dir.chdir(dir) { res += Dir.glob(arg).select {|path| File.readable? path}.map {|path| [path, File.join(dir, path)] } } end end res end |
#notify_reduce(params) ⇒ Object
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
# File 'lib/pmux/handler.rb', line 126 def notify_reduce params job_id, task_id, pindex, node_addr, ifbase = params.values_at 'job_id', 'task_id', 'pindex', 'node_addr', 'ifbase' ifpath = "#{ifbase}-#{pindex}" Log.debug "H: notify_reduce #{job_id}-#{task_id} #{ifbase}" if @ipaddr == node_addr # local local = "#{[:job_dir]}/#{File.basename(ifpath).sub(/^m/, 't')}" File.rename ifpath, local {:job_id=>job_id, :task_id=>task_id, :ifbase=>ifbase} else # remote @msession ||= MultiSession.new([], {:user=>@options[:user]}, @server.loop) @msession.connect_to_addr node_addr local = "#{[:job_dir]}/#{File.basename(ifpath).sub(/^m/, 't')}" future = @msession.scp_download node_addr, ifpath, local future.attach_callback {|f| if (as = @ases.delete "r#{job_id}-#{task_id}") as.result :job_id=>job_id, :task_id=>task_id, :ifbase=>ifbase end } @ases["r#{job_id}-#{task_id}"] = MR::AsyncResult.new end #rescue Exception end |
#quit ⇒ Object
212 213 214 215 216 217 |
# File 'lib/pmux/handler.rb', line 212 def quit @server.loop.stop cleaner = Cleaner.new "#{[:tmp_dir]}/[0-9]*" cleaner.run nil end |
#scan_once ⇒ Object
164 165 166 |
# File 'lib/pmux/handler.rb', line 164 def scan_once files = @adapter.find end |