Class: Fairy::PBasicGroupBy
Constant Summary
collapse
- ST_ALL_IMPORTED =
:ST_ALL_IMPORTED
- ST_WAIT_EXPORT_FINISH =
:ST_WAIT_EXPORT_FINISH
- ST_EXPORT_FINISH =
:ST_EXPORT_FINISH
Constants inherited
from PIOFilter
Fairy::PIOFilter::ST_WAIT_IMPORT
Constants inherited
from PFilter
Fairy::PFilter::END_OF_STREAM, Fairy::PFilter::ST_ACTIVATE, Fairy::PFilter::ST_FINISH, Fairy::PFilter::ST_INIT
Instance Attribute Summary
Attributes inherited from PFilter
#IGNORE_EXCEPTION, #id, #log_id, #ntask
Instance Method Summary
collapse
Methods inherited from PIOFilter
#input=
Methods inherited from PFilter
#abort_running, #basic_start, #break_running, #each, #global_break, #global_break_from_other, #handle_exception, #key, #key=, #next, #no, #no=, #notice_status, #processor, #start, #start_watch_status, #status=, #terminate_proc
Constructor Details
#initialize(id, ntask, bjob, opts, block_source) ⇒ PBasicGroupBy
Returns a new instance of PBasicGroupBy.
17
18
19
20
21
22
23
24
25
26
27
|
# File 'lib/fairy/node/p-basic-group-by.rb', line 17
# Builds a group-by filter node.
#
# All five arguments are forwarded unchanged to the superclass
# constructor (bare +super+); only +block_source+ is additionally
# retained here, since init_key_proc later builds the key proc from it.
#
# State set up for this node:
# * @exports       - Hash mapping a grouping key to its export
# * @counter       - Hash mapping a grouping key to the number of
#                    elements pushed for it (used for debug logging)
# * @exports_queue - an XThread::Queue; start_export pushes nil into it
#                    once its input loop is over
def initialize(id, ntask, bjob, opts, block_source)
  super
  @exports = {}
  @counter = {}
  @exports_queue = XThread::Queue.new
  @block_source = block_source
end
|
Instance Method Details
#add_export(key, export) ⇒ Object
41
42
43
44
45
46
|
# File 'lib/fairy/node/p-basic-group-by.rb', line 41
# Registers +export+ as the destination for grouping key +key+.
#
# The export is recorded in the local @exports table and then announced
# to the owning @bjob together with this filter instance.
#
# Returns whatever @bjob.add_exports returns.
def add_export(key, export)
  @exports.store(key, export)
  @bjob.add_exports(key, export, self)
end
|
#hash_key(e) ⇒ Object
90
91
92
|
# File 'lib/fairy/node/p-basic-group-by.rb', line 90
# Computes the grouping key for element +e+ by yielding it to @key_proc
# (which init_key_proc assigns). start_export uses the returned value to
# look up the export that receives +e+.
def hash_key(e)
@key_proc.yield(e)
end
|
#init_key_proc ⇒ Object
29
30
31
32
33
34
35
36
37
38
39
|
# File 'lib/fairy/node/p-basic-group-by.rb', line 29
# Builds the key-extraction callable, stores it in @key_proc and returns it.
#
# The :grouping_optimize option decides how the block source is turned
# into a callable; when the option is nil (not given), the value of
# CONF.GROUP_BY_GROUPING_OPTIMIZE is used instead.
#
# * truthy - the block source text is wrapped in "proc{...}" and
#            evaluated against @context.binding, yielding a plain Proc.
#            NOTE(review): this evals source text; presumably the text is
#            the user-supplied job block - confirm it is trusted.
# * falsy  - a BBlock is created from the block source, the context and
#            this filter.
def init_key_proc
  optimize = @opts[:grouping_optimize]
  optimize = CONF.GROUP_BY_GROUPING_OPTIMIZE if optimize.nil?

  @key_proc =
    if optimize
      eval("proc{#{@block_source.source}}", @context.binding)
    else
      BBlock.new(@block_source, @context, self)
    end
end
|
#start_export ⇒ Object
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/fairy/node/p-basic-group-by.rb', line 48
# Reads every element of @input and routes it to the export that belongs
# to the element's grouping key.
#
# The work runs inside the block handed to +start+ (inherited; how that
# block is scheduled is not visible from here). For each element:
#
# * the key is computed with hash_key; elements whose key matches
#   Import::CTLTOKEN_NULLVALUE are skipped,
# * the first time a key is seen, a new Export is created with the
#   :postqueuing_policy option, tagged with this node's id and the key,
#   registered via add_export, and its counter is initialised to 0,
# * the element is pushed to the export and the key's counter is bumped.
#
# Any StandardError is logged and re-raised. Whether or not an error
# occurred, nil is pushed onto @exports_queue and END_OF_STREAM is pushed
# to every export created so far.
def start_export
  Log.debug(self, "START_EXPORT")

  start do
    init_key_proc
    policy = @opts[:postqueuing_policy]

    begin
      @input.each do |elem|
        group = hash_key(elem)
        next if Import::CTLTOKEN_NULLVALUE === group

        # Lazily create and register the export for a key seen for the first time.
        sink = @exports[group] || begin
          fresh = Export.new(policy)
          fresh.njob_id = @id
          fresh.add_key(group)
          add_export(group, fresh)
          @counter[group] = 0
          fresh
        end

        sink.push(elem)
        @counter[group] += 1
      end
    rescue
      Log.debug_exception(self)
      raise
    ensure
      # Always close the streams, even after a failure.
      @exports_queue.push(nil)
      @exports.each do |key, export|
        Log.debug(self, "G0 #{key} => #{@counter[key]}")
        export.push(END_OF_STREAM)
      end
    end
  end
end
|
#terminate ⇒ Object
84
85
86
87
88
|
# File 'lib/fairy/node/p-basic-group-by.rb', line 84
# Shuts this filter down once its exports are done.
#
# A fresh condition variable is obtained from @terminate_mon and kept in
# @wait_cv (wait_export_finish passes it to each export), then
# wait_export_finish is run, and finally the superclass's terminate is
# invoked.
def terminate
@wait_cv = @terminate_mon.new_cv
wait_export_finish
super
end
|
#wait_export_finish ⇒ Object
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/fairy/node/p-basic-group-by.rb', line 94
# Walks the node through its end-of-input status sequence and waits for
# every export to finish.
#
# The status is set to ST_ALL_IMPORTED, then ST_WAIT_EXPORT_FINISH. Each
# export in @exports is then asked, while holding @terminate_mon, to
# fib_wait_finish on @wait_cv. Once all exports have been waited on, the
# status becomes ST_EXPORT_FINISH. Debug markers "G1".."G4" trace progress.
def wait_export_finish
  Log.debug(self, "G1")
  self.status = ST_ALL_IMPORTED

  Log.debug(self, "G2")
  self.status = ST_WAIT_EXPORT_FINISH

  Log.debug(self, "G3")
  @exports.each do |key, export|
    Log.debug(self, "G3.WAIT #{key}")
    @terminate_mon.synchronize { export.fib_wait_finish(@wait_cv) }
  end

  Log.debug(self, "G4")
  self.status = ST_EXPORT_FINISH
end
|