Class: A2A::Server::Events::InMemoryEventQueue
- Inherits: EventQueue (Object → EventQueue → A2A::Server::Events::InMemoryEventQueue)
- Defined in: lib/a2a/server/events/event_queue.rb
Overview
In-memory event queue implementation
A simple in-memory event queue that uses Ruby's Queue class for thread-safe event publishing and consumption.
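The thread-safe primitive mentioned above is Ruby's built-in Queue (Thread::Queue). As a minimal sketch using only the standard library, the following shows a producer thread pushing events and a consumer draining them with a non-blocking pop, which raises ThreadError once the queue is empty. The drain helper and the event strings are hypothetical names for illustration and are not part of the a2a library.

```ruby
# Hypothetical helper (not part of the a2a library): empties a Queue
# without ever blocking the calling thread.
def drain(queue)
  events = []
  # pop(true) is the non-blocking form; it raises ThreadError when empty.
  loop { events << queue.pop(true) }
rescue ThreadError
  events
end

queue = Queue.new

# Queue#push is safe to call from another thread without extra locking.
producer = Thread.new do
  3.times { |i| queue.push("event-#{i}") }
end
producer.join

p drain(queue) # => ["event-0", "event-1", "event-2"]
```

A blocking `queue.pop` would wait for the next item instead of raising; the non-blocking form is what lets a consumer loop re-check other state, such as a closed flag, between attempts.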
Instance Method Summary
- #close ⇒ Object
  Close the event queue.
- #closed? ⇒ Boolean
  Check if the queue is closed.
- #initialize ⇒ InMemoryEventQueue (constructor)
  A new instance of InMemoryEventQueue.
- #publish(event) ⇒ Object
  Publish an event to all subscribers.
- #subscribe(filter = nil) ⇒ Enumerator
  Subscribe to events with optional filtering.
- #subscriber_count ⇒ Integer
  Get the number of active subscribers.
Constructor Details
#initialize ⇒ InMemoryEventQueue
Returns a new instance of InMemoryEventQueue.
# File 'lib/a2a/server/events/event_queue.rb', line 141
def initialize
  @queue = Queue.new
  @subscribers = []
  @closed = false
  @mutex = Mutex.new
end
Instance Method Details
#close ⇒ Object
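Close the event queue. Sets the closed flag and clears the subscriber list. Active subscribe loops exit on their next iteration, and later publish calls return without delivering anything.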
# File 'lib/a2a/server/events/event_queue.rb', line 201
def close
  @closed = true
  @mutex.synchronize do
    @subscribers.clear
  end
end
#closed? ⇒ Boolean
Check if the queue is closed.
# File 'lib/a2a/server/events/event_queue.rb', line 212
def closed?
  @closed
end
#publish(event) ⇒ Object
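Publish an event to all subscribers. The event is pushed onto the queue of every subscriber that has no filter or whose filter returns a truthy value for it. Does nothing once the queue is closed. An error raised while delivering to one subscriber is reported with warn and does not stop delivery to the others.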
# File 'lib/a2a/server/events/event_queue.rb', line 152
def publish(event)
  return if @closed

  @mutex.synchronize do
    @subscribers.each do |subscriber|
      subscriber[:queue].push(event) if subscriber[:filter].nil? || subscriber[:filter].call(event)
    rescue StandardError => e
      # Log error but don't fail the publish operation
      warn "Error publishing to subscriber: #{e.message}"
    end
  end
end
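The fan-out and filter behavior of publish can be sketched in isolation with plain Queue objects. This is an illustration under stated assumptions, not the library's code: fan_out is a hypothetical helper that copies the loop above, and the event hashes with a :type key are an assumed event shape, since the source does not specify what an event looks like.

```ruby
# Hypothetical helper mirroring the documented publish loop: each subscriber
# is a hash holding its own Queue and an optional filter callable.
def fan_out(subscribers, event)
  subscribers.each do |subscriber|
    filter = subscriber[:filter]
    subscriber[:queue].push(event) if filter.nil? || filter.call(event)
  rescue StandardError => e
    # A raising filter only skips that subscriber; the loop continues.
    warn "Error publishing to subscriber: #{e.message}"
  end
end

# Assumed event shape: a Hash with a :type key.
all_events  = { queue: Queue.new, filter: nil }
only_errors = { queue: Queue.new, filter: ->(event) { event[:type] == :error } }
broken      = { queue: Queue.new, filter: ->(_event) { raise ArgumentError, "bad filter" } }

subscribers = [all_events, broken, only_errors]
fan_out(subscribers, { type: :status, text: "working" })
fan_out(subscribers, { type: :error, text: "failed" })

puts all_events[:queue].size  # both events were delivered
puts only_errors[:queue].size # only the :error event was delivered
puts broken[:queue].size      # nothing delivered; a warning went to stderr
```

Because the rescue clause sits inside the per-subscriber block, a faulty filter placed before a healthy subscriber does not prevent the healthy one from receiving the event.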
#subscribe(filter = nil) ⇒ Enumerator
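Subscribe to events with optional filtering. When called with a block, registers a new subscriber with its own queue and yields each matching event until the queue is closed, then removes the subscriber. When called without a block, returns an Enumerator. The loop polls with a non-blocking pop and sleeps for 1 ms whenever the subscriber's queue is empty.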
# File 'lib/a2a/server/events/event_queue.rb', line 170
def subscribe(filter = nil)
  return enum_for(:subscribe, filter) unless block_given?

  subscriber_queue = Queue.new
  subscriber = { queue: subscriber_queue, filter: filter }

  @mutex.synchronize do
    @subscribers << subscriber
  end

  begin
    loop do
      break if @closed

      begin
        event = subscriber_queue.pop(true) # Non-blocking pop
        yield event
      rescue ThreadError
        # Queue is empty, sleep briefly and try again
        sleep 0.001 # Reduced sleep time for better responsiveness
      end
    end
  ensure
    @mutex.synchronize do
      @subscribers.delete(subscriber)
    end
  end
end
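A usage sketch may help show how subscribe, publish, and close interact across threads. To keep it runnable without the gem, it uses SketchEventQueue, a hypothetical condensed stand-in that copies the method bodies documented on this page; with the gem loaded, A2A::Server::Events::InMemoryEventQueue would presumably take its place. The event hashes are an assumed shape for illustration.

```ruby
# Hypothetical stand-in condensed from the method bodies shown on this page.
class SketchEventQueue
  def initialize
    @subscribers = []
    @closed = false
    @mutex = Mutex.new
  end

  def publish(event)
    return if @closed

    @mutex.synchronize do
      @subscribers.each do |s|
        s[:queue].push(event) if s[:filter].nil? || s[:filter].call(event)
      end
    end
  end

  def subscribe(filter = nil)
    return enum_for(:subscribe, filter) unless block_given?

    subscriber = { queue: Queue.new, filter: filter }
    @mutex.synchronize { @subscribers << subscriber }
    begin
      loop do
        break if @closed

        begin
          yield subscriber[:queue].pop(true) # non-blocking pop
        rescue ThreadError
          sleep 0.001 # queue empty, poll again shortly
        end
      end
    ensure
      @mutex.synchronize { @subscribers.delete(subscriber) }
    end
  end

  def close
    @closed = true
    @mutex.synchronize { @subscribers.clear }
  end

  def subscriber_count
    @mutex.synchronize { @subscribers.length }
  end
end

queue = SketchEventQueue.new
received = []

# subscribe loops until the queue is closed, so it runs in its own thread.
consumer = Thread.new do
  queue.subscribe(->(event) { event[:type] == :status }) { |event| received << event }
end

sleep 0.001 until queue.subscriber_count == 1 # events published earlier would be missed
queue.publish({ type: :status, text: "working" })
queue.publish({ type: :debug, text: "skipped by the filter" })
queue.publish({ type: :status, text: "done" })

sleep 0.001 until received.size == 2 # let the consumer drain before closing
queue.close                          # the subscribe loop exits on its next check
consumer.join

p received.map { |event| event[:text] }
```

Two consequences of the documented code are visible here: an event published before a subscriber has registered is never delivered to it, and events still sitting in a subscriber's queue when close is called are not yielded, which is why the sketch waits for the consumer to catch up before closing.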
#subscriber_count ⇒ Integer
Get the number of active subscribers.
# File 'lib/a2a/server/events/event_queue.rb', line 220
def subscriber_count
  @mutex.synchronize { @subscribers.length }
end