Class: Fairy::CBasicGroupBy

Inherits:
CIOFilter show all
Defined in:
lib/fairy/master/c-basic-group-by.rb

Instance Attribute Summary

Attributes inherited from CIOFilter

#input

Attributes included from CInputtable

#input

Instance Method Summary collapse

Methods inherited from CIOFilter

#node_class, #output=

Methods included from CInputtable

#break_running, #inputtable?

Methods inherited from CFilter

#abort_create_node, #add_node, #assgin_number_of_nodes?, #break_create_node, #break_running, #create_and_add_node, #create_import, #create_node, #create_nodes, #def_job_pool_variable, #each_node, #each_node_exist_only, #handle_exception, #input, #job_pool_dict, #job_pool_variable, #nodes, #number_of_nodes, #number_of_nodes=, #pool_dict, #postmapping_policy, #start_export, #start_watch_node_status, #update_status, watch_status, watch_status=, #watch_status?

Constructor Details

#initialize(controller, opts, block_source) ⇒ CBasicGroupBy

Returns a new instance of CBasicGroupBy.



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/fairy/master/c-basic-group-by.rb', line 15

# Sets up the controller-side bookkeeping for a basic group-by filter.
#
# All three arguments are forwarded unchanged to the superclass initializer
# by the bare +super+.
#
# @param controller forwarded to +super+
# @param opts forwarded to +super+
# @param block_source kept in @block_source and handed out again by
#   #njob_creation_params
def initialize(controller, opts, block_source)
  super
  @block_source = block_source

  # Exports registered so far, grouped by key: key -> [export, ...].
  # @no_of_exports is the next sequence number given to a new key.
  @exports = {}
  @no_of_exports = 0
  @exports_mutex = Mutex.new
  # NOTE(review): not referenced by any method visible on this page.
  @exports_cv = XThread::ConditionVariable.new

  # [export, njob] pairs for the first export of each key; a nil entry ends
  # the consumer loop in #each_export_by.
  # @pre_exports_queue = Queue.new  (disabled; only the legacy
  # start_watch_all_node_imported_ORG still refers to it)
  @exports_queue = XThread::Queue.new

  # Worker thread created on the first #each_export_by call; nil until then.
  @each_export_by_thread = nil
  @each_export_by_thread_mutex = Mutex.new
end

Instance Method Details

#add_exports(key, export, njob) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/fairy/master/c-basic-group-by.rb', line 113

# Registers +export+ under +key+ while holding @exports_mutex.
#
# The first export seen for a key receives the next sequence number and is
# queued, together with +njob+, on @exports_queue. Every later export for the
# same key copies the sequence number of that first export (and its output,
# when the first export already has one) and is appended to the key's list
# without being queued.
#
# @param key grouping key
# @param export object responding to +no=+ and +output=+
# @param njob queued alongside the first export of a key
def add_exports(key, export, njob)
  @exports_mutex.synchronize do
    group = @exports[key]
    if group
      head = group.first
      export.output = head.output if head.output?
      export.no = head.no
      group.push export
    else
      export.no = @no_of_exports
      @no_of_exports += 1
      @exports[key] = [export]
      @exports_queue.push [export, njob]
    end
  end
end

#all_node_arrived? ⇒ Boolean

Returns:

  • (Boolean)


222
223
224
# File 'lib/fairy/master/c-basic-group-by.rb', line 222

# Reads @number_of_nodes under @nodes_mutex.
#
# @return the current value of @number_of_nodes; callers use it as a
#   truthiness check (nil/false means "not yet")
def all_node_arrived?
  @nodes_mutex.synchronize do
    @number_of_nodes
  end
end

#all_node_imported? ⇒ Boolean

Returns:

  • (Boolean)


226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/fairy/master/c-basic-group-by.rb', line 226

# Tells whether every existing node has reached one of the finished states.
#
# @return [Boolean] false while @number_of_nodes is still unset or any node
#   visited by each_node(:exist_only) is in another state; true otherwise
def all_node_imported?
  # Have all njobs arrived yet?
  arrived = @nodes_mutex.synchronize { @number_of_nodes }
  return false unless arrived

  # :ST_ALL_IMPORTED is deliberately not accepted here (original note: that
  # variant is no good, because output must already be set; even when every
  # node has arrived, not every export is necessarily present).
  finished_states = [:ST_FINISH, :ST_EXPORT_FINISH, :ST_WAIT_EXPORT_FINISH]

  each_node(:exist_only) do |node|
    return false unless finished_states.include?(@nodes_status[node])
  end
  true
end

#bind_export(exp, imp) ⇒ Object



109
110
111
# File 'lib/fairy/master/c-basic-group-by.rb', line 109

# Intentionally does nothing in this class.
#
# @param exp ignored
# @param imp ignored
# @return [nil]
def bind_export(exp, imp)
  nil
end

#each_assigned_filter(&block) ⇒ Object

Disabled earlier variant, kept as a comment in the source:

    def next_filter(mapper)
      ret = super
      unless ret
        @each_export_by_thread_mutex.synchronize do
          @each_export_by_thread.join if @each_export_by_thread
        end
      end
      ret
    end


55
56
57
58
59
60
61
# File 'lib/fairy/master/c-basic-group-by.rb', line 55

# Runs the inherited iteration, then waits for the export worker thread.
#
# The bare +super+ forwards the given block. Afterwards, if #each_export_by
# has started a worker thread, this joins it while holding
# @each_export_by_thread_mutex.
#
# @return the joined thread, or nil when no worker thread exists
def each_assigned_filter(&block)
  super

  @each_export_by_thread_mutex.synchronize do
    worker = @each_export_by_thread
    worker.join if worker
  end
end

#each_export_by(njob, mapper, &block) ⇒ Object

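Disabled earlier variant of the export loop, kept as a comment in the source
(the opening `def` line of this fragment is not part of the rendered text):

      begin
        while pair = @exports_queue.pop
          exp, njob = pair
          Log::debug(self, "EXPORT_BY, #{exp.key}")
          block.call exp
        end
      rescue
        Log::fatal_exception
      end
      @each_export_by_thread = true
    end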


78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/fairy/master/c-basic-group-by.rb', line 78

# Starts, at most once, the worker thread that feeds exports to +block+.
#
# The worker pops [export, njob] pairs from @exports_queue until it pops nil.
# Each popped export is passed to +block+; afterwards, if that export is the
# first one registered for its key, its output is copied onto every other
# export registered under the same key. Errors inside the loop are logged via
# Log::fatal_exception and re-raised in the worker thread.
#
# @param njob not read by this method
# @param mapper not read by this method
# @yield [export] each export popped from the queue
# @return [Thread, nil] the newly started worker, or nil when a worker
#   already exists
def each_export_by(njob, mapper, &block)
  @each_export_by_thread_mutex.synchronize do
    return if @each_export_by_thread

    @each_export_by_thread = Thread.start do
      # Wait until every njob has arrived; this avoids a deadlock caused by
      # the downstream stage being scheduled first.
      number_of_nodes

      begin
        while (pair = @exports_queue.pop)
          exp, _njob = pair
          Log::debug(self, "EXPORT_BY, #{exp.key}")
          block.call exp

          @exports_mutex.synchronize do
            group = @exports[exp.key]
            if group.first == exp
              group.drop(1).each { |sibling| sibling.output = exp.output }
            end
          end
        end
      rescue
        Log::fatal_exception
        raise
      end
    end
  end
end

#njob_creation_params ⇒ Object



138
139
140
# File 'lib/fairy/master/c-basic-group-by.rb', line 138

# Returns the block source given to #initialize, wrapped in an array.
#
# @return [Array] single-element array holding @block_source
def njob_creation_params
  block_source = @block_source
  [block_source]
end

#node_class_name ⇒ Object



134
135
136
# File 'lib/fairy/master/c-basic-group-by.rb', line 134

# Returns the class name string "PBasicGroupBy".
# NOTE(review): presumably the node-side class paired with this controller;
# confirm against the code that consumes this name.
#
# @return [String]
def node_class_name
  'PBasicGroupBy'
end

#start_create_nodes ⇒ Object



33
34
35
36
37
# File 'lib/fairy/master/c-basic-group-by.rb', line 33

# Runs the inherited node creation, then starts the background watcher via
# #start_watch_all_node_imported.
#
# @return whatever #start_watch_all_node_imported returns
def start_create_nodes
  super
  start_watch_all_node_imported
end

#start_watch_all_node_imported ⇒ Object



142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# File 'lib/fairy/master/c-basic-group-by.rb', line 142

# Spawns a background thread that closes the export stream once every node
# has been imported.
#
# The thread calls number_of_nodes, then waits on @nodes_status_cv (holding
# @nodes_status_mutex) until #all_node_imported? is true. It then pushes nil
# onto @exports_queue, which ends the consumer loop in #each_export_by, and
# finally stores on the first export of each key the number of exports
# registered under that key (output_no_import=).
#
# @return [nil] the thread itself is not returned
def start_watch_all_node_imported
  Thread.start do
    # Wait until every njob has arrived; this avoids a deadlock caused by
    # the downstream stage being scheduled first.
    Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: S")
    number_of_nodes
    Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 1")
    Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 2")

    # Wait until all exports have arrived.
    @nodes_status_mutex.synchronize do
      @nodes_status_cv.wait(@nodes_status_mutex) until all_node_imported?
    end
    @exports_queue.push nil

    Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 4")
    @exports.each_value do |group|
      group.first.output_no_import = group.size
    end
    Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: E")
  end
  nil
end

#start_watch_all_node_imported_ORG ⇒ Object



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/fairy/master/c-basic-group-by.rb', line 170

# Earlier variant of #start_watch_all_node_imported (note the _ORG suffix).
#
# Spawns a thread that forwards exports in two stages through
# @pre_exports_queue: first it drains what is queued so far into
# @exports_queue, then it waits until #all_node_imported? is true, drains the
# remainder, pushes a terminating nil onto @exports_queue, and finally stores
# on the first export of each key the number of exports under that key.
#
# NOTE(review): @pre_exports_queue is not assigned in the visible #initialize
# (the assignment there is commented out) and #add_exports no longer pushes to
# it, so as written the calls on it would be made on nil. Confirm before
# reviving this method.
#
# Returns nil; the thread itself is not returned.
def start_watch_all_node_imported_ORG
  Thread.start do
	# Wait until all njobs have arrived.
	# This avoids a deadlock caused by the downstream stage being scheduled first.
Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: S")
	number_of_nodes

Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 1")
	# Send the exports that already exist downstream.
	# A nil is pushed first as an end marker, so the pop loop stops after the
	# pairs that were queued before it.
	@exports_mutex.synchronize do
Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 1.1")
	  @pre_exports_queue.push nil
	  while pair = @pre_exports_queue.pop
Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 1.2: EXP.NO: #{pair[0].no}")
 @exports_queue.push pair
	  end
Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 1.3")
	end

Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 2")
	# Wait until all exports have arrived.
	@nodes_status_mutex.synchronize do
	  while !all_node_imported?
 @nodes_status_cv.wait(@nodes_status_mutex)
	  end
	end

Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 3")
	# Send the remaining exports downstream (same end-marker drain as above,
	# this time without holding @exports_mutex).
	@pre_exports_queue.push nil
	while pair = @pre_exports_queue.pop
Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 3.1: EXP.NO: #{pair[0].no}")
	  @exports_queue.push pair
	end
	# The nil tells the consumer of @exports_queue that no more pairs follow.
	@exports_queue.push nil
	
Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: 4")
#Log::debug(self, "START: setting for EXPOTRS.SIZE")
	# Record, on the first export of each key, how many exports share that key.
	for key, exports in @exports
#	  exports[1..-1].each do |exp|
#	    exp.output=exports.first.output
#	  end

#Log::debug(self, "EXPOTRS.SIZE=#{exports.size}")
	  exports.first.output_no_import = exports.size
	end
#Log::debug(self, "END: setting for EXPOTRS.SIZE")
Log::debug(self, "START_WATCH_ALL_NODE_IMPORTED: E")
  end
  nil
end

#update_exports(key, export, njob) ⇒ Object



129
130
131
132
# File 'lib/fairy/master/c-basic-group-by.rb', line 129

# Registers +export+ under +key+ by delegating to #add_exports, discarding
# that method's return value.
#
# @param key grouping key, passed through to #add_exports
# @param export passed through to #add_exports
# @param njob passed through to #add_exports
# @return [nil]
def update_exports(key, export, njob)
  add_exports(key, export, njob)
  nil
end