Class: Async::PriorityQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/async/priority_queue.rb

Overview

A queue which allows items to be processed in priority order of consumers.

Unlike a traditional priority queue where items have priorities, this queue assigns priorities to consumers (fibers waiting to dequeue). Higher priority consumers are served first when items become available.

Constant Summary collapse

ClosedError =
Queue::ClosedError

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(parent: nil) ⇒ PriorityQueue

Create a new priority queue.



63
64
65
66
67
68
69
70
71
# File 'lib/async/priority_queue.rb', line 63

# Create a new, open and empty priority queue.
#
# @param parent [Object, nil] Stored as the default parent used by #async when no explicit parent is given.
def initialize(parent: nil)
	@parent = parent
	@closed = false
	
	# Items that have been pushed but not yet handed to a consumer:
	@items = []
	
	# Consumers blocked in #dequeue, plus the counter #dequeue stamps onto each new waiter:
	@waiting = IO::Event::PriorityHeap.new
	@sequence = 0
	
	# Guards all of the state above:
	@mutex = Mutex.new
end

Instance Attribute Details

#items ⇒ Object (readonly)

Returns the value of attribute items.



92
93
94
# File 'lib/async/priority_queue.rb', line 92

# The items currently buffered in the queue.
#
# @return [Array] The object held in `@items` itself (not a copy).
def items
  @items
end

#items ⇒ Object (readonly)

The items in the queue.



92
# File 'lib/async/priority_queue.rb', line 92

# Read-only accessor for the items currently buffered in the queue.
attr_reader :items

Instance Method Details

#<<(item) ⇒ Object

Compatibility with Queue#push.



138
139
140
# File 'lib/async/priority_queue.rb', line 138

# Operator form of #push, for compatibility with Queue#push.
#
# @param item [Object] The item to add.
# @return [Object] Whatever #push returns.
def <<(item)
	push(item)
end

#async(priority: 0, parent: (@parent or Task.current), **options, &block) ⇒ Object

Process each item in the queue.



226
227
228
229
230
# File 'lib/async/priority_queue.rb', line 226

# Dequeue items one at a time and hand each to `parent.async`, together with
# the given options and block.
#
# Blocks in #dequeue between items and returns once #dequeue produces a falsy
# value.
#
# @param priority [Numeric] Priority passed to every #dequeue call.
# @param parent [Object] Receiver of `async(item, **options, &block)`; defaults to `@parent`, falling back to `Task.current`.
# @param options [Hash] Extra keyword arguments forwarded to `parent.async`.
# @return [nil]
def async(priority: 0, parent: (@parent or Task.current), **options, &block)
	item = dequeue(priority: priority)
	
	while item
		parent.async(item, **options, &block)
		item = dequeue(priority: priority)
	end
end

#close ⇒ Object

Close the queue, causing all waiting tasks to return `nil`. Any subsequent calls to #enqueue will raise an exception.



75
76
77
78
79
80
81
82
83
84
# File 'lib/async/priority_queue.rb', line 75

# Mark the queue as closed and release every pending consumer.
#
# Each waiter removed from the waiting heap is signalled with `nil`.
#
# @return [nil]
def close
	@mutex.synchronize do
		@closed = true
		
		# Drain the heap completely; every waiter popped is signalled with nil:
		waiter = @waiting.pop
		
		while waiter
			waiter.signal(nil)
			waiter = @waiting.pop
		end
	end
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


87
88
89
# File 'lib/async/priority_queue.rb', line 87

# Whether the queue has been closed.
#
# @return [Boolean] The current value of `@closed`, which #close sets to `true`.
def closed?
	@closed
end

#dequeue(priority: 0, timeout: nil) ⇒ Object

Remove and return the next item from the queue.

If the queue is empty, this method will block until an item is available or timeout expires. Fibers are served in priority order, with higher priority fibers receiving items first.



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# File 'lib/async/priority_queue.rb', line 173

# Remove and return the next item from the queue, blocking while none is
# available to this caller.
#
# A caller takes an item directly only when items are buffered and either no
# waiter is queued or the caller's priority is strictly greater than that of
# the waiter at the head of the heap. Otherwise it registers itself as a
# waiter and blocks until it is signalled or the timeout expires.
#
# @param priority [Numeric] Priority of this consumer; compared with `>` against the head waiter's priority.
# @param timeout [Numeric, nil] Forwarded to the waiter when blocking; `0` means never block.
# @return [Object, nil] The dequeued item; `nil` when the queue is closed and
#   empty, or when `timeout` is `0` and no item could be taken immediately;
#   otherwise whatever `Waiter#wait_for_value` returns.
def dequeue(priority: 0, timeout: nil)
	@mutex.synchronize do
		# If queue is closed and empty, return nil immediately:
		if @closed && @items.empty?
			return nil
		end
		
		# Fast path: if items available and either no waiters or we have higher priority:
		unless @items.empty?
			head = @waiting.peek
			# Strict comparison: a caller with equal priority does not overtake the queued waiter.
			if head.nil? or priority > head.priority
				return @items.shift
			end
		end
		
		# Handle immediate timeout (non-blocking)
		return nil if timeout == 0
		
		# Need to wait - create our own condition variable and add to waiting queue:
		# The sequence number is unique per waiter; presumably the heap uses it to
		# order waiters of equal priority — confirm against Waiter's comparison.
		sequence = @sequence
		@sequence += 1
		
		condition = ConditionVariable.new
		
		begin
			waiter = Waiter.new(Fiber.current, priority, sequence, condition, nil)
			@waiting.push(waiter)
			
			# Wait for our specific condition variable to be signaled:
			return waiter.wait_for_value(@mutex, timeout)
		ensure
			# Runs on every exit from the begin block (value received, timeout or
			# exception). `waiter` is nil if Waiter.new itself raised, hence `&.`.
			# #push and #enqueue check `valid?` before handing an item to a waiter.
			waiter&.invalidate!
		end
	end
