Class: ActiveMatrix::EventRouter
- Inherits:
- Object
  - Object
  - ActiveMatrix::EventRouter
- Includes:
- Logging, Singleton
- Defined in:
- lib/active_matrix/event_router.rb
Overview
Routes Matrix events to the appropriate agents using async fibers.
Instance Method Summary
-
#broadcast_event(event) ⇒ Object
Broadcast an event to all agents.
-
#clear_agent_routes(agent_id) ⇒ Object
Clear all routes for an agent.
-
#initialize ⇒ EventRouter
constructor
A new instance of EventRouter.
-
#register_route(agent_id:, room_id: nil, event_type: nil, user_id: nil, priority: 50, &block) ⇒ Object
Register an event route.
-
#route_event(event) ⇒ Object
Route an event to appropriate agents.
-
#routes_summary ⇒ Object
Get routes for debugging.
-
#running? ⇒ Boolean
Check if router is running.
-
#start ⇒ Object
Start the event router (call from within async context).
-
#stop ⇒ Object
Stop the event router.
-
#unregister_route(route_id) ⇒ Object
Unregister a route.
Methods included from Logging
Constructor Details
#initialize ⇒ EventRouter
Returns a new instance of EventRouter.
13 14 15 16 17 18 19 |
# File 'lib/active_matrix/event_router.rb', line 13 def initialize @routes = [] @mutex = Mutex.new @event_queue = nil @processing = false @worker_task = nil end |
Instance Method Details
#broadcast_event(event) ⇒ Object
Broadcast an event to all agents
101 102 103 104 105 106 107 |
# File 'lib/active_matrix/event_router.rb', line 101 def broadcast_event(event) AgentRegistry.instance.all_instances.each do |bot| bot._handle_event(event) if bot.respond_to?(:_handle_event) rescue StandardError => e logger.error "Error broadcasting to bot: #{e.message}" end end |
#clear_agent_routes(agent_id) ⇒ Object
Clear all routes for an agent
50 51 52 53 54 |
# File 'lib/active_matrix/event_router.rb', line 50 def clear_agent_routes(agent_id) @mutex.synchronize do @routes.delete_if { |route| route[:agent_id] == agent_id } end end |
#register_route(agent_id:, room_id: nil, event_type: nil, user_id: nil, priority: 50, &block) ⇒ Object
Register an event route
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
# File 'lib/active_matrix/event_router.rb', line 22 def register_route(agent_id:, room_id: nil, event_type: nil, user_id: nil, priority: 50, &block) route = { id: SecureRandom.uuid, agent_id: agent_id, room_id: room_id, event_type: event_type, user_id: user_id, priority: priority, handler: block } @mutex.synchronize do @routes << route @routes.sort_by! { |r| -r[:priority] } # Higher priority first end logger.debug "Registered route: #{route.except(:handler).inspect}" route[:id] end |
#route_event(event) ⇒ Object
Route an event to appropriate agents
57 58 59 60 61 62 |
# File 'lib/active_matrix/event_router.rb', line 57 def route_event(event) return unless @processing && @event_queue # Queue the event for processing @event_queue.enqueue(event) end |
#routes_summary ⇒ Object
Get routes for debugging
94 95 96 97 98 |
# File 'lib/active_matrix/event_router.rb', line 94 def routes_summary @mutex.synchronize do @routes.map { |r| r.except(:handler) } end end |
#running? ⇒ Boolean
Check if router is running
89 90 91 |
# File 'lib/active_matrix/event_router.rb', line 89 def running? @processing && @worker_task&.alive? end |
#start ⇒ Object
Start the event router (call from within async context)
65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/active_matrix/event_router.rb', line 65 def start return if @processing @processing = true @event_queue = Async::Queue.new @worker_task = Async(transient: true) do |task| task.annotate 'event-router' process_events end logger.info 'Event router started' end |
#stop ⇒ Object
Stop the event router
80 81 82 83 84 85 86 |
# File 'lib/active_matrix/event_router.rb', line 80 def stop @processing = false @worker_task&.stop @event_queue = nil logger.info 'Event router stopped' end |
#unregister_route(route_id) ⇒ Object
Unregister a route
43 44 45 46 47 |
# File 'lib/active_matrix/event_router.rb', line 43 def unregister_route(route_id) @mutex.synchronize do @routes.delete_if { |route| route[:id] == route_id } end end |