Class: Arachni::Reactor::Iterator

Inherits:
Object
Defined in:
lib/arachni/reactor/iterator.rb

Overview

Note:

Pretty much an EventMachine::Iterator rip-off.

A simple iterator for concurrent asynchronous work.

Unlike Ruby's built-in iterators, the end of the current iteration cycle is signaled manually, instead of happening automatically after the yielded block finishes executing.

Examples:

Direct initialization.


Iterator.new( reactor, 0..10 ).each { |num, iterator| iterator.next }

Reactor factory.


reactor.create_iterator( 0..10 ).each { |num, iterator| iterator.next }
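
The manual signaling described above can be sketched without the library. This is only an illustration under stated assumptions: `TickQueue` is a hypothetical stand-in for a reactor (it is not part of Arachni::Reactor), and `step` and `done` are illustrative names. The point is that the next item is handed out only after the completion proc is called, which may happen on a later tick.

```ruby
# Hypothetical stand-in for a reactor: queues blocks and runs them in order.
class TickQueue
  def initialize
    @ticks = []
  end

  # Schedule a block to run on a later tick.
  def next_tick(&block)
    @ticks << block
  end

  # Run queued blocks until none remain.
  def run
    @ticks.shift.call until @ticks.empty?
  end
end

queue = TickQueue.new
items = [1, 2, 3]
seen  = []

step = proc do
  next if items.empty?

  item = items.shift
  seen << item

  # The current iteration ends only when this proc is called.
  done = proc { queue.next_tick(&step) }

  # Simulate asynchronous work: signal completion on a later tick.
  queue.next_tick { done.call }
end

queue.next_tick(&step)
queue.run
# seen is now [1, 2, 3]
```

If `done.call` were never invoked, the queue would drain after the first item and the remaining items would stay unprocessed.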

Constructor Details

#initialize(reactor, list, concurrency = 1) ⇒ Iterator

Returns a new instance of Iterator.

Examples:

Create a new parallel async iterator with specified concurrency.


i = Iterator.new( reactor, 1..100, 10 )

Parameters:

  • reactor (Reactor)
  • list (#to_a)

    List to iterate.

  • concurrency (Integer) (defaults to: 1)

    Parallel workers to spawn.

Raises:

  • (ArgumentError)

    If list does not respond to #to_a, or if concurrency is not greater than zero.


# File 'lib/arachni/reactor/iterator.rb', line 47

def initialize( reactor, list, concurrency = 1 )
    raise ArgumentError, 'argument must be an array' unless list.respond_to?(:to_a)
    raise ArgumentError, 'concurrency must be bigger than zero' unless concurrency > 0

    @reactor     = reactor
    @list        = list.to_a.dup
    @concurrency = concurrency

    @started = false
    @ended   = false
end

Instance Attribute Details

#concurrency ⇒ Integer

Returns:

  • (Integer)


# File 'lib/arachni/reactor/iterator.rb', line 36

def concurrency
  @concurrency
end

#reactor ⇒ Reactor (readonly)

Returns:

  • (Reactor)

# File 'lib/arachni/reactor/iterator.rb', line 33

def reactor
  @reactor
end

Instance Method Details

#each(foreach = nil, after = nil, &block) ⇒ Object

Examples:

Iterate over a set of items using the specified block or proc.


Iterator.new( reactor, 1..100 ).each do |num, iterator|
    puts num
    iterator.next
end

An optional second proc is invoked after the iteration is complete.


Iterator.new( reactor, 1..100 ).each(
    proc { |num, iterator| iterator.next },
    proc { puts 'all done' }
)

Raises:

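  • (ArgumentError)

    If neither a proc nor a block is given.

  • (RuntimeError)

    If the iterator has already been started or has ended.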


# File 'lib/arachni/reactor/iterator.rb', line 86

def each( foreach = nil, after = nil, &block )
    raise ArgumentError, 'Proc or Block required for iteration.' unless foreach ||= block
    raise RuntimeError, 'Cannot iterate over an iterator more than once.' if @started or @ended

    @started = true
    @pending = 0
    @workers = 0

    all_done = proc do
        after.call if after && @ended && @pending == 0
    end

    @process_next = proc do
        if @ended || @workers > @concurrency
            @workers -= 1
        else
            if @list.empty?
                @ended    = true
                @workers -= 1

                all_done.call
            else
                item      = @list.shift
                @pending += 1

                is_done = false
                on_done = proc do
                    raise RuntimeError, 'Already completed this iteration.' if is_done
                    is_done = true

                    @pending -= 1

                    if @ended
                        all_done.call
                    else
                        @reactor.next_tick(&@process_next)
                    end
                end

                class << on_done
                    alias :next :call
                end

                foreach.call(item, on_done)
            end
        end
    end

    spawn_workers

    self
end
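
The completion handle built in the source above is a plain Proc whose singleton class gains a `next` alias, with a flag that rejects a second call. A minimal standalone sketch of that pattern follows; `make_handle` and `completed` are hypothetical names used only for illustration and are not part of the library.

```ruby
# Hypothetical helper mirroring the on_done handle built inside #each.
def make_handle(&on_complete)
  is_done = false

  handle = proc do
    raise RuntimeError, 'Already completed this iteration.' if is_done
    is_done = true
    on_complete.call
  end

  # Give this one Proc object a #next method that behaves like #call.
  class << handle
    alias :next :call
  end

  handle
end

completed = 0
handle    = make_handle { completed += 1 }

handle.next # the first signal is accepted

second_call_error =
  begin
    handle.next # the second signal is rejected
    nil
  rescue RuntimeError => e
    e.message
  end
```

Because the alias lives on the singleton class, other Proc objects are unaffected and do not gain a `next` method.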

#inject(object, foreach, after) ⇒ Object

Examples:

Inject the results of an asynchronous iteration onto a given object.


Iterator.new( reactor, %w(one two three four), 2 ).inject( {},
    proc do |hash, string, iterator|
        hash.merge!( string => string.size )
        iterator.return( hash )
    end,
    proc do |results|
        p results
    end
)

Parameters:

  • object (Object)
  • foreach (Proc)

    Proc to handle each entry.

  • after (Proc)

    Proc to handle the results.



# File 'lib/arachni/reactor/iterator.rb', line 204

def inject( object, foreach, after )
    each(
        proc do |item, iter|
            is_done = false
            on_done = proc do |res|
                raise RuntimeError, 'Already returned a value for this iteration.' if is_done
                is_done = true

                object = res
                iter.next
            end

            class << on_done
                alias :return :call
                def next
                    raise NoMethodError, 'Must call #return on an inject iterator.'
                end
            end

            foreach.call( object, item, on_done )
        end,

        proc do
            after.call(object)
        end
    )
end
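
In the source above, the handle given to the foreach proc answers to `return` instead of `next`, and whatever is passed to `return` becomes the new accumulator. The following standalone sketch mimics that handle outside the library; the `accumulator` and `handle` variables are illustrative assumptions, not library API.

```ruby
# Hypothetical stand-in for the handle #inject passes to the foreach proc.
accumulator = {}

# The returned value replaces the accumulator.
handle = proc { |res| accumulator = res }

class << handle
  alias :return :call

  # Calling #next would skip the accumulator update, so it is rejected.
  def next
    raise NoMethodError, 'Must call #return on an inject iterator.'
  end
end

handle.return(accumulator.merge('one' => 3))

next_error =
  begin
    handle.next
    nil
  rescue NoMethodError => e
    e.message
  end
```

Since each `return` overwrites the accumulator, mutating the shared object in place (as the `merge!` example does) is the safer choice when concurrency is greater than one.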

#map(foreach, after) ⇒ Object

Examples:

Collect the results of an asynchronous iteration into an array.


Iterator.new( reactor, %w(one two three four), 2 ).map(
    proc do |string, iterator|
        iterator.return( string.size )
    end,
    proc do |results|
        p results
    end
)

Parameters:

  • foreach (Proc)

    Proc to handle each entry.

  • after (Proc)

    Proc to handle the results.



# File 'lib/arachni/reactor/iterator.rb', line 154

def map( foreach, after )
    index = 0

    inject( [],
        proc do |results, item, iter|
            i      = index
            index += 1

            is_done = false
            on_done = proc do |res|
                raise RuntimeError, 'Already returned a value for this iteration.' if is_done
                is_done = true

                results[i] = res
                iter.return(results)
            end

            class << on_done
                alias :return :call
                def next
                    raise NoMethodError, 'Must call #return on a map iterator.'
                end
            end

            foreach.call( item, on_done )
        end,

        proc do |results|
            after.call(results)
        end
    )
end
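
The source above captures each item's index before invoking the foreach proc, so results land in list order even if the work completes out of order. A small standalone sketch of that idea, with `pending` as a hypothetical queue of unfinished work that is completed in reverse:

```ruby
items   = %w(one two three four)
results = []
index   = 0
pending = []

# Hand out items in list order, capturing each item's slot up front.
items.each do |item|
  i      = index
  index += 1

  pending << proc { results[i] = item.size }
end

# Complete the work in reverse to mimic out-of-order async completion.
pending.reverse.each(&:call)
# results is now [3, 3, 5, 4]
```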