36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
|
# File 'lib/scout/tsv/open.rb', line 36
# Walks over +obj+ calling +block+ for every entry, optionally collecting the
# block results, reporting progress and distributing the work over several
# workers.
#
# Supported sources, checked in this order: TSV, Hash, Array, String (a Path is
# first resolved with +produce_and_find+, then the string is opened with
# Open.open), Step (its stream is traversed), IO, TSV::Parser, any object that
# responds to +pop+, and finally anything else, which is handed to TSV.parse.
#
# @param obj the source to traverse (see the list above)
# @param into where block results go. +:stream+ sends them to the write end of
#   a pipe and returns the read end; a Path is opened with Open.write and the
#   results are sent to that handle; any other non-nil object receives every
#   non-nil result through +traverse_add+
# @param cpus number of workers; when given (and +cpus.to_i+ is not 1) the
#   block runs inside a WorkQueue
# @param bar progress bar specification, resolved with
#   Log::ProgressBar.get_obj_bar and ticked once per result
# @param callback callable invoked with each block result
# @param unnamed forwarded to TSV#traverse when +obj+ is a TSV
# @param keep_open when false, a closable +into+ is closed once the background
#   traversal finishes
# @param options forwarded to the underlying traversal; +:type+ (+:array+ or
#   +:line+) and +:sep+ are also read here when +obj+ is an IO
# @return the read end of the pipe for +into: :stream+; +into+ when it is a
#   Path, when it responds to +close+, or when +cpus+ is used; otherwise
#   +into+ if given, else the value produced by the underlying iteration
# @raise [Exception] any error raised during a synchronous traversal; when
#   +obj+ is a ConcurrentStream carrying a +stream_exception+ that exception
#   is raised instead
def self.traverse(obj, into: nil, cpus: nil, bar: nil, callback: nil, unnamed: true, keep_open: false, **options, &block)
  # One worker is the same as no work queue
  cpus = nil if cpus.to_i == 1

  if into == :stream
    # Results are written into +sin+; the caller reads them from +sout+
    sout, sin = Open.pipe
    ConcurrentStream.setup(sout, :pair => sin)
    ConcurrentStream.setup(sin, :pair => sout)
    self.traverse(obj, into: sin, cpus: cpus, bar: bar, callback: callback, unnamed: unnamed, **options, &block)
    return sout
  elsif Path === into
    Open.write(into) do |io|
      self.traverse(obj, into: io, cpus: cpus, bar: bar, callback: callback, unnamed: unnamed, **options, &block)
    end
    return into
  end

  if into || bar
    # Wrap the user callback so every result also ticks the bar and is
    # accumulated into +into+
    orig_callback = callback
    bar = Log::ProgressBar.get_obj_bar(obj, bar) if bar
    bar.init if bar
    callback = proc do |res|
      bar.tick if bar
      traverse_add into, res if into && ! res.nil?
      orig_callback.call res if orig_callback
    end

    if into.respond_to?(:close)
      # Closable targets are filled from a background thread so that +into+
      # can be returned right away
      into_thread = Thread.new do
        Thread.current.report_on_exception = false
        Thread.current["name"] = "Traverse into"
        begin
          self.traverse(obj, callback: callback, cpus: cpus, unnamed: unnamed, **options, &block)
          into.close if ! keep_open && into.respond_to?(:close)
          bar.remove if bar
        rescue Exception
          # Errors are not re-raised in this thread; they are handed to the
          # bar and to +into.abort+ when that method is available
          bar.remove($!) if bar
          into.abort($!) if into.respond_to?(:abort)
        end
      end

      # Wait until the thread has started and named itself
      Thread.pass until into_thread["name"]

      case into
      when IO
        ConcurrentStream.setup into, :threads => into_thread
      when TSV::Dumper
        ConcurrentStream.setup into.stream, :threads => into_thread
      end

      return into
    end
  end

  if cpus
    # The block runs inside the work queue; results come back through
    # +queue.process+ and are passed to the callback
    queue = WorkQueue.new cpus do |args|
      block.call(*args)
    end

    queue.process do |res|
      callback.call res if callback
    end

    begin
      # Forward +unnamed+ so a TSV is traversed the way the caller asked,
      # exactly as in the non-parallel path
      self.traverse(obj, unnamed: unnamed, **options) do |*args|
        queue.write args
      end

      queue.close
      queue.join(false)
      bar.remove if bar
      return into
    rescue Exception
      bar.remove($!) if bar
      queue.abort
      raise
    ensure
      queue.clean
    end
  end

  begin
    res = case obj
          when TSV
            obj.traverse(unnamed: unnamed, **options) do |k, v, f|
              res = block.call(k, v, f)
              callback.call res if callback
              nil
            end
          when Hash
            obj.each do |key, value|
              res = block.call(key, value)
              callback.call res if callback
              nil
            end
          when Array
            obj.each do |line|
              res = block.call(line)
              callback.call res if callback
              nil
            end
          when String
            obj = obj.produce_and_find if Path === obj
            f = Open.open(obj)
            self.traverse(f, cpus: cpus, callback: callback, **options, &block)
          when (defined?(Step) && Step)
            raise obj.exception if obj.error?
            self.traverse(obj.stream, cpus: cpus, callback: callback, **options, &block)
          when IO
            if options[:type] == :array || options[:type] == :line
              Log.low "Traverse stream by lines #{Log.fingerprint obj}"
              while line = obj.gets
                line.strip!
                res = block.call(line)
                callback.call res if callback
              end
            else
              Log.low "Traverse stream with parser #{Log.fingerprint obj}"
              parser = options[:sep].nil? ? TSV::Parser.new(obj) : TSV::Parser.new(obj, sep: options[:sep])
              parser.traverse(**options) do |k, v, f|
                res = block.call k, v, f
                callback.call res if callback
                nil
              end
            end
          when TSV::Parser
            obj.traverse(**options) do |k, v, f|
              res = block.call k, v, f
              callback.call res if callback
              nil
            end
          else
            if obj.respond_to?(:pop)
              # Consume the object until +pop+ returns nil/false or it
              # reports no remaining elements
              while elem = obj.pop
                res = block.call elem
                callback.call res if callback
                break unless obj.any?
              end
            else
              TSV.parse(obj, **options) do |k, v|
                res = block.call k, v
                callback.call res if callback
                nil
              end
            end
          end
    bar.remove if bar
  rescue Exception => exception
    # Prefer the error recorded on the stream itself, when there is one
    exception = obj.stream_exception if (ConcurrentStream === obj) && obj.stream_exception
    bar.error if bar
    raise exception
  end

  into || res
end
|