Class: Tap::Tasks::Stream

Inherits:
Load
Defined in:
lib/tap/tasks/stream.rb

Overview

Stream repeatedly loads data from $stdin by requeuing itself until end-of-file is reached. This behavior is useful for creating tasks that load a bit of data from an IO, send it into a workflow, and then repeat.

The end-of-file cutoff can be changed by overriding the complete? method; streaming stops when complete? returns true. For instance, this is a prompt task:

class Prompt < Tap::Tasks::Stream
  config :exit_seq, "\n"

  def load(io)
    if io.eof?
      nil
    else
      io.readline
    end
  end

  def complete?(io, line)
    line == nil || line == exit_seq
  end
end
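
The load/complete? contract of the Prompt example can be tried without Tap. The following is a minimal sketch in plain Ruby, under stated assumptions: PromptSketch is a hypothetical stand-in rather than a Tap class, exit_seq is an ordinary attribute instead of a Tap config, StringIO stands in for $stdin, and a plain loop replaces the requeue cycle.

```ruby
require 'stringio'

# Hypothetical stand-in for the Prompt task; it does not inherit from
# Tap::Tasks::Stream and only reproduces the two overridden methods.
class PromptSketch
  attr_reader :exit_seq

  def initialize(exit_seq = "\n")
    @exit_seq = exit_seq
  end

  # Same logic as Prompt#load: one line per call, nil at end-of-file.
  def load(io)
    io.eof? ? nil : io.readline
  end

  # Same logic as Prompt#complete?: stop at end-of-file or on the exit sequence.
  def complete?(io, line)
    line.nil? || line == exit_seq
  end
end

prompt = PromptSketch.new
io = StringIO.new("first\nsecond\n\nignored\n")
lines = []

# Plain loop standing in for the requeue cycle that Stream performs.
loop do
  line = prompt.load(io)
  lines << line
  break if prompt.complete?(io, line)
end

p lines
```

In this sketch the blank line that matches exit_seq is still collected, because the loaded value is recorded before complete? is consulted, and the text after it is never read.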

Direct Known Subclasses

Prompt

Instance Attribute Summary

Attributes inherited from Tap::Task

#joins

Attributes inherited from App::Api

#app

Instance Method Summary

Methods inherited from Load

#close, #load, #open, #open_file

Methods inherited from Tap::Task

#associations, #call, #enq, #exe, #initialize, #log, #on_complete, parser

Methods inherited from App::Api

#associations, build, help, inherited, #initialize, #inspect, parse, parse!, parser, #to_spec

Methods included from Signals

#sig, #signal, #signal?, #signals

Methods included from Signals::ModuleMethods

included

Constructor Details

This class inherits a constructor from Tap::Task

Instance Method Details

#complete?(io, last) ⇒ Boolean

By default, returns true when io has reached end-of-file. Override in subclasses to change when recurrent loading stops (see process).

Returns:

  • (Boolean)


# File 'lib/tap/tasks/stream.rb', line 54

def complete?(io, last)
  io.eof?
end
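
As a rough illustration of the default check, the sketch below copies the method body into a top-level method and applies it to a StringIO after each read. StringIO as the input and the variable names are assumptions made for the example.

```ruby
require 'stringio'

# Same body as the default complete?: only the state of io matters,
# the last loaded value is ignored.
def complete?(io, last)
  io.eof?
end

io = StringIO.new("a\nb\n")

first = io.readline
after_first = complete?(io, first)    # data remains, so streaming would continue

second = io.readline
after_second = complete?(io, second)  # nothing left, so streaming would stop

p [after_first, after_second]
```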

#process(io = $stdin) ⇒ Object

Loads data from io. Process opens the input io object, loads a result, then checks whether loading is complete (using the complete? method). Unless loading is complete, process requeues self with io. Process closes io when loading is complete, provided use_close or file is specified.



# File 'lib/tap/tasks/stream.rb', line 37

def process(io=$stdin)
  io = open(io)
  result = load(io)
  
  if complete?(io, result)
    if use_close || file
      close(io)
    end
  else
    reque(io)
  end
  
  result
end
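
The control flow of process can be followed end to end with a small simulation. This is a sketch under stated assumptions, not Tap itself: StreamSketch is a hypothetical class, the queue is a plain Array drained by a hand-written loop, load reads one line per call, the open step is omitted, and only a use_close flag (no file option) controls closing.

```ruby
require 'stringio'

# Hypothetical stand-in for a Stream task. The Array queue and the
# line-based load are assumptions made for illustration.
class StreamSketch
  attr_reader :queue, :closed

  def initialize(queue, use_close: false)
    @queue = queue
    @use_close = use_close
    @closed = false
  end

  # One line per call, nil at end-of-file.
  def load(io)
    io.eof? ? nil : io.readline
  end

  # Default cutoff: stop once io is exhausted.
  def complete?(io, last)
    io.eof?
  end

  # Put self back at the front of the queue with the same io.
  def reque(io)
    queue.unshift([self, [io]])
  end

  # Same branch structure as process above, minus the open step.
  def process(io)
    result = load(io)

    if complete?(io, result)
      if @use_close
        io.close
        @closed = true
      end
    else
      reque(io)
    end

    result
  end
end

queue = []
task = StreamSketch.new(queue, use_close: true)
queue << [task, [StringIO.new("one\ntwo\nthree\n")]]

# Hand-written run loop: each call yields one result and may requeue the task.
results = []
until queue.empty?
  job, inputs = queue.shift
  results << job.process(*inputs)
end

p results
```

Each pass through the loop corresponds to one call to process: one result is produced per call, and the io is closed only on the final call.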

#reque(io) ⇒ Object

Requeues self with io at the top of the queue.



# File 'lib/tap/tasks/stream.rb', line 59

def reque(io)
  app.pq(self, [io])
end
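
One consequence of requeuing at the top of the queue is that a stream is serviced again before jobs that were already waiting. The sketch below models this with a plain Array, assuming that "top of the queue" means the next entry to run; the labels and the run loop are hypothetical and do not use the Tap queue.

```ruby
# Hypothetical queue of job labels. Array#unshift models placing a job at
# the top of the queue, Array#shift models taking the next job to run.
queue = ["other task"]
chunks = ["chunk 1", "chunk 2"]
order = []

queue.unshift("stream")

until queue.empty?
  job = queue.shift
  if job == "stream"
    order << chunks.shift
    # Requeue at the top while data remains, as reque is described to do.
    queue.unshift("stream") unless chunks.empty?
  else
    order << job
  end
end

p order
```

Under that assumption, the other task only runs once the stream has no more data to load.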