Class: Zookeeper::Common::QueueWithPipe
- Inherits:
-
Object
- Object
- Zookeeper::Common::QueueWithPipe
show all
- Extended by:
- Forwardable
- Includes:
- Logger
- Defined in:
- lib/zookeeper/common/queue_with_pipe.rb
Overview
Ceci n’est pas une pipe
Defined Under Namespace
Classes: ShutdownException
Constant Summary
collapse
- KILL_TOKEN =
Object.new
Instance Method Summary
collapse
Methods included from Logger
included, wrapped_logger, wrapped_logger=
Constructor Details
Returns a new instance of QueueWithPipe.
15
16
17
18
19
20
21
22
|
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 15
def initialize
@array = []
@mutex = Mutex.new
@cond = ConditionVariable.new
@closed = false
@graceful = false
end
|
Instance Method Details
#clear ⇒ Object
24
25
26
27
28
29
30
31
|
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 24
def clear
@mutex.lock
begin
@array.clear
ensure
@mutex.unlock rescue nil
end
end
|
#close ⇒ Object
94
95
96
97
98
99
100
101
102
103
|
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 94
def close
@mutex.lock
begin
return if @closed
@closed = true
@cond.broadcast
ensure
@mutex.unlock rescue nil
end
end
|
#closed? ⇒ Boolean
105
106
107
|
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 105
def closed?
@mutex.synchronize { !!@closed }
end
|
#graceful_close! ⇒ Object
close the queue and causes ShutdownException to be raised on waiting threads
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 71
def graceful_close!
@mutex.lock
begin
return if @graceful or @closed
logger.debug { "#{self.class}##{__method__} gracefully closing" }
@graceful = true
@cond.broadcast
ensure
@mutex.unlock rescue nil
end
nil
end
|
#open ⇒ Object
84
85
86
87
88
89
90
91
92
|
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 84
def open
@mutex.lock
begin
@closed = @graceful = false
@cond.broadcast
ensure
@mutex.unlock rescue nil
end
end
|
#pop(non_blocking = false) ⇒ Object
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 44
def pop(non_blocking=false)
rval = nil
@mutex.lock
begin
begin
raise ShutdownException if @closed
rval = @array.shift
unless rval
raise ThreadError if non_blocking raise ShutdownException if @graceful
@cond.wait(@mutex) until (@closed or @graceful or (@array.length > 0))
end
end until rval
return rval
ensure
@mutex.unlock rescue nil
end
end
|
#push(obj) ⇒ Object
33
34
35
36
37
38
39
40
41
42
|
# File 'lib/zookeeper/common/queue_with_pipe.rb', line 33
def push(obj)
@mutex.lock
begin
@array << obj
@cond.signal
ensure
@mutex.unlock rescue nil
end
end
|