Class: A2A::Server::PushNotificationManager
- Inherits:
-
Object
- Object
- A2A::Server::PushNotificationManager
- Defined in:
- lib/a2a/server/push_notification_manager.rb
Instance Attribute Summary collapse
-
#config ⇒ Object
readonly
Returns the value of attribute config.
-
#storage ⇒ Object
readonly
Returns the value of attribute storage.
Instance Method Summary collapse
-
#calculate_next_retry_time(attempt) ⇒ Time
private
Calculate next retry time using exponential backoff.
-
#default_config ⇒ Hash
private
Default configuration.
-
#delete_push_notification_config(*args) ⇒ Boolean
Delete a push notification config.
-
#get_push_notification_config(task_id, config_id: nil) ⇒ A2A::Types::TaskPushNotificationConfig?
Get push notification config for a task.
-
#initialize(storage: nil, config: {}) ⇒ PushNotificationManager
constructor
Initialize a new PushNotificationManager.
-
#list_push_notification_configs(*args) ⇒ Array<A2A::Types::TaskPushNotificationConfig>
List all push notification configs for a task.
-
#notify_task_artifact_update(event) ⇒ void
Send a task artifact update notification.
-
#notify_task_status_update(event) ⇒ void
Send a task status update notification.
-
#process_retry_queue ⇒ void
Process retry queue.
-
#register_sse_client(task_id, client) ⇒ String
Register an SSE client for task updates.
-
#send_sse_notification(task_id, event_type, event_data) ⇒ void
private
Send SSE notification to registered clients.
-
#send_webhook_notification(config, event_type, event_data) ⇒ void
private
Send a webhook notification.
-
#send_webhook_notification_internal(config, event_type, event_data, attempt) ⇒ Boolean
private
Internal webhook notification sending.
-
#set_push_notification_config(task_id, config) ⇒ A2A::Types::TaskPushNotificationConfig
Set a push notification config for a task.
-
#unregister_sse_client(task_id, client_id) ⇒ void
Unregister an SSE client.
Constructor Details
#initialize(storage: nil, config: {}) ⇒ PushNotificationManager
Initialize a new PushNotificationManager
24 25 26 27 28 29 30 |
# File 'lib/a2a/server/push_notification_manager.rb', line 24 def initialize(storage: nil, config: {}) @storage = storage || A2A::Server::Storage::Memory.new @config = default_config.merge(config) @sse_clients = {} @retry_queue = [] @mutex = Mutex.new end |
Instance Attribute Details
#config ⇒ Object (readonly)
Returns the value of attribute config.
17 18 19 |
# File 'lib/a2a/server/push_notification_manager.rb', line 17 def config @config end |
#storage ⇒ Object (readonly)
Returns the value of attribute storage.
17 18 19 |
# File 'lib/a2a/server/push_notification_manager.rb', line 17 def storage @storage end |
Instance Method Details
#calculate_next_retry_time(attempt) ⇒ Time (private)
Calculate next retry time using exponential backoff
300 301 302 303 304 305 306 307 308 309 310 |
# File 'lib/a2a/server/push_notification_manager.rb', line 300 def calculate_next_retry_time(attempt) base_delay = @config[:retry_base_delay] max_delay = @config[:retry_max_delay] delay = [base_delay * (2**(attempt - 1)), max_delay].min # Add jitter to prevent thundering herd jitter = rand(0.1 * delay) Time.now + delay + jitter end |
#default_config ⇒ Hash (private)
Default configuration
316 317 318 319 320 321 322 323 324 |
# File 'lib/a2a/server/push_notification_manager.rb', line 316 def default_config { webhook_timeout: 30, max_retry_attempts: 3, retry_base_delay: 1.0, # seconds retry_max_delay: 60.0, # seconds log_errors: true } end |
#delete_push_notification_config(*args) ⇒ Boolean
Delete a push notification config
87 88 89 |
# File 'lib/a2a/server/push_notification_manager.rb', line 87 def delete_push_notification_config(*args) @storage.delete_push_notification_config(*args) end |
#get_push_notification_config(task_id, config_id: nil) ⇒ A2A::Types::TaskPushNotificationConfig?
Get push notification config for a task
63 64 65 66 67 68 69 70 |
# File 'lib/a2a/server/push_notification_manager.rb', line 63 def get_push_notification_config(task_id, config_id: nil) if config_id @storage.get_push_notification_config_by_id(task_id, config_id) else configs = @storage.list_push_notification_configs(task_id) configs.first # Return the first config if no specific ID requested end end |
#list_push_notification_configs(*args) ⇒ Array<A2A::Types::TaskPushNotificationConfig>
List all push notification configs for a task
77 78 79 |
# File 'lib/a2a/server/push_notification_manager.rb', line 77 def list_push_notification_configs(*args) @storage.list_push_notification_configs(*args) end |
#notify_task_artifact_update(event) ⇒ void
This method returns an undefined value.
Send a task artifact update notification
112 113 114 115 116 117 118 119 120 121 |
# File 'lib/a2a/server/push_notification_manager.rb', line 112 def notify_task_artifact_update(event) configs = list_push_notification_configs(event.task_id) configs.each do |config| send_webhook_notification(config, "task_artifact_update", event) end # Send to SSE clients send_sse_notification(event.task_id, "task_artifact_update", event) end |
#notify_task_status_update(event) ⇒ void
This method returns an undefined value.
Send a task status update notification
96 97 98 99 100 101 102 103 104 105 |
# File 'lib/a2a/server/push_notification_manager.rb', line 96 def notify_task_status_update(event) configs = list_push_notification_configs(event.task_id) configs.each do |config| send_webhook_notification(config, "task_status_update", event) end # Send to SSE clients send_sse_notification(event.task_id, "task_status_update", event) end |
#process_retry_queue ⇒ void
This method returns an undefined value.
Process retry queue
This method should be called periodically to retry failed notifications
159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
# File 'lib/a2a/server/push_notification_manager.rb', line 159 def process_retry_queue @mutex.synchronize do current_time = Time.now @retry_queue.select! do |retry_item| if current_time >= retry_item[:next_retry_at] # Attempt retry success = send_webhook_notification_internal( retry_item[:config], retry_item[:event_type], retry_item[:event_data], retry_item[:attempt] ) if success false # Remove from queue elsif retry_item[:attempt] < @config[:max_retry_attempts] # Schedule next retry if not exceeded max attempts retry_item[:attempt] += 1 retry_item[:next_retry_at] = calculate_next_retry_time(retry_item[:attempt]) true else false # Remove from queue (max attempts exceeded) end else true # Keep in queue (not time for retry yet) end end end end |
#register_sse_client(task_id, client) ⇒ String
Register an SSE client for task updates
129 130 131 132 133 134 135 136 137 138 |
# File 'lib/a2a/server/push_notification_manager.rb', line 129 def register_sse_client(task_id, client) client_id = SecureRandom.uuid @mutex.synchronize do @sse_clients[task_id] ||= {} @sse_clients[task_id][client_id] = client end client_id end |
#send_sse_notification(task_id, event_type, event_data) ⇒ void (private)
This method returns an undefined value.
Send SSE notification to registered clients
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 |
# File 'lib/a2a/server/push_notification_manager.rb', line 265 def send_sse_notification(task_id, event_type, event_data) @mutex.synchronize do clients = @sse_clients[task_id] return unless clients # Build SSE message sse_data = { event_type: event_type, event_data: event_data.to_h, timestamp: Time.now.utc.iso8601 } = "event: #{event_type}\n" += "data: #{JSON.generate(sse_data)}\n\n" # Send to all clients for this task clients.each do |client_id, client| client.write() client.flush if client.respond_to?(:flush) rescue StandardError => e # Remove disconnected client clients.delete(client_id) warn "SSE client disconnected: #{e.message}" if @config[:log_errors] end # Clean up empty task entries @sse_clients.delete(task_id) if clients.empty? end end |
#send_webhook_notification(config, event_type, event_data) ⇒ void (private)
This method returns an undefined value.
Send a webhook notification
199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 |
# File 'lib/a2a/server/push_notification_manager.rb', line 199 def send_webhook_notification(config, event_type, event_data) success = send_webhook_notification_internal(config, event_type, event_data, 1) return if success # Add to retry queue @mutex.synchronize do @retry_queue << { config: config, event_type: event_type, event_data: event_data, attempt: 1, next_retry_at: calculate_next_retry_time(1) } end end |
#send_webhook_notification_internal(config, event_type, event_data, attempt) ⇒ Boolean (private)
Internal webhook notification sending
224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 |
# File 'lib/a2a/server/push_notification_manager.rb', line 224 def send_webhook_notification_internal(config, event_type, event_data, attempt) uri = URI.parse(config.webhook_url) http = Net::HTTP.new(uri.host, uri.port) http.use_ssl = uri.scheme == "https" http.read_timeout = @config[:webhook_timeout] http.open_timeout = @config[:webhook_timeout] request = Net::HTTP::Post.new(uri.path) request["Content-Type"] = "application/json" request["User-Agent"] = "A2A-Ruby/#{A2A::VERSION}" # Add authentication headers config.auth_headers.each { |key, value| request[key] = value } # Build payload payload = { event_type: event_type, event_data: event_data.to_h, timestamp: Time.now.utc.iso8601, attempt: attempt } request.body = JSON.generate(payload) response = http.request(request) # Consider 2xx responses as successful response.code.to_i.between?(200, 299) rescue StandardError => e warn "Webhook notification failed: #{e.message}" if @config[:log_errors] false end |
#set_push_notification_config(task_id, config) ⇒ A2A::Types::TaskPushNotificationConfig
Set a push notification config for a task
38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 |
# File 'lib/a2a/server/push_notification_manager.rb', line 38 def set_push_notification_config(task_id, config) notification_config = if config.is_a?(A2A::Types::PushNotificationConfig) config else A2A::Types::PushNotificationConfig.from_h(config) end # Generate ID if not provided notification_config.instance_variable_set(:@id, SecureRandom.uuid) unless notification_config.id task_config = A2A::Types::TaskPushNotificationConfig.new( task_id: task_id, push_notification_config: notification_config ) @storage.save_push_notification_config(task_config) task_config end |
#unregister_sse_client(task_id, client_id) ⇒ void
This method returns an undefined value.
Unregister an SSE client
146 147 148 149 150 151 |
# File 'lib/a2a/server/push_notification_manager.rb', line 146 def unregister_sse_client(task_id, client_id) @mutex.synchronize do @sse_clients[task_id]&.delete(client_id) @sse_clients.delete(task_id) if @sse_clients[task_id] && @sse_clients[task_id].empty? end end |