Class: Workato::Connector::Sdk::Stream
Defined Under Namespace
Classes: Proxy
Constant Summary
- DEFAULT_FRAME_SIZE = T.let(10.megabytes, Integer)
Instance Attribute Summary
Attributes inherited from Operation
Class Method Summary
Instance Method Summary
- #chunk(input = {}, from = 0, to = from + DEFAULT_FRAME_SIZE, frame_size = DEFAULT_FRAME_SIZE) ⇒ SorbetTypes::StreamOutput
- #initialize(stream:, methods: {}, connection: Connection.new) ⇒ void constructor
- #invoke(input = {}, frame_size = DEFAULT_FRAME_SIZE) ⇒ T.untyped
Methods included from Dsl::ReinvokeAfter
Methods inherited from Operation
#execute, #extended_schema, #input_fields, #output_fields, #sample_output, #summarize_input, #summarize_output
Methods included from Dsl::ExecutionContext
#execution_context, #recipe_id!
Methods included from Dsl::Error
Methods included from Dsl::HTTP
#copy, #delete, #get, #head, #move, #options, #parallel, #patch, #post, #put
Methods included from Dsl::AWS
Methods included from Dsl::Global
#blank, #clear, #decrypt, #encrypt, #null, #puts, #skip, #sleep, #workato
Methods included from Dsl::WorkatoSchema
Methods included from Dsl::LookupTable
Methods included from Dsl::AccountProperty
Methods included from Dsl::Time
Constructor Details
#initialize(stream:, methods: {}, connection: Connection.new) ⇒ void
47 48 49 50 |
# File 'lib/workato/connector/sdk/stream.rb', line 47
#
# Builds a stream operation around the given stream callable.
#
# @param stream the callable later run by #chunk via +instance_exec+
# @param methods [Hash] forwarded unchanged to the superclass initializer
# @param connection forwarded unchanged to the superclass initializer;
#   a fresh +Connection+ is created when omitted
# @return [void]
def initialize(stream:, methods: {}, connection: Connection.new)
  # The parent initializer receives only +methods+ and +connection+;
  # the stream callable is kept locally on this object.
  super(methods: methods, connection: connection)

  @stream_proc = stream
end
Class Method Details
.each_chunk(stream:, from:, frame_size: nil, &blk) ⇒ void
This method returns an undefined value.
91 92 93 |
# File 'lib/workato/connector/sdk/stream.rb', line 91
#
# Wraps +stream+ in a +Reader+ and hands the caller's block to
# +Reader#each_chunk+.
#
# @param stream the stream object passed to +Reader.new+
# @param from passed to +Reader.new+ as +from:+
# @param frame_size passed to +Reader.new+ as +frame_size:+ (+nil+ when omitted)
# @param blk block forwarded to +Reader#each_chunk+
# @return [void]
def each_chunk(stream:, from:, frame_size: nil, &blk)
  reader = Reader.new(stream: stream, from: from, frame_size: frame_size)
  reader.each_chunk(&blk)
end
Instance Method Details
#chunk(input = {}, from = 0, to = from + DEFAULT_FRAME_SIZE, frame_size = DEFAULT_FRAME_SIZE) ⇒ SorbetTypes::StreamOutput
60 61 62 63 64 65 66 67 |
# File 'lib/workato/connector/sdk/stream.rb', line 60
#
# Runs the stream callable once for the requested +from+/+to+ range.
#
# @param input passed to the stream callable as its first argument
# @param from passed to the stream callable as its second argument
# @param to passed to the stream callable as its third argument;
#   defaults to +from + DEFAULT_FRAME_SIZE+
# @param frame_size passed to the stream callable as its fourth argument
# @return [SorbetTypes::StreamOutput] whatever +execute+ returns
# @raise [RuntimeError] when +frame_size+ is not positive
def chunk(input = {}, from = 0, to = from + DEFAULT_FRAME_SIZE, frame_size = DEFAULT_FRAME_SIZE)
  unless frame_size.positive?
    raise "'frame_size' must be a positive integer number"
  end

  # Held in a local so the block below does not rely on instance-variable
  # lookup at the time +execute+ runs it.
  # NOTE(review): presumably +execute+ may evaluate the block in another context - confirm.
  producer = @stream_proc
  request = { input: input, from: from, to: to, size: frame_size }

  # The block reads the values back with string keys, exactly as the
  # original did; +execute+ is expected to supply them in that form.
  execute(nil, request) do |_, args|
    T.unsafe(self).instance_exec(args['input'], args['from'], args['to'], args['size'], &producer)
  end
end
#invoke(input = {}, frame_size = DEFAULT_FRAME_SIZE) ⇒ T.untyped
70 71 72 73 74 75 76 77 78 |
# File 'lib/workato/connector/sdk/stream.rb', line 70
#
# Reads the whole stream and returns all chunks joined with +++.
#
# @param input passed to +Proxy.new+ as +input:+
# @param frame_size passed to +Reader.new+ as +frame_size:+
# @return [T.untyped] the first chunk combined with every later chunk via +++,
#   or +nil+ when the reader yields no chunks
def invoke(input = {}, frame_size = DEFAULT_FRAME_SIZE)
  source = Proxy.new(name: '', input: input, stream: self)
  reader = Reader.new(stream: source, frame_size: frame_size)

  collected = T.let(nil, T.untyped)
  reader.each_chunk do |piece|
    # The first chunk seeds the accumulator; later chunks are appended with +.
    if collected.nil?
      collected = piece
    else
      collected += piece
    end
  end

  collected
end