Class: Workato::Connector::Sdk::Stream

Inherits:
Operation
  • Object
show all
Extended by:
T::Sig
Includes:
Dsl::ReinvokeAfter
Defined in:
lib/workato/connector/sdk/stream.rb

Defined Under Namespace

Classes: Proxy

Constant Summary collapse

DEFAULT_FRAME_SIZE =
T.let(10.megabytes, Integer)

Instance Attribute Summary

Attributes inherited from Operation

#streams

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Dsl::ReinvokeAfter

#checkpoint!, #reinvoke_after

Methods inherited from Operation

#execute, #extended_schema, #input_fields, #output_fields, #sample_output, #summarize_input, #summarize_output

Methods included from Dsl::ExecutionContext

#execution_context, #recipe_id!

Methods included from Dsl::Error

#error

Methods included from Dsl::HTTP

#copy, #delete, #get, #head, #move, #options, #parallel, #patch, #post, #put

Methods included from Dsl::AWS

#aws

Methods included from Dsl::Global

#blank, #clear, #decrypt, #encrypt, #null, #puts, #skip, #sleep, #workato

Methods included from Dsl::WorkatoSchema

#workato_schema

Methods included from Dsl::LookupTable

#lookup

Methods included from Dsl::AccountProperty

#account_property

Methods included from Dsl::Time

#now, #today

Constructor Details

#initialize(stream:, methods: {}, connection: Connection.new) ⇒ void

Parameters:



47
48
49
50
# File 'lib/workato/connector/sdk/stream.rb', line 47

def initialize(stream:, methods: {}, connection: Connection.new)
  super(methods: methods, connection: connection)
  @stream_proc = stream
end

Class Method Details

.each_chunk(stream:, from:, frame_size: nil, &blk) ⇒ void

This method returns an undefined value.

Parameters:

  • stream (Proxy, Hash{T.untyped => T.untyped}, String)
  • from (Integer, nil)
  • frame_size (Integer, nil) (defaults to: nil)
  • blk (SorbetTypes::StreamInProc)


91
92
93
# File 'lib/workato/connector/sdk/stream.rb', line 91

def each_chunk(stream:, from:, frame_size: nil, &blk)
  Reader.new(stream: stream, from: from, frame_size: frame_size).each_chunk(&blk)
end

Instance Method Details

#chunk(input = {}, from = 0, to = from + DEFAULT_FRAME_SIZE, frame_size = DEFAULT_FRAME_SIZE) ⇒ SorbetTypes::StreamOutput

Parameters:

  • input (SorbetTypes::StreamInputHash) (defaults to: {})
  • from (Integer) (defaults to: 0)
  • to (Integer) (defaults to: from + DEFAULT_FRAME_SIZE)
  • frame_size (Integer) (defaults to: DEFAULT_FRAME_SIZE)

Returns:



60
61
62
63
64
65
66
67
# File 'lib/workato/connector/sdk/stream.rb', line 60

def chunk(input = {}, from = 0, to = from + DEFAULT_FRAME_SIZE, frame_size = DEFAULT_FRAME_SIZE)
  raise "'frame_size' must be a positive integer number" unless frame_size.positive?

  stream_proc = @stream_proc
  execute(nil, { input: input, from: from, to: to, size: frame_size }) do |_, input| # rubocop:disable Lint/ShadowingOuterLocalVariable
    T.unsafe(self).instance_exec(input['input'], input['from'], input['to'], input['size'], &stream_proc)
  end
end

#invoke(input = {}, frame_size = DEFAULT_FRAME_SIZE) ⇒ T.untyped

Parameters:

Returns:

  • (T.untyped)


70
71
72
73
74
75
76
77
78
# File 'lib/workato/connector/sdk/stream.rb', line 70

def invoke(input = {}, frame_size = DEFAULT_FRAME_SIZE)
  proxy = Proxy.new(name: '', input: input, stream: self)
  reader = Reader.new(stream: proxy, frame_size: frame_size)
  data = T.let(nil, T.untyped)
  reader.each_chunk do |chunk|
    data = data.nil? ? chunk : data + chunk
  end
  data
end