Class: GirlFriday::WorkQueue
- Inherits:
-
Object
- Object
- GirlFriday::WorkQueue
show all
- Defined in:
- lib/girl_friday/work_queue.rb
Defined Under Namespace
Classes: Ready, Shutdown, Work
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(name, options = {}, &block) ⇒ WorkQueue
Returns a new instance of WorkQueue.
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
# File 'lib/girl_friday/work_queue.rb', line 9
# Builds a work queue and registers it with GirlFriday.
#
# @param name [#to_s] queue name; stored as a String in @name, while the
#   original (unconverted) value is what gets handed to the store
# @param options [Hash] optional settings read by this constructor:
#   :size          - worker pool size (falls back to 5)
#   :error_handler - one handler class or an array of them (falls back to
#                    ErrorHandler.default); each class is instantiated
#   :store         - persister class (falls back to Store::InMemory)
#   :store_config  - second argument for the persister (falls back to {})
# @param block the processor invoked with each piece of work
# @raise [ArgumentError] when no block is supplied
def initialize(name, options={}, &block)
  raise ArgumentError, "#{self.class.name} requires a block" if block.nil?

  @name      = name.to_s
  @size      = options[:size] || 5
  @processor = block

  handler_classes = Array(options[:error_handler] || ErrorHandler.default)
  @error_handlers = handler_classes.map { |handler_class| handler_class.new }

  @shutdown      = false
  @shutting_down = false
  @busy_workers  = []
  @ready_workers = nil
  @created_at    = Time.now.to_i

  @total_queued    = 0
  @total_errors    = 0
  @total_processed = 0

  store_class  = options[:store] || Store::InMemory
  store_config = options[:store_config] || {}
  @persister   = store_class.new(name, store_config)

  # GirlFriday is handed a weak reference rather than the queue itself.
  # NOTE(review): presumably so the registry does not keep queues alive - confirm.
  @weakref = WeakRef.new(self)

  # `start` is defined outside this listing; it runs only after all state
  # above is in place and before the queue is registered.
  start
  GirlFriday.add_queue @weakref
end
|
Instance Attribute Details
#name ⇒ Object
Returns the value of attribute name.
8
9
10
|
# File 'lib/girl_friday/work_queue.rb', line 8
# Reader for the queue name.
#
# @return [Object] the value held in @name
def name
  @name
end
|
Class Method Details
28
29
30
31
|
# File 'lib/girl_friday/work_queue.rb', line 28
# Re-aliases #push and #<< to #push_immediately, so both entry points
# resolve to the immediate implementation from this call onward.
def self.immediate!
  target = :push_immediately
  alias_method :push, target
  alias_method :<<, target
end
|
.queue! ⇒ Object
33
34
35
36
|
# File 'lib/girl_friday/work_queue.rb', line 33
# Re-aliases #push and #<< to #push_async, so both entry points resolve
# to the asynchronous implementation from this call onward.
def self.queue!
  target = :push_async
  alias_method :push, target
  alias_method :<<, target
end
|
Instance Method Details
#push_async(work, &block) ⇒ Object
Also known as:
push, <<
44
45
46
|
# File 'lib/girl_friday/work_queue.rb', line 44
# Wraps the work and the optional callback block in a Work message and
# sends it to the supervisor.
#
# @param work the payload to enqueue
# @param block optional callback stored alongside the payload
#   (NOTE(review): presumably invoked with the processor's result - confirm
#   where Work messages are consumed)
# @return whatever the supervisor's << returns
def push_async(work, &block)
  message = Work[work, block]
  @supervisor << message
end
|
38
39
40
41
42
|
# File 'lib/girl_friday/work_queue.rb', line 38
# Runs the processor on the work in the calling thread instead of
# handing it to the supervisor.
#
# @param work the payload passed to the processor
# @param block optional callback; when given it receives the processor's
#   result and its return value becomes this method's return value
# @return the callback's result when a block is given, otherwise the
#   processor's result
def push_immediately(work, &block)
  outcome = @processor.call(work)
  block ? block.call(outcome) : outcome
end
|
#shutdown(&block) ⇒ Object
74
75
76
77
78
|
# File 'lib/girl_friday/work_queue.rb', line 74
# Sends a Shutdown message carrying the optional block to the supervisor.
#
# @param block optional callback stored in the message
#   (NOTE(review): presumably run once shutdown has completed - confirm in
#   the supervisor's Shutdown handling)
# @return whatever the supervisor's << returns
def shutdown(&block)
  message = Shutdown[block]
  @supervisor << message
end
|
#status ⇒ Object
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
# File 'lib/girl_friday/work_queue.rb', line 50
# Snapshot of the queue's counters, keyed by the queue name.
#
# @return [Hash] { name => { :pid, :pool_size, :ready, :busy, :backlog,
#   :total_queued, :total_processed, :total_errors, :uptime, :created_at } }
def status
  # @ready_workers may still be nil, in which case zero is reported.
  ready_count = @ready_workers ? @ready_workers.size : 0

  stats = {
    :pid             => Process.pid,
    :pool_size       => @size,
    :ready           => ready_count,
    :busy            => @busy_workers.size,
    :backlog         => @persister.size,
    :total_queued    => @total_queued,
    :total_processed => @total_processed,
    :total_errors    => @total_errors,
    # Seconds elapsed since @created_at (both are epoch integers).
    :uptime          => Time.now.to_i - @created_at,
    :created_at      => @created_at
  }

  { @name => stats }
end
|
#wait_for_empty ⇒ Object
Busy wait for the queue to empty. Useful for testing.
68
69
70
71
72
|
# File 'lib/girl_friday/work_queue.rb', line 68
# Blocks the caller, polling every 0.1 seconds, until the persister
# reports a size of zero. Only the persister's size is checked here;
# @busy_workers is not consulted.
#
# @return [nil]
def wait_for_empty
  sleep 0.1 until @persister.size == 0
end
|
#working? ⇒ Boolean
80
81
82
|
# File 'lib/girl_friday/work_queue.rb', line 80
# Reports whether any worker is currently tracked in @busy_workers.
#
# @return [Boolean] true when @busy_workers holds at least one entry
def working?
  !@busy_workers.empty?
end
|