Class: Agent::Queue
- Inherits:
-
Object
show all
- Extended by:
- Forwardable
- Defined in:
- lib/agent/queue.rb,
lib/agent/queue/buffered.rb,
lib/agent/queue/unbuffered.rb
Defined Under Namespace
Classes: Buffered, Unbuffered
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(type) ⇒ Queue
Returns a new instance of Queue.
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
# File 'lib/agent/queue.rb', line 19
# Sets up an open queue with empty internal lists and a fresh mutex.
#
# @param type [Module] stored as the queue's type
# @raise [Errors::Untyped] when +type+ is nil or false
# @raise [Errors::InvalidType] when +type+ is not a Module
def initialize(type)
  @type = type
  raise Errors::Untyped unless @type
  raise Errors::InvalidType unless @type.is_a?(Module)

  @closed = false
  # Four distinct arrays: the block runs once per slot.
  @queue, @operations, @pushes, @pops = Array.new(4) { [] }
  @mutex = Mutex.new

  reset_custom_state
end
|
Instance Attribute Details
#operations ⇒ Object
Returns the value of attribute operations.
10
11
12
|
# File 'lib/agent/queue.rb', line 10
# Reader for the +@operations+ instance variable.
#
# @return [Object] the current value of +@operations+
def operations; @operations; end
|
#type ⇒ Object
Returns the value of attribute type.
10
11
12
|
# File 'lib/agent/queue.rb', line 10
# Reader for the +@type+ instance variable.
#
# @return [Object] the current value of +@type+
def type; @type; end
|
Instance Method Details
#close ⇒ Object
57
58
59
60
61
62
63
64
65
|
# File 'lib/agent/queue.rb', line 57
# Closes the queue while holding the mutex: every entry in +@operations+ is
# sent +close+, the closed flag is set, and +reset_custom_state+ is called.
#
# @raise [Errors::ChannelClosed] if the queue is already closed
# @return [Object] whatever +reset_custom_state+ returns
def close
  mutex.synchronize do
    raise Errors::ChannelClosed if @closed

    @operations.each(&:close)
    @closed = true
    reset_custom_state
  end
end
|
#closed? ⇒ Boolean
67
|
# File 'lib/agent/queue.rb', line 67
# Reports the closed flag.
#
# @return [Boolean] the value of +@closed+
def closed?
  @closed
end
|
#open? ⇒ Boolean
68
|
# File 'lib/agent/queue.rb', line 68
# Reports the negation of the closed flag.
#
# @return [Boolean] +true+ while +@closed+ is falsy, +false+ otherwise
def open?
  !@closed
end
|
#pop(options = {}) ⇒ Object
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
# File 'lib/agent/queue.rb', line 87
# Registers a Pop operation on the queue and runs +process+ under the mutex.
#
# If the queue is closed and holds no items, the new operation is closed
# before it is registered.
#
# @param options [Hash] passed to +Pop.new+; a truthy +:deferred+ entry makes
#   this method return the operation itself instead of waiting on it
# @return [Pop] the operation, when +options[:deferred]+ is truthy
# @return [Array] +[object, ok]+ otherwise, where +ok+ is the result of the
#   operation's +wait+ and +object+ is read from the operation afterwards
def pop(options = {})
  receiver = Pop.new(options)

  mutex.synchronize do
    receiver.close if @closed && queue.size == 0
    operations << receiver
    pops << receiver
    process
  end

  if options[:deferred]
    receiver
  else
    succeeded = receiver.wait
    [receiver.object, succeeded]
  end
end
|
#push(object, options = {}) ⇒ Object
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
# File 'lib/agent/queue.rb', line 70
# Registers a Push operation carrying +object+ and runs +process+ under the
# mutex.
#
# @param object [Object] value to push; must be an instance of the queue's type
# @param options [Hash] passed to +Push.new+; a truthy +:deferred+ entry makes
#   this method return the operation itself instead of waiting on it
# @raise [Errors::InvalidType] if +object+ is not an instance of +@type+
#   (checked before anything is registered)
# @raise [Errors::ChannelClosed] if the queue is closed
# @return [Object] the Push operation when deferred, otherwise the result of
#   its +wait+
def push(object, options = {})
  raise Errors::InvalidType unless object.is_a?(@type)

  sender = Push.new(object, options)

  mutex.synchronize do
    raise Errors::ChannelClosed if @closed

    operations << sender
    pushes << sender
    process
  end

  options[:deferred] ? sender : sender.wait
end
|
#remove_operations(ops) ⇒ Object
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# File 'lib/agent/queue.rb', line 103
# Deletes the given operations from the queue and rebuilds the push/pop lists
# from what remains, all under the mutex. Does nothing when the queue is
# closed.
#
# @param ops [Enumerable] operations to delete from +operations+
# @return [nil] when the queue is closed
# @return [Object] otherwise, whatever +reset_custom_state+ returns
def remove_operations(ops)
  mutex.synchronize do
    next if @closed

    ops.each { |stale| operations.delete(stale) }

    # Empty the lists in place, then re-sort every surviving operation:
    # Push instances go to +pushes+, everything else to +pops+.
    pushes.clear
    pops.clear
    operations.each do |pending|
      bucket = pending.is_a?(Push) ? pushes : pops
      bucket << pending
    end

    reset_custom_state
  end
end
|