Class: A2A::Server::Events::InMemoryEventQueue

Inherits:
EventQueue
  • Object
show all
Defined in:
lib/a2a/server/events/event_queue.rb

Overview

In-memory event queue implementation

A simple in-memory event queue that uses Ruby's Queue class for thread-safe event publishing and consumption.

Instance Method Summary collapse

Constructor Details

#initialize ⇒ InMemoryEventQueue

Returns a new instance of InMemoryEventQueue.



141
142
143
144
145
146
# File 'lib/a2a/server/events/event_queue.rb', line 141

# Build an empty, open event queue.
#
# State set up here:
# - @mutex       guards access to the subscriber list
# - @subscribers list of registered subscriber entries
# - @queue       a Queue instance
# - @closed      false until the queue is closed
def initialize
  # Synchronization primitives first, then plain state.
  @mutex = Mutex.new
  @queue = Queue.new

  @closed = false
  @subscribers = []
end

Instance Method Details

#close ⇒ Object

Close the event queue



201
202
203
204
205
206
# File 'lib/a2a/server/events/event_queue.rb', line 201

# Close the event queue.
#
# Marks the queue as closed and then, while holding the mutex, drops every
# registered subscriber.
#
# @return [Array] the (now empty) subscriber list
def close
  @closed = true
  @mutex.synchronize { @subscribers.clear }
end

#closed? ⇒ Boolean

Check if the queue is closed

Returns:

  • (Boolean)

    True if closed



212
213
214
# File 'lib/a2a/server/events/event_queue.rb', line 212

# Check if the queue is closed.
#
# @return [Boolean] the current value of the closed flag
def closed?
  @closed
end

#publish(event) ⇒ Object

Publish an event to all subscribers

Parameters:

  • event (Event)

    The event to publish



152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/a2a/server/events/event_queue.rb', line 152

# Publish an event to all subscribers.
#
# Does nothing once the queue is closed. Otherwise, while holding the mutex,
# the event is pushed onto the queue of every subscriber that has no filter
# or whose filter returns a truthy value for the event.
#
# @param event [Event] the event to publish
# @return [Array, nil] the subscriber list, or nil when the queue is closed
def publish(event)
  return if @closed

  @mutex.synchronize do
    @subscribers.each do |entry|
      accepts = entry[:filter]
      next unless accepts.nil? || accepts.call(event)

      entry[:queue].push(event)
    rescue StandardError => e
      # A failing filter or push affects only this subscriber: report it and
      # carry on with the remaining ones.
      warn "Error publishing to subscriber: #{e.message}"
    end
  end
end

#subscribe(filter = nil) ⇒ Enumerator

Subscribe to events with optional filtering

Parameters:

  • filter (Proc, nil) (defaults to: nil)

    Optional filter proc that receives an event and returns boolean

Returns:

  • (Enumerator)

    Enumerator that yields events



170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# File 'lib/a2a/server/events/event_queue.rb', line 170

# Subscribe to events with optional filtering.
#
# Registers a dedicated per-subscriber queue and then yields events from it
# until the event queue is closed. The subscriber queue is polled with a
# non-blocking pop; when it is empty the loop sleeps for 1 ms and retries.
# The subscriber entry is always removed again when the method exits,
# including when the block raises.
#
# The closed flag is checked before each pop, so events still sitting in the
# subscriber queue at the moment of closing are not yielded.
#
# @param filter [Proc, nil] optional filter proc that receives an event and
#   returns boolean; stored with the subscriber entry
# @yieldparam event [Event] the next event taken from the subscriber queue
# @return [Enumerator] when no block is given
# @return [nil] once the queue has been closed (block form)
def subscribe(filter = nil)
  return enum_for(:subscribe, filter) unless block_given?

  subscriber_queue = Queue.new
  subscriber = { queue: subscriber_queue, filter: filter }

  @mutex.synchronize { @subscribers << subscriber }

  begin
    loop do
      break if @closed

      event =
        begin
          subscriber_queue.pop(true) # Non-blocking pop
        rescue ThreadError
          # Queue is empty: back off briefly, then re-check the closed flag.
          sleep 0.001
          next
        end

      # Yield outside the rescue above so that a ThreadError raised by the
      # consumer's block propagates instead of being mistaken for "empty".
      yield event
    end
  ensure
    @mutex.synchronize { @subscribers.delete(subscriber) }
  end
end

#subscriber_count ⇒ Integer

Get the number of active subscribers

Returns:

  • (Integer)

    Number of subscribers



220
221
222
# File 'lib/a2a/server/events/event_queue.rb', line 220

# Get the number of active subscribers.
#
# The subscriber list is read while holding the mutex.
#
# @return [Integer] number of currently registered subscribers
def subscriber_count
  @mutex.synchronize do
    @subscribers.size
  end
end