Class: Tap::Tasks::Stream

Inherits:
Load
  • Object
Defined in:
lib/tap/tasks/stream.rb,
lib/tap/tasks/stream/yaml.rb

Overview

Stream recurrently loads data from $stdin by requeuing itself until end-of-file is reached. This behavior is useful for creating tasks that load a bit of data from an IO, send it into a workflow, and then repeat.

The end-of-file cutoff can be changed by overriding the complete? method; streaming stops when complete? returns true. For instance, this is a prompt task:

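class Prompt < Tap::Tasks::Stream
  # Line that ends the prompt session (a bare newline by default).
  config :exit_seq, "\n"

  # Reads one line per pass; returns nil once io is exhausted.
  def load(io)
    if io.eof?
      nil
    else
      io.readline
    end
  end

  # Stops streaming at end-of-file or when the exit sequence is entered.
  def complete?(io, line)
    line.nil? || line == exit_seq
  end
end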
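The stopping behavior of the Prompt example can be tried without Tap installed. The following is a minimal sketch, assuming a hypothetical stand-in class (PromptSketch, not part of Tap) whose run method replaces requeuing with a plain loop. It copies the load and complete? logic from above and collects each loaded result, including the final one, since process returns the result even when loading is complete.

```ruby
require 'stringio'

# Hypothetical stand-in for the Prompt task; it does not use Tap.
class PromptSketch
  def initialize(exit_seq = "\n")
    @exit_seq = exit_seq
  end

  # Same logic as Prompt#load: one line per pass, nil at end-of-file.
  def load(io)
    io.eof? ? nil : io.readline
  end

  # Same logic as Prompt#complete?.
  def complete?(io, line)
    line.nil? || line == @exit_seq
  end

  # A plain loop standing in for repeated requeuing (an assumption,
  # not how Tap schedules the task).
  def run(io)
    results = []
    loop do
      line = load(io)
      results << line
      break if complete?(io, line)
    end
    results
  end
end

p PromptSketch.new.run(StringIO.new("hello\nworld\n\nignored\n"))
# prints ["hello\n", "world\n", "\n"]
```

The blank line matches exit_seq, so the final line of input is never read.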

Direct Known Subclasses

Yaml

Defined Under Namespace

Classes: Yaml

Instance Method Summary

Instance Method Details

#complete?(io, last) ⇒ Boolean

Returns true once io has reached end-of-file (io.eof?) by default. Override in subclasses to change when recurrent loading stops (see process).

Returns:

  • (Boolean)


# File 'lib/tap/tasks/stream.rb', line 54

def complete?(io, last)
  io.eof?
end

#process(io = $stdin) ⇒ Object

Loads data from io. process opens the input io, loads a result, then checks whether loading is complete (using the complete? method). Unless loading is complete, process requeues self with io. When loading is complete, process closes io, provided use_close or file is specified.



# File 'lib/tap/tasks/stream.rb', line 37

def process(io=$stdin)
  io = open(io)
  result = load(io)
  
  if complete?(io, result)
    if use_close || file
      close(io)
    end
  else
    reque(io)
  end
  
  result
end
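The control flow of process can be illustrated in isolation. This is a rough sketch under stated assumptions: StreamSketch and drain are hypothetical names, the queue is a plain Array rather than Tap's app queue, load reads one line per pass (an assumption chosen so that several passes occur), and the open step and the file option are omitted.

```ruby
require 'stringio'

# Hypothetical stand-in mirroring the flow of process; not Tap's class.
class StreamSketch
  attr_reader :queue, :use_close

  def initialize(use_close: true)
    @queue = []            # stands in for app.queue
    @use_close = use_close
  end

  # Assumed loader: one line per pass, nil at end-of-file.
  def load(io)
    io.eof? ? nil : io.readline
  end

  # Default cutoff shown in the source above.
  def complete?(io, last)
    io.eof?
  end

  def reque(io)
    queue.unshift([self, [io]])
  end

  # Load, then either close (when complete) or requeue.
  def process(io)
    result = load(io)
    if complete?(io, result)
      io.close if use_close
    else
      reque(io)
    end
    result
  end
end

# Runs the task, then keeps running whatever it requeued.
def drain(task, io)
  results = [task.process(io)]
  until task.queue.empty?
    receiver, args = task.queue.shift
    results << receiver.process(*args)
  end
  results
end

io = StringIO.new("one\ntwo\n")
p drain(StreamSketch.new, io)  # prints ["one\n", "two\n"]
p io.closed?                   # prints true
```

Each pass returns its result whether or not the task was requeued, which is how every loaded item reaches the rest of a workflow.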

#reque(io) ⇒ Object

Requeues self with io at the front of the queue.



# File 'lib/tap/tasks/stream.rb', line 59

def reque(io)
  app.queue.unshift(self, [io])
end
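Because reque uses unshift, the streaming task goes ahead of anything already waiting. A small sketch of that ordering, assuming the queue is modeled as a plain Array of [task, inputs] pairs (reque_front is a hypothetical helper, and the real app.queue is a Tap object rather than an Array):

```ruby
# Hypothetical helper: places the task and its io at the front of the queue.
def reque_front(queue, task, io)
  queue.unshift([task, [io]])
end

queue = [[:other_task, ["pending input"]]]
reque_front(queue, :stream_task, :io)
p queue.first  # prints [:stream_task, [:io]]
```

Under this assumption the stream task would run again before :other_task.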