Class: GirlFriday::WorkQueue
- Inherits: Object
- Inheritance chain: Object → GirlFriday::WorkQueue
- Defined in:
- lib/girl_friday/work_queue.rb
Defined Under Namespace
Classes: Ready, Shutdown, Work
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(name, options = {}, &block) ⇒ WorkQueue
Returns a new instance of WorkQueue.
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
# File 'lib/girl_friday/work_queue.rb', line 9
# Builds a named work queue around the given processing block.
#
# @param name [#to_s] queue name; stored as a String in @name, while the
#   raw value is what gets handed to the store constructor
# @param options [Hash] optional settings read here:
#   :size          - pool size, defaults to 5
#   :error_handler - one handler class or an array of them; falls back to
#                    ErrorHandler.default. Each class is instantiated.
#   :store         - store class, defaults to Store::InMemory
#   :store_config  - second argument for the store constructor, defaults to {}
# @yield the block is kept as the queue's processor
# @raise [ArgumentError] when no block is supplied
def initialize(name, options={}, &block)
  raise ArgumentError, "#{self.class.name} requires a block" if block.nil?

  @name = name.to_s
  @size = options[:size] || 5
  @processor = block

  handler_classes = Array(options[:error_handler] || ErrorHandler.default)
  @error_handlers = handler_classes.map(&:new)

  @shutdown = false
  @busy_workers = []
  @ready_workers = nil
  @created_at = Time.now.to_i

  # Counters reported by #status.
  @total_processed = 0
  @total_errors = 0
  @total_queued = 0

  store_class = options[:store] || Store::InMemory
  store_config = options[:store_config] || {}
  @persister = store_class.new(name, store_config)

  # Only a weak reference to this queue is handed to GirlFriday.add_queue.
  @weakref = WeakRef.new(self)
  start # defined outside this listing; runs after all state above is set
  GirlFriday.add_queue @weakref
end
|
Instance Attribute Details
#name ⇒ Object
Returns the value of attribute name.
8
9
10
|
# File 'lib/girl_friday/work_queue.rb', line 8
# Reader for the queue name held in @name.
def name
@name
end
|
Class Method Details
.immediate! ⇒ Object
27
28
29
30
|
# File 'lib/girl_friday/work_queue.rb', line 27
# Repoints both public push entry points (#push and #<<) at
# #push_immediately for every instance of this class.
def self.immediate!
  target = :push_immediately
  alias_method :push, target
  alias_method :<<, target
end
|
.queue! ⇒ Object
32
33
34
35
|
# File 'lib/girl_friday/work_queue.rb', line 32
# Repoints both public push entry points (#push and #<<) at #push_async
# for every instance of this class.
def self.queue!
  target = :push_async
  alias_method :push, target
  alias_method :<<, target
end
|
Instance Method Details
#push_async(work, &block) ⇒ Object
Also known as:
push, <<
43
44
45
|
# File 'lib/girl_friday/work_queue.rb', line 43
# Wraps the work item and the optional callback block in a Work message and
# appends it to @supervisor. Returns whatever @supervisor#<< returns.
#
# @param work the payload to enqueue
# @param block optional callback carried along with the payload (nil if absent)
def push_async(work, &block)
  message = Work[work, block]
  @supervisor << message
end
|
#push_immediately(work, &block) ⇒ Object
37
38
39
40
41
|
# File 'lib/girl_friday/work_queue.rb', line 37
# Runs the processor on the work item synchronously in the caller.
#
# @param work the payload passed to @processor
# @yield [outcome] when a block is given it receives the processor's result
# @return the block's return value if a block was given, otherwise the
#   processor's result
def push_immediately(work, &block)
  outcome = @processor.call(work)
  if block
    yield outcome
  else
    outcome
  end
end
|
#shutdown(&block) ⇒ Object
73
74
75
76
77
|
# File 'lib/girl_friday/work_queue.rb', line 73
# Appends a Shutdown message carrying the optional block to @supervisor.
# Returns whatever @supervisor#<< returns.
#
# @param block optional callback wrapped in the message (nil if absent)
def shutdown(&block)
  request = Shutdown[block]
  @supervisor << request
end
|
#status ⇒ Object
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
# File 'lib/girl_friday/work_queue.rb', line 49
# Snapshot of this queue's counters.
#
# @return [Hash] a single-entry hash keyed by the queue name whose value
#   holds :pid, :pool_size, :ready, :busy, :backlog, :total_queued,
#   :total_processed, :total_errors, :uptime and :created_at
def status
  # @ready_workers starts out as nil, so report zero until it is populated.
  ready_count = if @ready_workers
                  @ready_workers.size
                else
                  0
                end

  details = {
    pid: $$,
    pool_size: @size,
    ready: ready_count,
    busy: @busy_workers.size,
    backlog: @persister.size,
    total_queued: @total_queued,
    total_processed: @total_processed,
    total_errors: @total_errors,
    uptime: Time.now.to_i - @created_at, # seconds since @created_at
    created_at: @created_at
  }

  { @name => details }
end
|
#wait_for_empty ⇒ Object
Busy wait for the queue to empty. Useful for testing.
67
68
69
70
71
|
# File 'lib/girl_friday/work_queue.rb', line 67
# Blocks the caller, polling every 0.1 seconds, until @persister.size
# reports 0. Only the persister's size is checked here.
def wait_for_empty
  sleep 0.1 until @persister.size == 0
end
|