Module: Open

Defined in:
lib/scout/tsv/open.rb

Class Method Summary collapse

Class Method Details

.traverse(obj, into: nil, cpus: nil, bar: nil, callback: nil, unnamed: true, keep_open: false, **options, &block) ⇒ Object



36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
# File 'lib/scout/tsv/open.rb', line 36

# Traverses +obj+, calling +block+ for each entry and optionally collecting
# the block results.
#
# @param obj the source to traverse. The final +case+ handles TSV, Hash,
#   Array, String (opened with Open.open), Step (only when that constant is
#   defined; its +stream+ is traversed), IO, TSV::Parser, anything that
#   responds to +pop+, and falls back to TSV.parse for everything else.
# @param into where results go: +:stream+ (a pipe is created with Open.pipe,
#   results are written to +sin+ and +sout+ is returned), a Path (written
#   through Open.write), or any target understood by traverse_add. With nil
#   the results are not collected.
# @param cpus when truthy (and not 1 after +to_i+) the block is run through
#   a WorkQueue built with this value -- presumably the number of workers;
#   confirm against WorkQueue.
# @param bar progress-bar specification handed to
#   Log::ProgressBar.get_obj_bar; the bar ticks on every callback invocation.
# @param callback called with each block result.
# @param unnamed forwarded to TSV#traverse; only the TSV branch reads it.
# @param keep_open when true, +into+ is not closed once a threaded traversal
#   finishes.
# @param options forwarded to the underlying traverse/parse calls; +:type+
#   and +:sep+ are additionally read here when +obj+ is an IO.
# @return +into+ when it is given (+sout+ for +:stream+); otherwise the value
#   of the branch that handled +obj+.
# @raise re-raises whatever the traversal raised; when +obj+ is a
#   ConcurrentStream holding a +stream_exception+, that exception is raised
#   instead.
def self.traverse(obj, into: nil, cpus: nil, bar: nil, callback: nil, unnamed: true, keep_open: false, **options, &block)
  # A single CPU is treated the same as no parallelism
  cpus = nil if cpus.to_i == 1

  if into == :stream
    # Build a pipe, register each end as the other's :pair, fill +sin+ through
    # a recursive call and hand +sout+ back to the caller
    sout, sin = Open.pipe
    ConcurrentStream.setup(sout, :pair => sin)
    ConcurrentStream.setup(sin, :pair => sout)
    self.traverse(obj, into: sin, cpus: cpus, bar: bar, callback: callback, unnamed: unnamed, **options, &block)
    return sout
  elsif Path === into
    # Write into the file behind the Path using the io yielded by Open.write
    Open.write(into) do |io|
      self.traverse(obj, into: io, cpus: cpus, bar: bar, callback: callback, unnamed: unnamed, **options, &block)
    end
    return into
  end

  if into || bar
    # Wrap the user callback so that every result also ticks the bar and is
    # stored in +into+ (nil results are not stored)
    orig_callback = callback if callback
    bar = Log::ProgressBar.get_obj_bar(obj, bar) if bar
    bar.init if bar
    callback = proc do |res|
      bar.tick if bar
      traverse_add into, res if into && ! res.nil?
      orig_callback.call res if orig_callback
    end

    # Targets that can be closed are filled from a background thread, so the
    # caller gets +into+ back without waiting for the traversal to finish
    if into.respond_to?(:close)
      into_thread = Thread.new do 
        Thread.current.report_on_exception = false
        Thread.current["name"] = "Traverse into"
        # NOTE(review): +error+ is assigned but never read in this block
        error = false
        begin
          # +into+ and +bar+ are not forwarded: the wrapped callback above
          # already takes care of both
          self.traverse(obj, callback: callback, cpus: cpus, unnamed: unnamed, **options, &block)
          into.close if ! keep_open && into.respond_to?(:close)
          bar.remove if bar
        rescue Exception
          # Failures are not re-raised here; they are reported to the bar and
          # to +into+ (only when it responds to +abort+)
          bar.remove($!) if bar
          into.abort($!) if into.respond_to?(:abort)
        end
      end

      # Wait until the thread has started running (it sets its "name" early)
      Thread.pass until into_thread["name"]

      # Register the thread on the stream via ConcurrentStream.setup --
      # presumably so consumers of the stream can join or abort it; confirm
      case into
      when IO
        ConcurrentStream.setup into, :threads => into_thread
      when TSV::Dumper
        ConcurrentStream.setup into.stream, :threads => into_thread
      end
      return into
    end
  end

  if cpus
    # Parallel path: the WorkQueue runs +block+ on each argument list written
    # to it, and every result it hands back goes through +callback+
    queue = WorkQueue.new cpus do |args|
      block.call *args
    end

    queue.process do |res|
      callback.call res if callback
    end
    
    begin
      # Feed the queue: this recursive call has no cpus and no callback, its
      # block only writes the yielded arguments to the queue.
      # NOTE(review): +unnamed+ is not forwarded here, so this call uses its
      # default (true) regardless of what the caller passed -- confirm intended
      self.traverse(obj, **options) do |*args|
        queue.write args
      end

      queue.close

      queue.join(false)

      bar.remove if bar
      return into
    rescue Exception
      bar.remove($!) if bar
      queue.abort
      raise $!
    ensure
      queue.clean
    end
  end

  # Sequential path: dispatch on the kind of +obj+
  begin
    # NOTE: the blocks below also assign to +res+; they share this local, and
    # once the +case+ finishes +res+ holds the value of the +case+ expression
    res = case obj
          when TSV
            obj.traverse  unnamed: unnamed, **options do |k,v,f|
              res = block.call(k, v, f)
              callback.call res if callback
              nil
            end
          when Hash
            obj.each do |key,value|
              res = block.call(key,value)
              callback.call res if callback
              nil
            end
          when Array
            obj.each do |line|
              res = block.call(line)
              callback.call res if callback
              nil
            end
          when String
            # A Path is resolved with produce_and_find before being opened
            obj = obj.produce_and_find if Path === obj
            # NOTE(review): +f+ is not closed in this method -- confirm the
            # recursive traversal (or the parser) takes care of closing it
            f = Open.open(obj)
            self.traverse(f, cpus: cpus, callback: callback, **options, &block)
          when (defined?(Step) && Step)
            # Only matches Step objects when the Step constant is defined
            raise obj.exception if obj.error?
            self.traverse(obj.stream, cpus: cpus, callback: callback, **options, &block)
          when IO
            if options[:type] == :array || options[:type] == :line
              # Line mode: each line is stripped in place and yielded alone
              Log.low "Traverse stream by lines #{Log.fingerprint obj}"
              while line = obj.gets
                line.strip!
                res = block.call(line)
                callback.call res if callback
              end
            else
              # Parser mode: +:sep+ is handed to the parser constructor only
              # when it was provided
              Log.low "Traverse stream with parser #{Log.fingerprint obj}"
              parser = options[:sep].nil? ? TSV::Parser.new(obj) : TSV::Parser.new(obj, sep: options[:sep])
              parser.traverse **options do |k,v,f|
                res = block.call k,v,f
                callback.call res if callback
                nil
              end
            end
          when TSV::Parser
            obj.traverse **options do |k,v,f|
              res = block.call k, v, f
              callback.call res if callback
              nil
            end
          else
            if obj.respond_to?(:pop)
              # Queue-like objects: keep popping until +pop+ returns a falsy
              # value or +obj.any?+ reports nothing left
              while elem = obj.pop
                res = block.call elem
                callback.call res if callback
                break unless obj.any?
              end
            else
              # Anything else is handed to TSV.parse
              TSV.parse obj, **options do |k,v|
                res = block.call k, v
                callback.call res if callback
                nil
              end
            end
          end
    bar.remove if bar
  rescue Exception => exception
    # Prefer the exception recorded on the stream over the one caught here
    exception = obj.stream_exception if (ConcurrentStream === obj) && obj.stream_exception
    bar.error if bar
    raise exception
  end

  into || res
