Class: Pmux::Handler

Inherits:
Object
  • Object
show all
Defined in:
lib/pmux/handler.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server = nil, options = {}) ⇒ Handler

Returns a new instance of Handler.



8
9
10
11
12
13
14
15
16
17
18
# File 'lib/pmux/handler.rb', line 8

# Sets up a handler and its bookkeeping state.
#
# @param server  object kept in @server; other methods call +loop+ on it
# @param options [Hash] configuration; +:ipaddr+ is copied into @ipaddr
def initialize(server = nil, options = {})
  @server, @options = server, options
  @ipaddr = @options[:ipaddr]

  # Work queues and sequence counter; not touched by the methods visible here.
  @wtq, @wq = [], []
  @seqid = 0

  # Pending async results by key, and the lazily created download session.
  @ases = {}
  @msession = nil
end

Instance Attribute Details

#options ⇒ Object (readonly)

Returns the value of attribute options.



5
6
7
# File 'lib/pmux/handler.rb', line 5

# Returns the options hash stored in @options by the constructor.
def options
  @options
end

#server ⇒ Object

Returns the value of attribute server.



6
7
8
# File 'lib/pmux/handler.rb', line 6

# Returns the server object stored in @server by the constructor.
def server
  @server
end

Instance Method Details

#close_download_channel(node_addr) ⇒ Object



168
169
170
171
# File 'lib/pmux/handler.rb', line 168

# Closes the channel to +node_addr+ on the download session, when a session
# exists; does nothing otherwise.
#
# @param node_addr address passed through to the session's +close_channel+
# @return [String] class name of @msession ("NilClass" when there is none)
def close_download_channel(node_addr)
  if @msession
    @msession.close_channel(node_addr)
  end
  @msession.class.to_s
end

#exec_streaming_task(task) ⇒ Object

execute a task and return the result



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/pmux/handler.rb', line 46

# Execute a task and return the result.
#
# The work runs inside a Fiber that is handed to +do_one_task+; the outcome
# is delivered through an MR::AsyncResult, which is returned immediately
# after the fiber's first resume.
#
# When +task+ has a 'task_keys' entry, every (task_id, path) pair in it is
# run in turn as its own task (merged with 'fusion_id' set to the parent's
# 'task_id'). Only the result of the last sub-task is reported, relabelled
# with the parent's task id and the full key list. An empty 'task_keys'
# yields a result holding just that metadata instead of failing on nil.
#
# In both modes :welapse is the time elapsed since this method was entered.
#
# @param task [Hash] task description with string keys
# @return [MR::AsyncResult] receives the result hash once the work finishes
def exec_streaming_task(task)
  start_time = Time.now
  as = MR::AsyncResult.new
  if (task_keys = task['task_keys'])
    fusion_id = task['task_id']
    fiber = Fiber.new do
      # Declared outside the loop so the last sub-task's result survives it;
      # starts as {} so an empty key list still produces a result.
      result = {}
      task_keys.each do |task_id, file|
        ntask = task.merge('fusion_id' => fusion_id,
                           'task_id' => task_id, 'path' => file)
        result = do_one_task(ntask, fiber)
      end
      result.update(:task_id => fusion_id, :task_keys => task_keys,
                    :welapse => (Time.now - start_time))
      as.result result
    end
  else
    fiber = Fiber.new do
      result = do_one_task(task, fiber)
      result[:welapse] = Time.now - start_time
      as.result result
    end
  end
  fiber.resume
  as
end

#get_num_cpu ⇒ Object



35
36
37
38
39
40
41
42
43
# File 'lib/pmux/handler.rb', line 35

# Counts the processors listed in /proc/cpuinfo.
#
# Falls back to 2 when the file does not exist, and also when it exists but
# contains no line starting with "processor", so callers never see a CPU
# count of zero.
#
# @return [Integer] number of "processor" lines, or 2 as the fallback
def get_num_cpu
  cpuinfo_path = '/proc/cpuinfo'
  fallback = 2
  return fallback unless File.exist?(cpuinfo_path)

  count = File.readlines(cpuinfo_path).grep(/^processor/).size
  count > 0 ? count : fallback
end

#get_properties ⇒ Object



181
182
183
184
185
186
187
188
189
190
191
# File 'lib/pmux/handler.rb', line 181

# Collects static facts about this node and its configuration.
#
# @return [Hash{String=>Object}] hostname, the :program_name / :root_dir /
#   :tmp_dir options, the VERSION and RUBY_VERSION constants, and the CPU
#   count from +get_num_cpu+, in that key order
def get_properties
  props = { 'hostname' => Socket.gethostname }
  # These entries mirror the option of the same name.
  %w[program_name root_dir tmp_dir].each do |name|
    props[name] = options[name.to_sym]
  end
  props['VERSION'] = VERSION
  props['RUBY_VERSION'] = RUBY_VERSION
  props['num_cpu'] = get_num_cpu
  props
end

#get_status ⇒ Object



174
175
176
177
178
179
# File 'lib/pmux/handler.rb', line 174

# Reports status entries as [name, value, type] triples.
#
# @return [Array<Array>] the Ruby version as a :string entry, followed by a
#   constant 'hoge' :gauge entry (presumably a placeholder — confirm)
def get_status
  ruby_version_entry = ['ruby_version', RUBY_VERSION, :string]
  gauge_entry = ['hoge', 1, :gauge]
  [ruby_version_entry, gauge_entry]
end

#hello ⇒ Object



193
194
195
# File 'lib/pmux/handler.rb', line 193

