Class: ActiveMatrix::EventRouter

Inherits:
Object
  • Object
show all
Includes:
Logging, Singleton
Defined in:
lib/active_matrix/event_router.rb

Overview

Routes Matrix events to appropriate agents using async fibers

Instance Method Summary collapse

Methods included from Logging

included, #logger, #logger=

Constructor Details

#initializeEventRouter

Returns a new instance of EventRouter.



13
14
15
16
17
18
19
# File 'lib/active_matrix/event_router.rb', line 13

def initialize
  @routes = []
  @mutex = Mutex.new
  @event_queue = nil
  @processing = false
  @worker_task = nil
end

Instance Method Details

#broadcast_event(event) ⇒ Object

Broadcast an event to all agents



101
102
103
104
105
106
107
# File 'lib/active_matrix/event_router.rb', line 101

def broadcast_event(event)
  AgentRegistry.instance.all_instances.each do |bot|
    bot._handle_event(event) if bot.respond_to?(:_handle_event)
  rescue StandardError => e
    logger.error "Error broadcasting to bot: #{e.message}"
  end
end

#clear_agent_routes(agent_id) ⇒ Object

Clear all routes for an agent



50
51
52
53
54
# File 'lib/active_matrix/event_router.rb', line 50

def clear_agent_routes(agent_id)
  @mutex.synchronize do
    @routes.delete_if { |route| route[:agent_id] == agent_id }
  end
end

#register_route(agent_id:, room_id: nil, event_type: nil, user_id: nil, priority: 50, &block) ⇒ Object

Register an event route



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/active_matrix/event_router.rb', line 22

def register_route(agent_id:, room_id: nil, event_type: nil, user_id: nil, priority: 50, &block)
  route = {
    id: SecureRandom.uuid,
    agent_id: agent_id,
    room_id: room_id,
    event_type: event_type,
    user_id: user_id,
    priority: priority,
    handler: block
  }

  @mutex.synchronize do
    @routes << route
    @routes.sort_by! { |r| -r[:priority] } # Higher priority first
  end

  logger.debug "Registered route: #{route.except(:handler).inspect}"
  route[:id]
end

#route_event(event) ⇒ Object

Route an event to appropriate agents



57
58
59
60
61
62
# File 'lib/active_matrix/event_router.rb', line 57

def route_event(event)
  return unless @processing && @event_queue

  # Queue the event for processing
  @event_queue.enqueue(event)
end

#routes_summaryObject

Get routes for debugging



94
95
96
97
98
# File 'lib/active_matrix/event_router.rb', line 94

def routes_summary
  @mutex.synchronize do
    @routes.map { |r| r.except(:handler) }
  end
end

#running?Boolean

Check if router is running

Returns:

  • (Boolean)


89
90
91
# File 'lib/active_matrix/event_router.rb', line 89

def running?
  @processing && @worker_task&.alive?
end

#startObject

Start the event router (call from within async context)



65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/active_matrix/event_router.rb', line 65

def start
  return if @processing

  @processing = true
  @event_queue = Async::Queue.new

  @worker_task = Async(transient: true) do |task|
    task.annotate 'event-router'
    process_events
  end

  logger.info 'Event router started'
end

#stopObject

Stop the event router



80
81
82
83
84
85
86
# File 'lib/active_matrix/event_router.rb', line 80

def stop
  @processing = false
  @worker_task&.stop
  @event_queue = nil

  logger.info 'Event router stopped'
end

#unregister_route(route_id) ⇒ Object

Unregister a route



43
44
45
46
47
# File 'lib/active_matrix/event_router.rb', line 43

def unregister_route(route_id)
  @mutex.synchronize do
    @routes.delete_if { |route| route[:id] == route_id }
  end
end