Class: Google::ADK::ParallelAgent

Inherits:
BaseAgent
  • Object
show all
Defined in:
lib/google/adk/agents/workflow_agents/parallel_agent.rb

Overview

Agent that executes sub-agents in parallel

Constant Summary

Constants inherited from BaseAgent

BaseAgent::AGENT_NAME_REGEX

Instance Attribute Summary collapse

Attributes inherited from BaseAgent

#after_agent_callback, #before_agent_callback, #description, #name, #parent_agent, #sub_agents

Instance Method Summary collapse

Methods inherited from BaseAgent

#clone, #find_agent, #find_sub_agent, from_config, #run_live

Constructor Details

#initialize(name:, agents:, description: nil, aggregation_strategy: :all, before_agent_callback: nil, after_agent_callback: nil) ⇒ ParallelAgent

Initialize a parallel agent

Raises:

  • (ArgumentError)

    If no agents provided



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/google/adk/agents/workflow_agents/parallel_agent.rb', line 20

# Initialize a parallel agent.
#
# @param name [Object] agent name, forwarded to BaseAgent
# @param agents [Array] sub-agents to run; must contain at least one entry
# @param description [String, nil] human-readable description; when nil a
#   summary mentioning the number of agents is generated
# @param aggregation_strategy [Symbol] stored for later result aggregation
# @param before_agent_callback [Object, nil] forwarded to BaseAgent
# @param after_agent_callback [Object, nil] forwarded to BaseAgent
# @raise [ArgumentError] if +agents+ is nil or empty
def initialize(name:, agents:, description: nil, aggregation_strategy: :all,
               before_agent_callback: nil, after_agent_callback: nil)
  # Treat nil like an empty list so callers get the documented ArgumentError
  # instead of a NoMethodError from calling empty? on nil.
  if agents.nil? || agents.empty?
    raise ArgumentError, "Parallel agent requires at least one agent"
  end

  super(
    name: name,
    description: description || "Executes #{agents.length} agents in parallel",
    sub_agents: agents,
    before_agent_callback: before_agent_callback,
    after_agent_callback: after_agent_callback
  )

  @agents = agents
  @aggregation_strategy = aggregation_strategy
end

Instance Attribute Details

#agents ⇒ Object (readonly)

Returns the value of attribute agents.



9
10
11
# File 'lib/google/adk/agents/workflow_agents/parallel_agent.rb', line 9

# Reader for the sub-agent collection stored by the constructor.
#
# @return [Object] the current value of @agents
def agents
  @agents
end

#aggregation_strategy ⇒ Object (readonly)

Returns the value of attribute aggregation_strategy.



9
10
11
# File 'lib/google/adk/agents/workflow_agents/parallel_agent.rb', line 9

# Reader for the aggregation strategy stored by the constructor
# (the constructor's default is :all).
#
# @return [Object] the current value of @aggregation_strategy
def aggregation_strategy
  @aggregation_strategy
end

Instance Method Details

#run_async(message, context: nil) {|Event| ... } ⇒ Object

Run agents in parallel

Yields:

  • (Event)

    Events during execution



41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/google/adk/agents/workflow_agents/parallel_agent.rb', line 41

# Run every sub-agent against the same message and stream the resulting events.
#
# Despite the class name, this simplified implementation invokes the
# sub-agents one after another; results are collected for all of them before
# being handed to +aggregate_results+.
#
# The returned enumerator produces, in order:
# 1. a start event authored by this agent,
# 2. every event emitted by each sub-agent (passed through unchanged),
#    plus an error event for each sub-agent that raised or that does not
#    implement +run_async+,
# 3. a final event whose content is the value of +aggregate_results+.
#
# @param message [Object] input forwarded unchanged to each sub-agent
# @param context [Object, nil] optional invocation context; when it supplies
#   an +invocation_id+ that id is reused, otherwise a "par-<uuid>" id is
#   generated
# @return [Enumerator] lazy stream of events; nothing runs until it is iterated
def run_async(message, context: nil)
  Enumerator.new do |yielder|
    invocation_id = context&.invocation_id || "par-#{SecureRandom.uuid}"

    # Announce the start of the run.
    yielder << Event.new(
      invocation_id: invocation_id,
      author: @name,
      content: "Starting parallel execution with #{@agents.length} agents"
    )

    # agent name => last non-nil content it produced
    agent_results = {}
    # names of agents that raised or could not be run at all
    failed_agents = []

    # In a real async implementation, these would run concurrently.
    # For this simplified version they run sequentially, but all results
    # are collected before aggregating.
    @agents.each do |agent|
      begin
        unless agent.respond_to?(:run_async)
          # An agent that cannot be run is a failure too: record it so the
          # aggregation step sees it, not just the event stream.
          failed_agents << agent.name
          yielder << Event.new(
            invocation_id: invocation_id,
            author: @name,
            content: "Agent #{agent.name} does not implement run_async"
          )
          next
        end

        agent_events = []
        agent.run_async(message, context: context).each do |event|
          yielder << event
          agent_events << event
        end

        # The agent's result is the content of its last event that has any.
        last_content = agent_events.reverse_each.find { |event| event.content }&.content
        agent_results[agent.name] = last_content if last_content
      rescue StandardError => error
        # A raising agent must not abort the remaining agents.
        failed_agents << agent.name
        yielder << Event.new(
          invocation_id: invocation_id,
          author: @name,
          content: "Agent #{agent.name} failed: #{error.message}"
        )
      end
    end

    # Aggregate results and emit the completion event.
    yielder << Event.new(
      invocation_id: invocation_id,
      author: @name,
      content: aggregate_results(agent_results, failed_agents)
    )
  end
end