Class: Fairy::CInputLocalFile
- Defined in:
- lib/fairy/master/c-input-local-file.rb
Class Method Summary collapse
-
.open(controller, opts = nil) ⇒ Object
DeepConnect.def_single_method_spec(self, “REF new(REF, VAL)”).
Instance Method Summary collapse
-
#initialize(controller, opts = nil) ⇒ CInputLocalFile
constructor
A new instance of CInputLocalFile.
- #input ⇒ Object
- #njob_input_finished?(njob) ⇒ Boolean
- #node_class_name ⇒ Object
- #start(job) ⇒ Object
-
#wait_input_finished(njob) ⇒ Object
def create_and_start_nodes_split begin no_nodes = 0 @job.split_opens(@opts) do |io| @create_node_mutex.synchronize do no_nodes += 1 nlfileinput = nil @controller.assign_new_processor(self) do |processor| nlfileinput = create_node(processor) end Thread.start(nlfileinput) do |nlfi| begin nlfi.open(io) wait_input_finished(nlfi) ensure io.close end end end end rescue BreakCreateNode # do nothing Log::debug self, “BREAK CREATE NODE: #self” ensure self.number_of_nodes = no_nodes end end.
Methods inherited from CInput
Methods inherited from CFilter
#abort_create_node, #add_node, #assgin_number_of_nodes?, #bind_export, #break_create_node, #break_running, #create_and_add_node, #create_import, #create_node, #create_nodes, #def_job_pool_variable, #each_assigned_filter, #each_export_by, #each_node, #each_node_exist_only, #handle_exception, #job_pool_dict, #job_pool_variable, #njob_creation_params, #nodes, #number_of_nodes, #number_of_nodes=, #pool_dict, #postmapping_policy, #start_create_nodes, #start_export, #start_watch_node_status, #update_status, watch_status, watch_status=, #watch_status?
Constructor Details
#initialize(controller, opts = nil) ⇒ CInputLocalFile
Returns a new instance of CInputLocalFile.
24 25 26 |
# File 'lib/fairy/master/c-input-local-file.rb', line 24 def initialize(controller, opts = nil) super end |
Class Method Details
.open(controller, opts = nil) ⇒ Object
DeepConnect.def_single_method_spec(self, “REF new(REF, VAL)”)
18 19 20 21 22 |
# File 'lib/fairy/master/c-input-local-file.rb', line 18 def CInputLocalFile.open(controller, opts = nil) blfileinput = CInputLocalFile.new(controller, opts) blfileinput.open(descripter) blfileinput end |
Instance Method Details
#input ⇒ Object
37 38 39 |
# File 'lib/fairy/master/c-input-local-file.rb', line 37 def input @cio_place end |
#njob_input_finished?(njob) ⇒ Boolean
110 111 112 113 114 |
# File 'lib/fairy/master/c-input-local-file.rb', line 110 def njob_input_finished?(njob) return false st = @nodes_status[njob] [:ST_WAIT_EXPORT_FINISH, :ST_EXPORT_FINISH, :ST_FINISH, :ST_OUTPUT_FINISH].include?(st) end |
#node_class_name ⇒ Object
28 29 30 |
# File 'lib/fairy/master/c-input-local-file.rb', line 28 def node_class_name "PInputLocalFile" end |
#start(job) ⇒ Object
32 33 34 35 |
# File 'lib/fairy/master/c-input-local-file.rb', line 32 def start(job) @cio_place = CLocalIOPlace.new(job) start_create_nodes end |
#wait_input_finished(njob) ⇒ Object
def create_and_start_nodes_split
begin
no_nodes = 0 @job.split_opens(@opts) do |io| @create_node_mutex.synchronize do no_nodes += 1 nlfileinput = nil @controller.assign_new_processor(self) do |processor| nlfileinput = create_node(processor) end Thread.start(nlfileinput) do |nlfi| begin nlfi.open(io) wait_input_finished(nlfi) ensure io.close end end end end
rescue BreakCreateNode
# do nothing Log::debug self, “BREAK CREATE NODE: #self”
ensure
self.number_of_nodes = no_nodes
end
end
102 103 104 105 106 107 108 |
# File 'lib/fairy/master/c-input-local-file.rb', line 102 def wait_input_finished(njob) while !njob_input_finished?(njob) @nodes_status_mutex.synchronize do @nodes_status_cv.wait(@nodes_status_mutex) end end end |