Class: ReductoAI::Resources::Pipeline

Inherits:
Object
Defined in:
lib/reducto_ai/resources/pipeline.rb

Overview

Note:

Pipeline operations consume credits for every step executed.

Pipeline resource for multi-step document processing workflows.

Orchestrates multiple Reducto operations (parse, extract, split, edit) in a single request, with outputs from earlier steps feeding into later ones.

Examples:

Parse then extract

client = ReductoAI::Client.new
result = client.pipeline.sync(
  input: "https://example.com/invoice.pdf",
  steps: [
    { type: "parse", output_formats: { markdown: true } },
    { type: "extract", instructions: { total: "number", date: "string" } }
  ]
)
extracted_data = result["result"]["steps"][1]["result"]
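Reading `steps[1]` hard-codes the position of the extract step. The sketch below instead pairs each requested step with its entry in the response, so results can be looked up by step type. It assumes, without confirmation from this page, that the response's "steps" array comes back in the same order as the request's `steps`. The helper name `step_results_by_type` is hypothetical, and the sample response is hand-written in the shape documented for #sync rather than captured from the API.

```ruby
# Hypothetical helper: pair each requested step with its response entry,
# keyed by step type.
# ASSUMPTION: response steps are returned in request order.
# If two steps share a type, the later one overwrites the earlier one.
def step_results_by_type(steps, response)
  step_entries = response.dig("result", "steps") || []
  steps.zip(step_entries).each_with_object({}) do |(step, entry), acc|
    acc[step[:type]] = entry && entry["result"]
  end
end

steps = [
  { type: "parse", output_formats: { markdown: true } },
  { type: "extract", instructions: { total: "number", date: "string" } }
]

# Hand-written sample shaped like the documented #sync return value.
sample_response = {
  "job_id" => "job-123",
  "status" => "succeeded",
  "result" => {
    "steps" => [
      { "result" => { "markdown" => "# Invoice" } },
      { "result" => { "total" => 42.5, "date" => "2024-01-31" } }
    ]
  }
}

by_type = step_results_by_type(steps, sample_response)
extracted_data = by_type["extract"]
```

Looking results up by type keeps the calling code correct if a step is later inserted before the extract step, as long as the ordering assumption holds.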

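Instance Method Summary

  • #async(input:, steps:, async: nil, **options) ⇒ Hash

    Executes a multi-step pipeline asynchronously.

  • #initialize(client) ⇒ Pipeline (private API)

    Returns a new instance of Pipeline.

  • #sync(input:, steps:, **options) ⇒ Hash

    Executes a multi-step pipeline synchronously.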

Constructor Details

#initialize(client) ⇒ Pipeline

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of Pipeline.

Parameters:

  • client (Client)

    the Reducto API client



# File 'lib/reducto_ai/resources/pipeline.rb', line 25

def initialize(client)
  @client = client
end

Instance Method Details

#async(input:, steps:, async: nil, **options) ⇒ Hash

Executes a multi-step pipeline asynchronously.

Returns immediately with a job_id. Poll with Jobs#retrieve to get results.

Examples:

job = client.pipeline.async(
  input: "https://example.com/complex-doc.pdf",
  steps: [
    { type: "split" },
    { type: "parse", output_formats: { markdown: true } }
  ]
)
job_id = job["job_id"]
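Because #async returns only a job handle, callers need a polling loop around Jobs#retrieve. That method's exact signature is not shown on this page, so the sketch below takes the fetch call as a block. It assumes "processing" is the only non-terminal status (the initial status documented below); the list is a parameter so it can be adjusted to whatever Jobs#retrieve actually reports. `wait_for_job` and `JobTimeoutError` are hypothetical names, not part of the gem.

```ruby
# Hypothetical error raised when a job is still pending at the deadline.
class JobTimeoutError < StandardError; end

# Calls the block with job_id until the returned hash has a status outside
# pending_statuses, sleeping `interval` seconds between calls.
# ASSUMPTION: "processing" is the only pending status.
def wait_for_job(job_id, interval: 2, timeout: 300,
                 pending_statuses: ["processing"], sleeper: ->(s) { sleep(s) })
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    job = yield(job_id)
    return job unless pending_statuses.include?(job["status"])

    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    raise JobTimeoutError, "job #{job_id} still #{job["status"]} after #{timeout}s" if now >= deadline

    sleeper.call(interval)
  end
end

# Usage with a fake fetcher standing in for a real Jobs#retrieve call.
statuses = %w[processing processing succeeded]
job = wait_for_job("job-123", interval: 0, sleeper: ->(_) {}) do |id|
  { "job_id" => id, "status" => statuses.shift }
end
```

Injecting `sleeper` keeps the loop testable without real delays; in production the default simply calls `sleep`.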

Parameters:

  • input (String, Hash)

    Document URL or hash with :url key

  • steps (Array<Hash>)

    Array of step configurations (same as #sync)

  • async (Boolean, nil) (defaults to: nil)

    Async mode flag; added to the request payload as async only when non-nil

  • options (Hash)

    Additional pipeline options, merged into the request payload; nil values are dropped

Returns:

  • (Hash)

    Job status with keys:

    • "job_id" [String] - Job identifier for polling
    • "status" [String] - Initial status ("processing")

Raises:

  • (ArgumentError)

    if input is nil, or steps is nil or empty

# File 'lib/reducto_ai/resources/pipeline.rb', line 92

def async(input:, steps:, async: nil, **options)
  raise ArgumentError, "input is required" if input.nil?
  raise ArgumentError, "steps are required" if steps.nil? || (steps.respond_to?(:empty?) && steps.empty?)

  payload = { input: input, steps: steps }
  payload[:async] = async unless async.nil?
  payload.merge!(options.compact)

  @client.post("/pipeline_async", payload)
end
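The payload rules in the method body above are easy to miss: async is sent only when it is non-nil, and nil-valued options are removed by `compact` before the merge. The sketch below reproduces that assembly against a recording test double so the resulting payload can be inspected without network access. `RecordingClient` and `build_async_request` are hypothetical; the only client behavior assumed is `post(path, payload)`, since that is the only client method the resource calls. The option name `custom_option` is made up for illustration.

```ruby
# Hypothetical test double standing in for ReductoAI::Client:
# records the last request instead of sending it.
class RecordingClient
  attr_reader :path, :payload

  def post(path, payload)
    @path = path
    @payload = payload
    { "job_id" => "job-123", "status" => "processing" }
  end
end

# Mirrors the payload assembly in #async: async is included only when
# non-nil, and options with nil values are dropped by #compact.
def build_async_request(client, input:, steps:, async: nil, **options)
  payload = { input: input, steps: steps }
  payload[:async] = async unless async.nil?
  payload.merge!(options.compact)
  client.post("/pipeline_async", payload)
end

client = RecordingClient.new
build_async_request(
  client,
  input: "https://example.com/complex-doc.pdf",
  steps: [{ type: "split" }],
  custom_option: nil # hypothetical option; nil values never reach the payload
)
# client.payload now contains only :input and :steps
```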

#sync(input:, steps:, **options) ⇒ Hash

Executes a multi-step pipeline synchronously.

Examples:

Parse and extract in one request

result = client.pipeline.sync(
  input: "https://example.com/form.pdf",
  steps: [
    { type: "parse" },
    { type: "extract", instructions: { name: "string", amount: "number" } }
  ]
)

Parameters:

  • input (String, Hash)

    Document URL or hash with :url key

  • steps (Array<Hash>)

    Array of step configurations. Each step must have a :type key ("parse", "extract", "split", "edit") and type-specific options.

  • options (Hash)

    Additional pipeline options, merged into the request payload; nil values are dropped
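The gem itself only rejects a nil input and a nil or empty steps array; a malformed step type is reported by the API as a ClientError. A caller can catch the documented shape constraints earlier with a pre-flight check like the sketch below. `validate_pipeline_args!` and `ALLOWED_STEP_TYPES` are hypothetical names, and the allowed types are taken from the step description above; the check is not part of the gem.

```ruby
# Hypothetical pre-flight check, not part of the gem.
# Step types come from the documented list: parse, extract, split, edit.
ALLOWED_STEP_TYPES = %w[parse extract split edit].freeze

def validate_pipeline_args!(input, steps)
  # Documented input shapes: a URL string, or a hash with a :url key.
  url = input.is_a?(Hash) ? input[:url] : input
  unless url.is_a?(String) && !url.empty?
    raise ArgumentError, "input must be a URL string or a hash with a :url key"
  end
  unless steps.is_a?(Array) && !steps.empty?
    raise ArgumentError, "steps must be a non-empty array"
  end

  # Every step must be a hash whose :type is one of the documented types.
  steps.each_with_index do |step, i|
    type = step.is_a?(Hash) ? step[:type] : nil
    next if ALLOWED_STEP_TYPES.include?(type)

    raise ArgumentError, "step #{i} has unsupported type #{type.inspect}"
  end
  true
end

validate_pipeline_args!(
  { url: "https://example.com/form.pdf" },
  [{ type: "parse" }, { type: "extract", instructions: { name: "string" } }]
)
```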

Returns:

  • (Hash)

    Pipeline results with keys:

    • "job_id" [String] - Job identifier
    • "status" [String] - Job status ("succeeded")
    • "result" [Hash] - Contains "steps" array with each step's result
    • "usage" [Hash] - Credit usage details
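The returned hash carries the job status alongside the per-step results, so a caller may want to confirm success before reading "steps". The sketch below fails fast unless "status" equals "succeeded", the value listed above; other status strings are not documented on this page, so anything else is treated as a failure. `pipeline_step_results` and `PipelineNotSucceeded` are hypothetical names.

```ruby
# Hypothetical error for a pipeline response that did not succeed.
class PipelineNotSucceeded < StandardError; end

# Returns each step's "result" in order, after checking the job status.
# Hash#dig plus to_a yields [] if "result" or "steps" is missing.
def pipeline_step_results(response)
  status = response["status"]
  raise PipelineNotSucceeded, "pipeline status was #{status.inspect}" unless status == "succeeded"

  response.dig("result", "steps").to_a.map { |step| step["result"] }
end
```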

Raises:

  • (ArgumentError)

    if input is nil, or steps is nil or empty

  • (ClientError)

    if step configuration is invalid

  • (ServerError)

    if pipeline execution fails
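The three documented errors call for different handling: ArgumentError and ClientError indicate a bad request that will fail again unchanged, while a ServerError from pipeline execution may be worth retrying. The sketch below retries only server errors with exponential backoff. Whether server errors are actually transient is an assumption, and the gem's own retry behavior, if any, is not documented here. The module `Sketch` defines stand-in error classes so the example runs without the gem; with the gem loaded, rescue its ClientError and ServerError instead (their exact namespace is not shown on this page). `with_server_retries` is a hypothetical name.

```ruby
# Stand-in error classes so the sketch runs without the gem.
module Sketch
  class ClientError < StandardError; end
  class ServerError < StandardError; end
end

# Retries the block on Sketch::ServerError, waiting base_delay * 2**(n-1)
# seconds after the nth failure. Other errors (ArgumentError, ClientError)
# are not rescued and propagate immediately.
def with_server_retries(attempts: 3, base_delay: 1.0, sleeper: ->(s) { sleep(s) })
  tries = 0
  begin
    tries += 1
    yield
  rescue Sketch::ServerError
    raise if tries >= attempts

    sleeper.call(base_delay * (2**(tries - 1)))
    retry
  end
end

# Usage: the block would normally call client.pipeline.sync(...).
calls = 0
result = with_server_retries(sleeper: ->(_) {}) do
  calls += 1
  raise Sketch::ServerError, "transient failure" if calls < 3
  { "status" => "succeeded" }
end
```

Since pipeline credits are consumed per executed step, retrying a long pipeline may cost more than retrying a single operation; a small `attempts` value keeps that bounded.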

# File 'lib/reducto_ai/resources/pipeline.rb', line 57

def sync(input:, steps:, **options)
  raise ArgumentError, "input is required" if input.nil?
  raise ArgumentError, "steps are required" if steps.nil? || (steps.respond_to?(:empty?) && steps.empty?)

  payload = { input: input, steps: steps, **options }.compact
  @client.post("/pipeline", payload)
end