end

.traverse_add(into, res) ⇒ Object



12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/scout/tsv/open.rb', line 12

# Stores one traversal result +res+ into the accumulator +into+.
#
# An Array tagged with MultipleResult is unpacked and each member is added on
# its own. Otherwise the way of adding depends on the kind of +into+:
#   * TSV::Dumper (when defined) -- +res+ is splatted into +add+
#   * TSV / Hash                 -- +res+ is taken as a (key, value) pair;
#     a TSV of type :double uses +zip_new+, everything else plain assignment
#   * Array / Set                -- +res+ is appended with <<
#   * IO / StringIO              -- +res+ is written with +puts+
# Any other kind of +into+ is left untouched and nil is returned.
def self.traverse_add(into, res)
  # Several results bundled together: add them one at a time
  if Array === res && MultipleResult === res
    return res.each { |single| traverse_add(into, single) }
  end

  # nil/false when TSV::Dumper has not been loaded
  dumper_class = defined?(TSV::Dumper) && TSV::Dumper

  if dumper_class === into
    into.add(*res)
  elsif TSV === into || Hash === into
    key, value = res
    zipped = TSV === into && into.type == :double
    return into.zip_new(key, value, insitu: false) if zipped

    into[key] = value
  elsif Array === into || Set === into
    into << res
  elsif IO === into || StringIO === into
    into.puts(res)
  end
end