end

#each(priority: 0) ⇒ Object

Enumerate each item in the queue.



235
236
237
238
239
# File 'lib/async/priority_queue.rb', line 235

# Yield items from the queue one at a time.
#
# Blocks in #dequeue between items and returns once #dequeue produces a falsy
# value.
#
# @param priority [Numeric] Priority passed to every #dequeue call.
# @yield [item] Each dequeued item, in the order it was received.
# @return [nil]
def each(priority: 0)
	item = dequeue(priority: priority)
	
	while item
		yield item
		item = dequeue(priority: priority)
	end
end

#empty? ⇒ Boolean

Returns:

  • (Boolean)


100
101
102
# File 'lib/async/priority_queue.rb', line 100

# Whether the queue currently holds no buffered items.
#
# Reads `@items` directly, without taking the mutex.
#
# @return [Boolean] `true` when `@items` is empty.
def empty?
	@items.empty?
end

#enqueue(*items) ⇒ Object

Add multiple items to the queue.



145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/async/priority_queue.rb', line 145

# Add any number of items to the queue and hand them out to waiting consumers.
#
# Items are appended to the buffer, then waiters are popped from the heap one
# by one; each valid waiter receives the oldest buffered item. Waiters that are
# no longer valid are dropped without consuming an item.
#
# @param items [Array<Object>] The items to add, in order.
# @return [nil]
# @raise [ClosedError] If the queue has been closed.
def enqueue(*items)
	@mutex.synchronize do
		raise ClosedError, "Cannot enqueue items to a closed queue." if @closed
		
		@items.concat(items)
		
		# Keep serving while there is both something to deliver and someone to deliver it to:
		until @items.empty?
			waiter = @waiting.pop
			break unless waiter
			
			# A stale waiter is simply discarded; try the next one.
			next unless waiter.valid?
			
			waiter.signal(@items.shift)
		end
	end
end

#pop(priority: 0, timeout: nil) ⇒ Object

Compatibility with Queue#pop.



214
215
216
# File 'lib/async/priority_queue.rb', line 214

# Alias of #dequeue, for compatibility with Queue#pop.
#
# @param priority [Numeric] Forwarded to #dequeue.
# @param timeout [Numeric, nil] Forwarded to #dequeue.
# @return [Object, nil] Whatever #dequeue returns.
def pop(priority: 0, timeout: nil)
	dequeue(priority: priority, timeout: timeout)
end

#push(item) ⇒ Object

Add an item to the queue.



117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# File 'lib/async/priority_queue.rb', line 117

# Add a single item to the queue and wake at most one waiting consumer.
#
# The item is appended to the buffer; waiters are then popped from the heap
# until a valid one is found, which receives the oldest buffered item. Waiters
# that are no longer valid are dropped along the way.
#
# @param item [Object] The item to add.
# @return [nil]
# @raise [ClosedError] If the queue has been closed.
def push(item)
	@mutex.synchronize do
		raise ClosedError, "Cannot push items to a closed queue." if @closed
		
		@items << item
		
		# Discard stale waiters until a valid one (or nothing) is left:
		candidate = @waiting.pop
		candidate = @waiting.pop while candidate && !candidate.valid?
		
		candidate.signal(@items.shift) if candidate
		
		nil
	end
end

#signal(value = nil) ⇒ Object

Signal the queue with a value, the same as #enqueue.



242
243
244
# File 'lib/async/priority_queue.rb', line 242

# Enqueue a single value; equivalent to calling #enqueue with it.
#
# @param value [Object, nil] The value to enqueue (defaults to `nil`).
# @return [Object] Whatever #enqueue returns.
def signal(value = nil)
	enqueue(value)
end

#size ⇒ Object



95
96
97
# File 'lib/async/priority_queue.rb', line 95

# The number of items currently buffered in the queue.
#
# Reads `@items` directly, without taking the mutex.
#
# @return [Integer]
def size
	@items.length
end

#wait(priority: 0) ⇒ Object

Wait for an item to be available, the same as #dequeue.



249
250
251
# File 'lib/async/priority_queue.rb', line 249

# Wait for the next item; equivalent to #dequeue with no timeout argument.
#
# @param priority [Numeric] Forwarded to #dequeue.
# @return [Object, nil] Whatever #dequeue returns.
def wait(priority: 0)
	dequeue(priority: priority)
end

#waiting_count ⇒ Object (also known as: waiting)



105
106
107
108
109
# File 'lib/async/priority_queue.rb', line 105

# The number of entries currently held in the waiting heap.
#
# Taken under the mutex. The figure is the heap's raw size; nothing here
# filters out waiters that are no longer valid.
#
# @return [Integer]
def waiting_count
	@mutex.synchronize { @waiting.size }
end