Class: A2A::Server::PushNotificationManager

Inherits:
Object
  • Object
show all
Defined in:
lib/a2a/server/push_notification_manager.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(storage: nil, config: {}) ⇒ PushNotificationManager

Initialize a new PushNotificationManager

Parameters:

  • storage (Object) (defaults to: nil)

    Storage backend for push notification configs

  • config (Hash) (defaults to: {})

    Configuration options



24
25
26
27
28
29
30
# File 'lib/a2a/server/push_notification_manager.rb', line 24

# Build a manager with its storage backend, its effective configuration and
# empty in-process state (SSE client registry and webhook retry queue).
#
# @param storage [Object, nil] storage backend for push notification configs;
#   when falsy, a new A2A::Server::Storage::Memory is created
# @param config [Hash] options layered over the values from default_config
def initialize(storage: nil, config: {})
  # In-process state; the mutex is what the other methods synchronize on
  # when touching the SSE registry and the retry queue.
  @mutex = Mutex.new
  @retry_queue = []
  @sse_clients = {}

  @storage = storage || A2A::Server::Storage::Memory.new

  # Caller-supplied keys win over the defaults.
  @config = default_config.merge(config)
end

Instance Attribute Details

#config ⇒ Object (readonly)

Returns the value of attribute config.



17
18
19
# File 'lib/a2a/server/push_notification_manager.rb', line 17

# Read-only accessor for the manager's effective configuration.
#
# @return [Hash] the hash built in the constructor by merging the caller's
#   options over default_config
def config
  @config
end

#storage ⇒ Object (readonly)

Returns the value of attribute storage.



17
18
19
# File 'lib/a2a/server/push_notification_manager.rb', line 17

# Read-only accessor for the storage backend.
#
# @return [Object] the backend passed to the constructor, or the
#   A2A::Server::Storage::Memory instance created when none was given
def storage
  @storage
end

Instance Method Details

#calculate_next_retry_time(attempt) ⇒ Time (private)

Calculate next retry time using exponential backoff

Parameters:

  • attempt (Integer)

    The attempt number

Returns:

  • (Time)

    The next retry time



300
301
302
303
304
305
306
307
308
309
310
# File 'lib/a2a/server/push_notification_manager.rb', line 300

# Calculate when a failed webhook delivery should next be retried.
#
# The delay doubles with every attempt (retry_base_delay * 2^(attempt - 1)),
# is capped at retry_max_delay, and then has up to 10% random jitter added.
#
# @param attempt [Integer] the attempt number (1 yields the base delay)
# @return [Time] the next retry time
def calculate_next_retry_time(attempt)
  base_delay = @config[:retry_base_delay]
  max_delay = @config[:retry_max_delay]

  delay = [base_delay * (2**(attempt - 1)), max_delay].min

  # Add jitter to prevent thundering herd.
  # Kernel#rand truncates a Float argument with to_i, so rand(0.1 * delay)
  # would behave like rand(0) (a float in [0, 1)) for short delays and like
  # an integer draw for long ones. Scaling a unit float keeps the jitter a
  # float in [0, 10% of delay).
  jitter = rand * 0.1 * delay

  Time.now + delay + jitter
end

#default_config ⇒ Hash (private)

Default configuration

Returns:

  • (Hash)

    Default configuration



316
317
318
319
320
321
322
323
324
# File 'lib/a2a/server/push_notification_manager.rb', line 316

# Baseline settings used when the caller does not override them.
#
# A fresh Hash is built on every call, so callers may merge into or mutate
# the result without affecting later calls.
#
# @return [Hash] default configuration
def default_config
  defaults = {}

  # Webhook delivery
  defaults[:webhook_timeout] = 30

  # Retry policy (delays are in seconds)
  defaults[:max_retry_attempts] = 3
  defaults[:retry_base_delay] = 1.0
  defaults[:retry_max_delay] = 60.0

  # Diagnostics
  defaults[:log_errors] = true

  defaults
end

#delete_push_notification_config(*args) ⇒ Boolean

Delete a push notification config

Parameters:

  • task_id (String)

    The task ID

  • config_id (String)

    The config ID

Returns:

  • (Boolean)

    True if deleted, false if not found



87
88
89
# File 'lib/a2a/server/push_notification_manager.rb', line 87

# Delete a push notification config by delegating to the storage backend.
#
# All positional arguments are forwarded unchanged; the documented call
# shape is (task_id, config_id).
#
# @param args [Array] arguments passed straight through to
#   @storage.delete_push_notification_config
# @return [Object] whatever the storage backend returns (documented as
#   true if deleted, false if not found — depends on the backend)
def delete_push_notification_config(*args)
  @storage.delete_push_notification_config(*args)
end

#get_push_notification_config(task_id, config_id: nil) ⇒ A2A::Types::TaskPushNotificationConfig?

Get push notification config for a task

Parameters:

  • task_id (String)

    The task ID

  • config_id (String, nil) (defaults to: nil)

    Optional specific config ID

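Returns:

  • (A2A::Types::TaskPushNotificationConfig, nil)

    The config matching config_id when one is given; otherwise the first config listed for the task (nil when there is none)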



63
64
65
66
67
68
69
70
# File 'lib/a2a/server/push_notification_manager.rb', line 63

# Look up a push notification config for a task.
#
# @param task_id [String] the task ID
# @param config_id [String, nil] optional specific config ID
# @return [Object, nil] the storage backend's result for the given config ID,
#   or the first entry of the task's config list when no ID is given
def get_push_notification_config(task_id, config_id: nil)
  # A specific ID was requested: ask the backend for exactly that config.
  return @storage.get_push_notification_config_by_id(task_id, config_id) if config_id

  # No specific ID requested: fall back to the first config listed for the task.
  @storage.list_push_notification_configs(task_id).first
end

#list_push_notification_configs(*args) ⇒ Array<A2A::Types::TaskPushNotificationConfig>

List all push notification configs for a task

Parameters:

  • task_id (String)

    The task ID

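Returns:

  • (Array<A2A::Types::TaskPushNotificationConfig>)

    The configs the storage backend lists for the task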



77
78
79
# File 'lib/a2a/server/push_notification_manager.rb', line 77

# List the push notification configs for a task by delegating to the
# storage backend.
#
# All positional arguments are forwarded unchanged; the documented call
# shape is (task_id).
#
# @param args [Array] arguments passed straight through to
#   @storage.list_push_notification_configs
# @return [Object] whatever the storage backend returns (documented as an
#   Array of A2A::Types::TaskPushNotificationConfig)
def list_push_notification_configs(*args)
  @storage.list_push_notification_configs(*args)
end

#notify_task_artifact_update(event) ⇒ void

This method returns an undefined value.

Send a task artifact update notification

Parameters:



112
113
114
115
116
117
118
119
120
121
# File 'lib/a2a/server/push_notification_manager.rb', line 112

# Publish a "task_artifact_update" event for a task.
#
# The event goes to every webhook config listed for event.task_id, and then
# to the SSE clients registered for that task.
#
# @param event [Object] the update event; must respond to task_id
# @return [void]
def notify_task_artifact_update(event)
  # Webhooks first: one delivery per config registered for the task.
  list_push_notification_configs(event.task_id).each do |push_config|
    send_webhook_notification(push_config, "task_artifact_update", event)
  end

  # Then fan out to SSE clients.
  send_sse_notification(event.task_id, "task_artifact_update", event)
end

#notify_task_status_update(event) ⇒ void

This method returns an undefined value.

Send a task status update notification

Parameters:



96
97
98
99
100
101
102
103
104
105
# File 'lib/a2a/server/push_notification_manager.rb', line 96

# Publish a "task_status_update" event for a task.
#
# The event goes to every webhook config listed for event.task_id, and then
# to the SSE clients registered for that task.
#
# @param event [Object] the update event; must respond to task_id
# @return [void]
def notify_task_status_update(event)
  # Webhooks first: one delivery per config registered for the task.
  list_push_notification_configs(event.task_id).each do |push_config|
    send_webhook_notification(push_config, "task_status_update", event)
  end

  # Then fan out to SSE clients.
  send_sse_notification(event.task_id, "task_status_update", event)
end

#process_retry_queue ⇒ void

This method returns an undefined value.

Process retry queue

This method should be called periodically to retry failed notifications



159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/a2a/server/push_notification_manager.rb', line 159

# Retry every queued webhook notification whose retry time has arrived.
#
# This method should be called periodically. Items that are not yet due stay
# in the queue untouched. A due item is removed when its delivery succeeds
# or when its attempt counter has reached max_retry_attempts; otherwise its
# counter is incremented and it is re-queued with a new retry time.
#
# Webhook delivery is blocking network I/O, so it runs outside the mutex.
# The lock is held only while due items are claimed and while failed items
# are put back. Claiming removes the items from the queue, so concurrent
# callers never retry the same item.
#
# NOTE(review): the stored attempt number is sent as-is, so the first retry
# reports the same attempt (1) as the initial delivery that queued it.
#
# @return [void]
def process_retry_queue
  current_time = Time.now

  # Claim the due items under the lock; leave everything else in place.
  due_items = @mutex.synchronize do
    due, pending = @retry_queue.partition { |item| current_time >= item[:next_retry_at] }
    @retry_queue.replace(pending)
    due
  end
  return if due_items.empty?

  # Attempt the deliveries without holding the lock.
  failed_items = due_items.reject do |item|
    send_webhook_notification_internal(
      item[:config],
      item[:event_type],
      item[:event_data],
      item[:attempt]
    )
  end

  # Schedule the next retry for items that have not exceeded max attempts;
  # the rest are dropped.
  retryable = failed_items.select { |item| item[:attempt] < @config[:max_retry_attempts] }
  retryable.each do |item|
    item[:attempt] += 1
    item[:next_retry_at] = calculate_next_retry_time(item[:attempt])
  end

  @mutex.synchronize { @retry_queue.concat(retryable) } unless retryable.empty?
  nil
end

#register_sse_client(task_id, client) ⇒ String

Register an SSE client for task updates

Parameters:

  • task_id (String)

    The task ID

  • client (Object)

    The SSE client (response object)

Returns:

  • (String)

    Client ID



129
130
131
132
133
134
135
136
137
138
# File 'lib/a2a/server/push_notification_manager.rb', line 129

# Register an SSE client so it receives updates for a task.
#
# @param task_id [String] the task ID
# @param client [Object] the SSE client (response object)
# @return [String] the generated client ID (a UUID), needed to unregister later
def register_sse_client(task_id, client)
  SecureRandom.uuid.tap do |client_id|
    @mutex.synchronize do
      # Create the per-task registry on first use, then file the client under its ID.
      (@sse_clients[task_id] ||= {})[client_id] = client
    end
  end
end

#send_sse_notification(task_id, event_type, event_data) ⇒ void (private)

This method returns an undefined value.

Send SSE notification to registered clients

Parameters:

  • task_id (String)

    The task ID

  • event_type (String)

    The event type

  • event_data (Object)

    The event data



265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
# File 'lib/a2a/server/push_notification_manager.rb', line 265

# Push one SSE message to every client registered for a task.
#
# The message is an "event:" line carrying the event type, followed by a
# "data:" line with a JSON object (event_type, event_data.to_h and a UTC
# ISO 8601 timestamp), terminated by a blank line. A client whose write or
# flush raises a StandardError is treated as disconnected and removed. The
# task's entry is dropped once no clients remain. The whole operation runs
# under the mutex.
#
# @param task_id [String] the task ID
# @param event_type [String] the event type
# @param event_data [Object] the event data; must respond to to_h
# @return [void]
def send_sse_notification(task_id, event_type, event_data)
  @mutex.synchronize do
    subscribers = @sse_clients[task_id]
    return unless subscribers

    # Build SSE message
    envelope = {
      event_type: event_type,
      event_data: event_data.to_h,
      timestamp: Time.now.utc.iso8601
    }
    frame = "event: #{event_type}\ndata: #{JSON.generate(envelope)}\n\n"

    # Deliver to each client; the block's result tells delete_if whether
    # the client should be removed (true only when delivery raised).
    subscribers.delete_if do |_subscriber_id, stream|
      stream.write(frame)
      stream.flush if stream.respond_to?(:flush)
      false
    rescue StandardError => e
      warn "SSE client disconnected: #{e.message}" if @config[:log_errors]
      true
    end

    # Clean up empty task entries
    @sse_clients.delete(task_id) if subscribers.empty?
  end
end

#send_webhook_notification(config, event_type, event_data) ⇒ void (private)

This method returns an undefined value.

Send a webhook notification

Parameters:



199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
# File 'lib/a2a/server/push_notification_manager.rb', line 199

# Deliver a webhook notification, queueing it for retry if the first
# attempt fails.
#
# @param config [Object] the notification config handed to the sender
# @param event_type [String] the event type
# @param event_data [Object] the event data
# @return [void]
def send_webhook_notification(config, event_type, event_data)
  # First delivery is attempt number 1; nothing more to do when it succeeds.
  return if send_webhook_notification_internal(config, event_type, event_data, 1)

  # Add to retry queue
  @mutex.synchronize do
    @retry_queue.push(
      {
        config: config,
        event_type: event_type,
        event_data: event_data,
        attempt: 1,
        next_retry_at: calculate_next_retry_time(1)
      }
    )
  end
end

#send_webhook_notification_internal(config, event_type, event_data, attempt) ⇒ Boolean (private)

Internal webhook notification sending

Parameters:

  • config (A2A::Types::TaskPushNotificationConfig)

    The notification config

  • event_type (String)

    The event type

  • event_data (Object)

    The event data

  • attempt (Integer)

    The attempt number

Returns:

  • (Boolean)

    True if successful



224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/a2a/server/push_notification_manager.rb', line 224

# POST a single webhook notification and report whether it was accepted.
#
# The JSON body carries event_type, event_data.to_h, a UTC ISO 8601
# timestamp and the attempt number. Any StandardError (invalid URL,
# connection failure, timeout, ...) is rescued, optionally logged with
# warn when log_errors is set, and reported as a failed delivery.
#
# @param config [A2A::Types::TaskPushNotificationConfig] the notification
#   config; must respond to webhook_url and auth_headers
# @param event_type [String] the event type
# @param event_data [Object] the event data; must respond to to_h
# @param attempt [Integer] the attempt number
# @return [Boolean] true if the endpoint answered with a 2xx status
def send_webhook_notification_internal(config, event_type, event_data, attempt)
  uri = URI.parse(config.webhook_url)

  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = uri.scheme == "https"
  # The same timeout bounds both connecting and waiting for the response.
  http.read_timeout = @config[:webhook_timeout]
  http.open_timeout = @config[:webhook_timeout]

  # request_uri (not path) keeps the URL's query string and yields "/" for a
  # URL without a path; an empty path makes Net::HTTP reject the request.
  request = Net::HTTP::Post.new(uri.request_uri)
  request["Content-Type"] = "application/json"
  request["User-Agent"] = "A2A-Ruby/#{A2A::VERSION}"

  # Add authentication headers
  config.auth_headers.each { |key, value| request[key] = value }

  # Build payload
  payload = {
    event_type: event_type,
    event_data: event_data.to_h,
    timestamp: Time.now.utc.iso8601,
    attempt: attempt
  }

  request.body = JSON.generate(payload)

  response = http.request(request)

  # Consider 2xx responses as successful
  response.code.to_i.between?(200, 299)
rescue StandardError => e
  warn "Webhook notification failed: #{e.message}" if @config[:log_errors]
  false
end

#set_push_notification_config(task_id, config) ⇒ A2A::Types::TaskPushNotificationConfig

Set a push notification config for a task

Parameters:

Returns:



38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/a2a/server/push_notification_manager.rb', line 38

# Store a push notification config for a task.
#
# Accepts either an A2A::Types::PushNotificationConfig or a value that
# PushNotificationConfig.from_h can convert. A config without an id gets a
# generated UUID. The config is then wrapped in a TaskPushNotificationConfig
# and saved through the storage backend.
#
# @param task_id [String] the task ID
# @param config [A2A::Types::PushNotificationConfig, Object] the config, or
#   the input for PushNotificationConfig.from_h
# @return [A2A::Types::TaskPushNotificationConfig] the wrapper that was saved
def set_push_notification_config(task_id, config)
  config_class = A2A::Types::PushNotificationConfig
  notification_config = config.is_a?(config_class) ? config : config_class.from_h(config)

  # Generate ID if not provided; it is written straight to @id on the object.
  notification_config.id || notification_config.instance_variable_set(:@id, SecureRandom.uuid)

  A2A::Types::TaskPushNotificationConfig.new(
    task_id: task_id,
    push_notification_config: notification_config
  ).tap { |task_config| @storage.save_push_notification_config(task_config) }
end

#unregister_sse_client(task_id, client_id) ⇒ void

This method returns an undefined value.

Unregister an SSE client

Parameters:

  • task_id (String)

    The task ID

  • client_id (String)

    The client ID



146
147
148
149
150
151
# File 'lib/a2a/server/push_notification_manager.rb', line 146

# Remove a previously registered SSE client.
#
# Unknown task or client IDs are ignored. When the last client of a task is
# removed, the task's entry is dropped from the registry as well.
#
# @param task_id [String] the task ID
# @param client_id [String] the client ID
# @return [void]
def unregister_sse_client(task_id, client_id)
  @mutex.synchronize do
    task_clients = @sse_clients[task_id]
    next unless task_clients

    task_clients.delete(client_id)

    # Clean up the task entry once it has no clients left.
    @sse_clients.delete(task_id) if task_clients.empty?
  end
end