Class: Cascading::Flow

Inherits:
Node
  • Object
Extended by:
Registerable
Defined in:
lib/cascading/flow.rb

Overview

A Flow wraps a c.f.Flow (cascading.flow.Flow). A Flow is composed of Assemblies, which are constructed using the Flow#assembly method within the block passed to the Cascading::flow or Cascade#flow constructor. Many Assemblies may be nested within a Flow.

Instance Attribute Summary

Attributes inherited from Node

#child_names, #children, #last_child, #name, #parent

Instance Method Summary

Methods included from Registerable

add, all, get, reset

Methods inherited from Node

#add_child, #find_child, #qualified_name, #root

Constructor Details

#initialize(name, parent, options = {}) ⇒ Flow

Do not use this constructor directly. Instead, use Cascading::flow to build top-level flows and Cascade#flow to build flows within a Cascade.

Builds a Flow given a name and a parent node (a Cascade or nil).

The named options are:

properties

Properties hash which allows external configuration of this flow. The flow will side-effect the properties during composition, then pass the modified properties along to the FlowConnector for execution. See Cascade#initialize for details on how properties are propagated through cascades.

mode

Mode which will determine the execution mode of this flow. See Mode.parse for details.



# File 'lib/cascading/flow.rb', line 27

def initialize(name, parent, options = {})
  @sources, @sinks, @incoming_scopes, @outgoing_scopes, @listeners = {}, {}, {}, {}, []
  @properties = options[:properties] || {}
  @mode = Mode.parse(options[:mode])
  @flow_scope = Scope.flow_scope(name)
  super(name, parent)
  self.class.add(name, self)
end

Instance Attribute Details

#incoming_scopes ⇒ Object

Returns the value of attribute incoming_scopes.



# File 'lib/cascading/flow.rb', line 11

def incoming_scopes
  @incoming_scopes
end

#listeners ⇒ Object

Returns the value of attribute listeners.



# File 'lib/cascading/flow.rb', line 11

def listeners
  @listeners
end

#mode ⇒ Object (readonly)

Returns the value of attribute mode.



# File 'lib/cascading/flow.rb', line 12

def mode
  @mode
end

#outgoing_scopes ⇒ Object

Returns the value of attribute outgoing_scopes.



# File 'lib/cascading/flow.rb', line 11

def outgoing_scopes
  @outgoing_scopes
end

#properties ⇒ Object (readonly)

Returns the value of attribute properties.



# File 'lib/cascading/flow.rb', line 12

def properties
  @properties
end

#sinks ⇒ Object

Returns the value of attribute sinks.



# File 'lib/cascading/flow.rb', line 11

def sinks
  @sinks
end

#sources ⇒ Object

Returns the value of attribute sources.



# File 'lib/cascading/flow.rb', line 11

def sources
  @sources
end

Instance Method Details

#add_archive_to_distributed_cache(file) ⇒ Object

Adds the given path to the mapred.cache.archives list property.



# File 'lib/cascading/flow.rb', line 161

def add_archive_to_distributed_cache(file)
  add_to_distributed_cache(file, "mapred.cache.archives")
end

#add_file_to_distributed_cache(file) ⇒ Object

Adds the given path to the mapred.cache.files list property.



# File 'lib/cascading/flow.rb', line 156

def add_file_to_distributed_cache(file)
  add_to_distributed_cache(file, "mapred.cache.files")
end

#add_listener(listener) ⇒ Object

Appends a FlowListener to the list of listeners for this flow.



# File 'lib/cascading/flow.rb', line 166

def add_listener(listener)
  @listeners << listener
end

#assembly(name, &block) ⇒ Object

Builds a child Assembly in this Flow given a name and block.

An assembly’s name is quite important as it will determine:

  • The sources from which it will read, if any

  • The name to be used in joins or unions downstream

  • The name to be used to sink the output of the assembly downstream

Many assemblies may be built within a flow. The Assembly#branch method is used for creating nested assemblies and produces objects of the same type as this constructor.

Example:

flow 'wordcount', :mode => :local do
  assembly 'first_step' do
    ...
  end

  assembly 'second_step' do
    ...
  end
end


# File 'lib/cascading/flow.rb', line 57

def assembly(name, &block)
  raise "Could not build assembly '#{name}'; block required" unless block_given?
  assembly = Assembly.new(name, self, @outgoing_scopes)
  add_child(assembly)
  assembly.instance_eval(&block)
  assembly
end
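
The block-evaluation pattern used by Flow#assembly can be sketched in plain Ruby. MiniFlow, MiniAssembly, and the step method below are hypothetical stand-ins invented for illustration; they are not part of cascading.jruby and only mimic the block check and the instance_eval call shown above.

```ruby
# Hypothetical stand-ins for illustration only; not the cascading.jruby classes.
class MiniAssembly
  attr_reader :name, :steps

  def initialize(name)
    @name = name
    @steps = []
  end

  # A made-up DSL method, callable without a receiver inside the block.
  def step(label)
    @steps << label
  end
end

class MiniFlow
  attr_reader :children

  def initialize
    @children = {}
  end

  def assembly(name, &block)
    raise "Could not build assembly '#{name}'; block required" unless block_given?
    child = MiniAssembly.new(name)
    @children[name] = child
    # instance_eval runs the block with the new assembly as self.
    child.instance_eval(&block)
    child
  end
end

flow = MiniFlow.new
first = flow.assembly('first_step') { step 'parse' }
puts first.steps.inspect
```

Because the block runs with the assembly as self, methods called inside it resolve against the assembly rather than the enclosing flow, which is presumably why the example above can call assembly-level operations without a receiver.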

#complete ⇒ Object

Completes this Flow after connecting it. This results in execution of the c.f.Flow built from this Flow. Use this method when executing a top-level Flow.



# File 'lib/cascading/flow.rb', line 205

def complete
  begin
    flow = connect
    @listeners.each { |l| flow.addListener(l) }
    flow.complete
  rescue NativeException => e
    raise CascadingException.new(e, 'Error completing flow')
  end
end
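
The ordering in complete (attach every listener, then run, and wrap any failure) can be sketched without JRuby. FakeConnectedFlow, FailingFlow, WrappedError, and complete_flow are hypothetical names used only for this illustration; the real method works with a Java c.f.Flow and rescues NativeException, and the message format of CascadingException is not shown in this page.

```ruby
# Hypothetical stand-ins; the real method operates on a Java c.f.Flow under JRuby.
class FakeConnectedFlow
  attr_reader :events

  def initialize
    @events = []
  end

  def addListener(listener)
    @events << "listener:#{listener}"
  end

  def complete
    @events << 'complete'
  end
end

# A stand-in flow whose execution always fails.
class FailingFlow < FakeConnectedFlow
  def complete
    raise 'boom'
  end
end

# Made-up wrapper error; its message format is an assumption.
class WrappedError < StandardError
  def initialize(original, message)
    super("#{message}: #{original.message}")
  end
end

def complete_flow(connected, listeners)
  # Listeners are attached before execution starts.
  listeners.each { |l| connected.addListener(l) }
  connected.complete
rescue StandardError => e
  raise WrappedError.new(e, 'Error completing flow')
end

ok = FakeConnectedFlow.new
complete_flow(ok, ['a', 'b'])
puts ok.events.inspect
```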

#compress_output(codec, type) ⇒ Object

Property modifier that sets the codec and type of the compression for all sinks in this flow. Currently supports only org.apache.hadoop.io.compress.DefaultCodec and org.apache.hadoop.io.compress.GzipCodec, and the NONE, RECORD, or BLOCK compression types defined in org.apache.hadoop.io.SequenceFile.

codec may be :default or :gzip, and type may be :none, :record, or :block. Any other value raises an error.

Example:

compress_output :default, :block


# File 'lib/cascading/flow.rb', line 134

def compress_output(codec, type)
  properties['mapred.output.compress'] = 'true'
  properties['mapred.output.compression.codec'] = case codec
    when :default then Java::OrgApacheHadoopIoCompress::DefaultCodec.java_class.name
    when :gzip then Java::OrgApacheHadoopIoCompress::GzipCodec.java_class.name
    else raise "Codec #{codec} not yet supported by cascading.jruby"
    end
  properties['mapred.output.compression.type'] = case type
    when :none   then Java::OrgApacheHadoopIo::SequenceFile::CompressionType::NONE.to_s
    when :record then Java::OrgApacheHadoopIo::SequenceFile::CompressionType::RECORD.to_s
    when :block  then Java::OrgApacheHadoopIo::SequenceFile::CompressionType::BLOCK.to_s
    else raise "Compression type '#{type}' not supported"
    end
end
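
The symbol-to-property mapping above can be tried out without JRuby or Hadoop on the classpath. The sketch below is a plain-Ruby approximation: compression_properties, CODECS, and COMPRESSION_TYPES are hypothetical names, and it assumes that java_class.name yields the fully qualified class name and that the CompressionType constants stringify to their own names.

```ruby
# Plain-Ruby sketch of the mapping; the string values are assumed to match
# what the Java calls in compress_output return.
CODECS = {
  :default => 'org.apache.hadoop.io.compress.DefaultCodec',
  :gzip    => 'org.apache.hadoop.io.compress.GzipCodec'
}.freeze

COMPRESSION_TYPES = { :none => 'NONE', :record => 'RECORD', :block => 'BLOCK' }.freeze

# Hypothetical helper; returns the properties compress_output would set.
def compression_properties(codec, type)
  codec_class = CODECS.fetch(codec) { raise "Codec #{codec} not yet supported by cascading.jruby" }
  type_name = COMPRESSION_TYPES.fetch(type) { raise "Compression type '#{type}' not supported" }
  {
    'mapred.output.compress'          => 'true',
    'mapred.output.compression.codec' => codec_class,
    'mapred.output.compression.type'  => type_name
  }
end

props = compression_properties(:default, :block)
props.each { |key, value| puts "#{key}=#{value}" }
```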

#connect ⇒ Object

Connects this Flow, producing a c.f.Flow without completing it (the Flow is not executed). This method is used by Cascade to connect its child Flows. To connect and complete a Flow, see Flow#complete.



# File 'lib/cascading/flow.rb', line 185

def connect
  puts "Connecting flow '#{name}' with properties:"
  properties.keys.sort.each do |key|
    puts "#{key}=#{properties[key]}"
  end

  # FIXME: why do I have to do this in 2.0 wip-255?
  Java::CascadingProperty::AppProps.setApplicationName(properties, name)
  Java::CascadingProperty::AppProps.setApplicationVersion(properties, '0.0.0')
  Java::CascadingProperty::AppProps.setApplicationJarClass(properties, Java::CascadingFlow::Flow.java_class)

  sources = make_tap_parameter(@sources, :head_pipe)
  sinks = make_tap_parameter(@sinks, :tail_pipe)
  pipes = make_pipes
  mode.connect_flow(properties, name, sources, sinks, pipes)
end

#debug_scope(name = nil) ⇒ Object

Prints information about the scope of this Flow at the point at which it is called by default, or for the child specified by the given name, if specified. This allows you to trace the propagation of field names through your job and is handy for debugging. See Scope for details.



# File 'lib/cascading/flow.rb', line 106

def debug_scope(name = nil)
  scope = scope(name)
  name ||= last_child.name
  puts "Scope for '#{name}':\n  #{scope}"
end

#describe(offset = '') ⇒ Object

Produces a textual description of this Flow. The description details the structure of the Flow, its sources and sinks, and the input and output fields of each Assembly. The offset parameter allows for this describe to be nested within a calling context, which lets us indent the structural hierarchy of a job.



# File 'lib/cascading/flow.rb', line 84

def describe(offset = '')
  description =  "#{offset}#{name}:flow\n"
  description += "#{sources.keys.map{ |source| "#{offset}  #{source}:source :: #{incoming_scopes[source].values_fields.to_a.inspect}" }.join("\n")}\n"
  description += "#{child_names.map{ |child| children[child].describe("#{offset}  ") }.join("\n")}\n"
  description += "#{sinks.keys.map{ |sink| "#{offset}  #{sink}:sink :: #{outgoing_scopes[sink].values_fields.to_a.inspect}" }.join("\n")}"
  description
end
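
How the offset parameter produces an indented hierarchy can be seen in a reduced sketch. DescribedNode is a hypothetical stand-in that omits sources, sinks, and field lists, and the ':assembly' suffix used for children is an assumption, since Assembly#describe is not shown on this page.

```ruby
# Hypothetical stand-in; only the offset-based nesting is modeled here.
class DescribedNode
  def initialize(name, kind, children = [])
    @name = name
    @kind = kind
    @children = children
  end

  # Each level passes a wider offset to its children, which indents them.
  def describe(offset = '')
    lines = ["#{offset}#{@name}:#{@kind}"]
    @children.each { |child| lines << child.describe("#{offset}  ") }
    lines.join("\n")
  end
end

steps = [
  DescribedNode.new('first_step', 'assembly'),
  DescribedNode.new('second_step', 'assembly')
]
flow = DescribedNode.new('wordcount', 'flow', steps)
description = flow.describe
puts description
```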

#emr_local_path_for_distributed_cache_file(file) ⇒ Object

Handles locating a file cached from S3 on local disk. TODO: remove



# File 'lib/cascading/flow.rb', line 171

def emr_local_path_for_distributed_cache_file(file)
  # NOTE this needs to be *appended* to the property mapred.local.dir
  if file =~ /^s3n?:\/\//
    # EMR
    "/taskTracker/archive/#{file.gsub(/^s3n?:\/\//, "")}"
  else
    # Local
    file
  end
end
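
Since this method depends only on its argument, it can be exercised standalone. The copy below reproduces the logic shown above outside the Flow class; the bucket name and paths are made-up examples.

```ruby
# Standalone copy of the method shown above, for experimentation.
def emr_local_path_for_distributed_cache_file(file)
  if file =~ /^s3n?:\/\//
    # S3 URIs (s3:// or s3n://) map under the task tracker's archive directory.
    "/taskTracker/archive/#{file.gsub(/^s3n?:\/\//, '')}"
  else
    # Anything else is treated as an ordinary local path.
    file
  end
end

# The bucket and paths below are invented for the example.
s3n_path   = emr_local_path_for_distributed_cache_file('s3n://my-bucket/lib/data.zip')
s3_path    = emr_local_path_for_distributed_cache_file('s3://my-bucket/lib/data.zip')
local_path = emr_local_path_for_distributed_cache_file('/tmp/data.zip')

puts s3n_path   # /taskTracker/archive/my-bucket/lib/data.zip
puts s3_path    # /taskTracker/archive/my-bucket/lib/data.zip
puts local_path # /tmp/data.zip
```

As the NOTE in the source says, the returned path is relative to mapred.local.dir, so the caller still has to prefix it with that directory.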

#scope(name = nil) ⇒ Object

Accesses the outgoing scope of this Flow at the point at which it is called by default, or for the child specified by the given name, if specified. This is useful for grabbing the values_fields at any point in the construction of the Flow. See Scope for details.



# File 'lib/cascading/flow.rb', line 96

def scope(name = nil)
  raise 'Must specify name if no children have been defined yet' unless name || last_child
  name ||= last_child.name
  @outgoing_scopes[name]
end
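
The default-to-last-child lookup can be sketched in isolation. ScopeLookup is a hypothetical stand-in in which strings take the place of real Scope objects and add_child simply records the most recent name; it is not how Node tracks children.

```ruby
# Hypothetical stand-in; strings take the place of real Scope objects.
class ScopeLookup
  def initialize
    @outgoing_scopes = {}
    @last_child_name = nil
  end

  def add_child(name, scope)
    @outgoing_scopes[name] = scope
    @last_child_name = name
  end

  # With no name, fall back to the most recently added child.
  def scope(name = nil)
    raise 'Must specify name if no children have been defined yet' unless name || @last_child_name
    @outgoing_scopes[name || @last_child_name]
  end
end

lookup = ScopeLookup.new
lookup.add_child('first_step', 'fields: line')
lookup.add_child('second_step', 'fields: word, count')

puts lookup.scope               # scope of the last child, 'second_step'
puts lookup.scope('first_step') # scope of a named child
```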

#set_spill_threshold(threshold) ⇒ Object

Sets the cascading.spill.list.threshold property in this flow’s properties. The threshold is stored as a string. See c.t.c.SpillableProps for details.



# File 'lib/cascading/flow.rb', line 151

def set_spill_threshold(threshold)
  properties['cascading.spill.list.threshold'] = threshold.to_s
end

#sink(name, tap) ⇒ Object

Creates a new sink for this flow using the specified name and Cascading::Tap.



# File 'lib/cascading/flow.rb', line 75

def sink(name, tap)
  sinks[name] = tap
end

#sink_metadata ⇒ Object

Builds a map, keyed by sink name, of the sink metadata for each sink. Currently, this contains only the field names of each sink.



# File 'lib/cascading/flow.rb', line 114

def sink_metadata
  @sinks.keys.inject({}) do |sink_metadata, sink_name|
    raise "Cannot sink undefined assembly '#{sink_name}'" unless @outgoing_scopes[sink_name]
    sink_metadata[sink_name] = {
      :field_names => @outgoing_scopes[sink_name].values_fields.to_a,
    }
    sink_metadata
  end
end
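
The shape of the returned map can be illustrated with stand-in data. FakeScope and sink_metadata_for are hypothetical names; FakeScope holds a plain Array where the real Scope returns Cascading fields, and the tap values are placeholders.

```ruby
# Hypothetical stand-in for Scope; values_fields is a plain Array here.
FakeScope = Struct.new(:values_fields)

# Same accumulation pattern as sink_metadata, taking its inputs as arguments.
def sink_metadata_for(sinks, outgoing_scopes)
  sinks.keys.inject({}) do |metadata, sink_name|
    raise "Cannot sink undefined assembly '#{sink_name}'" unless outgoing_scopes[sink_name]
    metadata[sink_name] = { :field_names => outgoing_scopes[sink_name].values_fields.to_a }
    # inject carries the block's return value forward, so return the hash.
    metadata
  end
end

sinks = { 'wordcount' => :placeholder_tap }
scopes = { 'wordcount' => FakeScope.new(['word', 'count']) }
metadata = sink_metadata_for(sinks, scopes)
puts metadata['wordcount'][:field_names].inspect
```

A sink whose name has no matching outgoing scope raises, which surfaces a misnamed sink before the flow is connected.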

#source(name, tap) ⇒ Object

Creates a new source for this flow using the specified name and Cascading::Tap. The source's scope is registered as both the incoming and the outgoing scope for that name.



# File 'lib/cascading/flow.rb', line 67

def source(name, tap)
  sources[name] = tap
  incoming_scopes[name] = Scope.source_scope(name, mode.source_tap(name, tap), @flow_scope)
  outgoing_scopes[name] = incoming_scopes[name]
end
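
The bookkeeping performed by source can be sketched with plain hashes. SourceRegistry is a hypothetical stand-in, and the string it stores is a placeholder for the object that Scope.source_scope would return.

```ruby
# Hypothetical stand-in showing only the bookkeeping done by Flow#source.
class SourceRegistry
  attr_reader :sources, :incoming_scopes, :outgoing_scopes

  def initialize
    @sources = {}
    @incoming_scopes = {}
    @outgoing_scopes = {}
  end

  def source(name, tap)
    sources[name] = tap
    # Placeholder for Scope.source_scope(name, tap, flow_scope).
    incoming_scopes[name] = "scope(#{name})"
    # The very same object is registered as the outgoing scope.
    outgoing_scopes[name] = incoming_scopes[name]
  end
end

registry = SourceRegistry.new
registry.source('input', :placeholder_tap)
puts registry.outgoing_scopes['input'].equal?(registry.incoming_scopes['input'])
```

Registering the source under outgoing_scopes is what lets a lookup by that name, such as Flow#scope, find the source's fields before any assembly has been built.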