# Returns the fixed string 'hello'.
# NOTE(review): presumably a liveness check for remote callers — confirm.
def hello
  'hello'
end

#init_job(job_id) ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
31
32
33
# File 'lib/pmux/handler.rb', line 20

# Prepares the working directory for a job.
#
# Creates "<tmp_dir>/<job_id>", records it in options[:job_dir], and points
# the logger at "worker.log" inside it with debug level.
#
# @param job_id identifier used as the directory name
# @return [Hash{String=>Object}] 'job_dir' (the created path) and 'num_cpu'
# @raise [SystemCallError] from Dir.mkdir, e.g. when the directory exists
def init_job(job_id)
  job_dir = "#{options[:tmp_dir]}/#{job_id}"
  Dir.mkdir(job_dir)
  options[:job_dir] = job_dir
  Log.init("#{job_dir}/worker.log", :log_level => 'debug')

  # Forking workers is currently disabled: fork_worker num_cpu, options
  { 'job_dir' => job_dir, 'num_cpu' => get_num_cpu }
end

#init_scan(addrs) ⇒ Object



155
156
157
158
159
160
161
162
# File 'lib/pmux/handler.rb', line 155

# Prepares this handler for scanning.
#
# Points the logger at "<log_dir>/diffuser.log" (debug level), creates a
# 'pmuxfs' storage adapter for +addrs+ in @adapter, and passes it the
# configured :fs_dir.
#
# @param addrs passed through to StorageAdapter.create
# @return the value of options[:fs_dir]
def init_scan(addrs)
  Log.init("#{options[:log_dir]}/diffuser.log", :log_level => 'debug')
  @adapter = StorageAdapter.create('pmuxfs', addrs)
  @fs_dir = options[:fs_dir]
  @adapter.set_fs_dir(@fs_dir)
  @fs_dir
end

#ls(dirs, args) ⇒ Object



197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/pmux/handler.rb', line 197

# Lists readable files matching glob patterns inside each directory.
#
# Every pattern is expanded with the working directory temporarily switched
# to the directory in question, so patterns are relative to it. Results are
# ordered by directory, then by pattern.
#
# @param dirs [Array<String>] directories to search
# @param args [Array<String>] glob patterns applied inside each directory
# @return [Array<Array(String, String)>] pairs of the matched path and that
#   path joined onto its directory
def ls(dirs, args)
  dirs.flat_map do |dir|
    args.flat_map do |pattern|
      Dir.chdir(dir) do
        readable = Dir.glob(pattern).select { |rel| File.readable?(rel) }
        readable.map { |rel| [rel, File.join(dir, rel)] }
      end
    end
  end
end

#notify_reduce(params) ⇒ Object



126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/pmux/handler.rb', line 126

# Brings one intermediate file into the job directory for a reduce task.
#
# The source file is "<ifbase>-<pindex>"; the destination is the same
# basename inside options[:job_dir] with a leading "m" replaced by "t".
#
# * When +node_addr+ equals this handler's address the file is renamed into
#   place and a result hash is returned directly.
# * Otherwise the file is fetched with scp over the shared session (created
#   on first use) and an MR::AsyncResult is returned; it is registered in
#   @ases under "r<job_id>-<task_id>" and receives the same result hash from
#   the download callback.
#
# NOTE(review): the callback is attached before the async result is
# registered, so this relies on the callback firing later — confirm.
#
# @param params [Hash] string keys 'job_id', 'task_id', 'pindex',
#   'node_addr' and 'ifbase'
# @return [Hash, MR::AsyncResult] see above
def notify_reduce(params)
  job_id = params['job_id']
  task_id = params['task_id']
  pindex = params['pindex']
  node_addr = params['node_addr']
  ifbase = params['ifbase']

  ifpath = "#{ifbase}-#{pindex}"
  Log.debug "H: notify_reduce #{job_id}-#{task_id} #{ifbase}"

  # Destination path is the same for the local and the remote case.
  local = "#{options[:job_dir]}/#{File.basename(ifpath).sub(/^m/, 't')}"

  if @ipaddr == node_addr
    # Local: the file is already on this node, just move it.
    File.rename(ifpath, local)
    return {:job_id => job_id, :task_id => task_id, :ifbase => ifbase}
  end

  # Remote: download through the shared session.
  @msession ||=
    MultiSession.new([], {:user => @options[:user]}, @server.loop)
  @msession.connect_to_addr(node_addr)
  pending_key = "r#{job_id}-#{task_id}"
  future = @msession.scp_download(node_addr, ifpath, local)
  future.attach_callback do |_f|
    pending = @ases.delete(pending_key)
    pending.result(:job_id => job_id, :task_id => task_id, :ifbase => ifbase) if pending
  end
  @ases[pending_key] = MR::AsyncResult.new
end

#quit ⇒ Object



212
213
214
215
216
217
# File 'lib/pmux/handler.rb', line 212

# Shuts the handler down: stops the server's event loop, then runs a Cleaner
# over the entries under tmp_dir whose names start with a digit.
#
# @return [nil]
def quit
  @server.loop.stop
  Cleaner.new("#{options[:tmp_dir]}/[0-9]*").run
  nil
end

#scan_once ⇒ Object



164
165
166
# File 'lib/pmux/handler.rb', line 164

# Runs a single scan through the storage adapter.
#
# Uses @adapter, which +init_scan+ sets up.
#
# @return whatever the adapter's +find+ returns
def scan_once
  @adapter.find
end