Class: WorkQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/workqueue.rb,
lib/workqueue/version.rb

Defined Under Namespace

Classes: ThreadsafeCounter

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(init_queue = [], opts = {}, &job) ⇒ WorkQueue

Returns a new instance of WorkQueue.



21
22
23
24
25
26
27
28
# File 'lib/workqueue.rb', line 21

# Builds a work queue around the given job block.
#
# @param init_queue [#each] items to enqueue immediately (handed to #concat)
# @param opts [#each] name/value pairs; every pair is applied by calling the
#   matching `name=` writer on this instance
#   (NOTE(review): the writers themselves are defined outside this excerpt)
# @param job [Proc] block stored in @job and later exposed through #job
def initialize(init_queue=[], opts={}, &job)
  @queue = Queue.new
  @job = job

  # Options are applied before the initial items are enqueued.
  opts.each do |name, value|
    send("#{name}=", value)
  end

  concat(init_queue)
end

Instance Attribute Details

#job ⇒ Object (readonly)

Returns the value of attribute job.



20
21
22
# File 'lib/workqueue.rb', line 20

# Reader for the job callable.
#
# The constructor stores the block it receives in @job; #work! invokes it as
# `job.call(payload)` for every payload taken off the queue.
#
# @return [Proc, nil] the stored block, or nil when no block was given
def job
  @job
end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



19
20
21
# File 'lib/workqueue.rb', line 19

# Reader for the underlying queue of pending work.
#
# The constructor sets @queue to a fresh Queue; #push appends
# [payload, index] pairs to it and #work! takes them off again.
#
# @return [Queue] the queue held in @queue
def queue
  @queue
end

#size ⇒ Object



31
32
33
# File 'lib/workqueue.rb', line 31

# Number of worker threads that #run starts.
#
# Falls back to 2 (and remembers that value in @size) whenever @size is
# unset or falsy.
#
# @return [Object] the value held in @size, 2 by default
def size
  return @size if @size

  @size = 2
end

Class Method Details

.version ⇒ Object



2
3
4
# File 'lib/workqueue/version.rb', line 2

# Version of the library.
#
# @return [String] the version string, currently '0.1.0'
def self.version
  '0.1.0'
end

Instance Method Details

#concat(arr) ⇒ Object



66
67
68
# File 'lib/workqueue.rb', line 66

# Enqueues every element of +arr+, in iteration order, by calling #push
# once per element.
#
# @param arr [#each] the items to enqueue
# @return [Object] whatever `arr.each` returns (the array itself for an Array)
def concat(arr)
  arr.each do |item|
    push(item)
  end
end

#join ⇒ Object



70
71
72
73
74
75
# File 'lib/workqueue.rb', line 70

# Marks the queue as joined and waits for every worker thread to finish.
#
# @joined is set before waiting because #work! reads that flag to decide
# when a worker may stop.
#
# @return [self] the receiver
def join
  @joined = true

  workers.each { |worker| worker.join }
  self
end

#push(e) ⇒ Object Also known as: <<



59
60
61
62
63
# File 'lib/workqueue.rb', line 59

# Enqueues one item of work (also available as #<<).
#
# The item is stored together with the value returned by `cursor.incr`;
# #work! later uses that value as the key under which the job's result is
# stored.
# NOTE(review): `cursor` is defined outside this excerpt; presumably it is
# the ThreadsafeCounter listed under this namespace. Confirm.
#
# @param e [Object] the payload later passed to the job
# @return [self] the receiver
def push(e)
  entry = [e, cursor.incr]
  @queue.push(entry)

  self
end

#results ⇒ Object



77
78
79
80
# File 'lib/workqueue.rb', line 77

# Waits for the workers to finish, then returns the collected results.
#
# @return [Object] the value of `aggregate`
#   NOTE(review): `aggregate` is defined outside this excerpt; #work! writes
#   each job's return value into it, keyed by the index assigned in #push.
def results
  # Must run first: #join sets @joined and waits on every worker thread.
  join
  aggregate
end

#run ⇒ Object



51
52
53
54
55
56
57
# File 'lib/workqueue.rb', line 51

# Starts #size worker threads, each running #work!.
#
# The new threads replace whatever @workers held before.
#
# @return [self] the receiver
def run
  @workers = 1.upto(size).map { Thread.new { work! } }

  self
end

#work! ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
# File 'lib/workqueue.rb', line 39

# Worker loop executed inside each thread started by #run.
#
# Repeatedly takes a [payload, index] pair off the queue, calls the job with
# the payload and stores its return value in `aggregate[index]`. The loop
# ends once a worker has aborted, or once the queue has been joined and no
# work is left.
#
# The queue is read in non-blocking mode on purpose. A blocking `shift`
# after an `empty?` check can park a worker forever: either the queue was
# already empty before #join set @joined, or a sibling worker took the last
# item between the check and the shift. A parked worker never re-evaluates
# the loop condition, so #join would never return.
#
# @raise [Exception] whatever the queue access or the job raised; @aborted
#   is set first so the remaining workers stop as well
# @return [nil]
def work!
  until @aborted
    begin
      begin
        payload, index = queue.shift(true)
      rescue ThreadError
        # Nothing queued right now: finish if joined, otherwise poll again.
        break if @joined

        sleep(0.001)
        next
      end

      aggregate[index] = job.call(payload)
    rescue Exception
      # Deliberately broad: flag the abort for the other workers, then
      # re-raise so the failure is not swallowed.
      @aborted = true
      raise
    end
  end
end

#workers ⇒ Object



35
36
37
# File 'lib/workqueue.rb', line 35

# Worker threads started by #run.
#
# Falls back to an empty array (remembered in @workers) when #run has not
# populated it, so #join can iterate safely.
#
# @return [Array] the value held in @workers
def workers
  return @workers if @workers

  @workers = []
end