Class: A2A::Server::Events::EventConsumer

Inherits:
Object
  • Object
show all
Defined in:
lib/a2a/server/events/event_consumer.rb

Overview

Event consumer for processing events from an event queue

Provides functionality to consume events from an EventQueue and process them with registered handlers. Supports filtering and error handling.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(queue) ⇒ EventConsumer

Initialize a new event consumer

Parameters:

  • queue (EventQueue)

    The event queue to consume from



21
22
23
24
25
26
27
# File 'lib/a2a/server/events/event_consumer.rb', line 21

def initialize(queue)
  @queue = queue
  @handlers = {}
  @running = false
  @thread = nil
  @mutex = Mutex.new
end

Instance Attribute Details

#handlersObject (readonly)

Returns the value of attribute handlers.



15
16
17
# File 'lib/a2a/server/events/event_consumer.rb', line 15

def handlers
  @handlers
end

#queueObject (readonly)

Returns the value of attribute queue.



15
16
17
# File 'lib/a2a/server/events/event_consumer.rb', line 15

def queue
  @queue
end

#runningObject (readonly)

Returns the value of attribute running.



15
16
17
# File 'lib/a2a/server/events/event_consumer.rb', line 15

def running
  @running
end

Instance Method Details

#consume_events(filter) ⇒ Object (private)

Consume events from the queue

Parameters:

  • filter (Proc, nil)

    Optional event filter



94
95
96
97
98
99
100
101
102
# File 'lib/a2a/server/events/event_consumer.rb', line 94

def consume_events(filter)
  @queue.subscribe(filter) do |event|
    break unless @running

    process_event(event)
  end
rescue StandardError => e
  handle_error(nil, e)
end

#handle_error(event, error) ⇒ Object (private)

Handle errors during event processing

Parameters:

  • event (Event, nil)

    The event being processed (if any)

  • error (StandardError)

    The error that occurred



109
110
111
112
# File 'lib/a2a/server/events/event_consumer.rb', line 109

def handle_error(event, error)
  warn "Error processing event #{event&.id}: #{error.message}"
  warn error.backtrace.join("\n") if error.backtrace
end

#process_event(event) ⇒ Object

Process a single event synchronously

Parameters:

  • event (Event)

    The event to process



78
79
80
81
82
83
84
85
86
# File 'lib/a2a/server/events/event_consumer.rb', line 78

def process_event(event)
  handlers = @mutex.synchronize { @handlers[event.type]&.dup || [] }

  handlers.each do |handler|
    handler.call(event)
  rescue StandardError => e
    handle_error(event, e)
  end
end

#register_handler(event_type, &handler) ⇒ Object

Register a handler for a specific event type

Parameters:

  • event_type (String)

    The event type to handle

  • handler (Proc)

    The handler proc that receives the event



34
35
36
37
38
39
# File 'lib/a2a/server/events/event_consumer.rb', line 34

def register_handler(event_type, &handler)
  @mutex.synchronize do
    @handlers[event_type] ||= []
    @handlers[event_type] << handler
  end
end

#remove_handler(event_type, handler) ⇒ Object

Remove a handler for a specific event type

Parameters:

  • event_type (String)

    The event type

  • handler (Proc)

    The handler to remove



46
47
48
49
50
51
# File 'lib/a2a/server/events/event_consumer.rb', line 46

def remove_handler(event_type, handler)
  @mutex.synchronize do
    @handlers[event_type]&.delete(handler)
    @handlers.delete(event_type) if @handlers[event_type] && @handlers[event_type].empty?
  end
end

#start(filter = nil) ⇒ Object

Start consuming events in a background thread

Parameters:

  • filter (Proc, nil) (defaults to: nil)

    Optional filter for events



57
58
59
60
61
62
63
64
# File 'lib/a2a/server/events/event_consumer.rb', line 57

def start(filter = nil)
  return if @running

  @running = true
  @thread = Thread.new do
    consume_events(filter)
  end
end

#stopObject

Stop consuming events



68
69
70
71
72
# File 'lib/a2a/server/events/event_consumer.rb', line 68

def stop
  @running = false
  @thread&.join
  @thread = nil
end