Class: Cascading::Flow
Extended by: Registerable
Defined in: lib/cascading/flow.rb
Overview
A Flow wraps a c.f.Flow (cascading.flow.Flow). A Flow is composed of Assemblies, which are constructed by calling Flow#assembly within the block passed to the Cascading::flow or Cascade#flow constructor. Many Assemblies may be built within a single Flow.
Instance Attribute Summary
- #incoming_scopes ⇒ Object
  Returns the value of attribute incoming_scopes.
- #listeners ⇒ Object
  Returns the value of attribute listeners.
- #mode ⇒ Object (readonly)
  Returns the value of attribute mode.
- #outgoing_scopes ⇒ Object
  Returns the value of attribute outgoing_scopes.
- #properties ⇒ Object (readonly)
  Returns the value of attribute properties.
- #sinks ⇒ Object
  Returns the value of attribute sinks.
- #sources ⇒ Object
  Returns the value of attribute sources.
Attributes inherited from Node
#child_names, #children, #last_child, #name, #parent
Instance Method Summary
- #add_archive_to_distributed_cache(file) ⇒ Object
  Adds the given path to the mapred.cache.archives list property.
- #add_file_to_distributed_cache(file) ⇒ Object
  Adds the given path to the mapred.cache.files list property.
- #add_listener(listener) ⇒ Object
  Appends a FlowListener to the list of listeners for this flow.
- #assembly(name, &block) ⇒ Object
  Builds a child Assembly in this Flow given a name and block.
- #complete ⇒ Object
  Completes this Flow after connecting it.
- #compress_output(codec, type) ⇒ Object
  Property modifier that sets the codec and type of the compression for all sinks in this flow.
- #connect ⇒ Object
  Connects this Flow, producing a c.f.Flow without completing it (the Flow is not executed).
- #debug_scope(name = nil) ⇒ Object
  Prints information about the scope of this Flow at the point at which it is called by default, or for the child specified by the given name, if specified.
- #describe(offset = '') ⇒ Object
  Produces a textual description of this Flow.
- #emr_local_path_for_distributed_cache_file(file) ⇒ Object
  Handles locating a file cached from S3 on local disk.
- #initialize(name, parent, options = {}) ⇒ Flow (constructor)
  Do not use this constructor directly.
- #scope(name = nil) ⇒ Object
  Accesses the outgoing scope of this Flow at the point at which it is called by default, or for the child specified by the given name, if specified.
- #set_spill_threshold(threshold) ⇒ Object
  Sets the cascading.spill.list.threshold property in this flow’s properties.
- #sink(name, tap) ⇒ Object
  Creates a new sink for this flow, using the specified name and Cascading::Tap.
- #sink_metadata ⇒ Object
  Builds a map, keyed by sink name, of the sink metadata for each sink.
- #source(name, tap) ⇒ Object
  Creates a new source for this flow, using the specified name and Cascading::Tap.
Methods included from Registerable
Methods inherited from Node
#add_child, #find_child, #qualified_name, #root
Constructor Details
#initialize(name, parent, options = {}) ⇒ Flow
Do not use this constructor directly. Instead, use Cascading::flow to build top-level flows and Cascade#flow to build flows within a Cascade.
Builds a Flow given a name and a parent node (a Cascade or nil).
The named options are:
- properties
  Properties hash which allows external configuration of this flow. The flow will side-effect the properties during composition, then pass the modified properties along to the FlowConnector for execution. See Cascade#initialize for details on how properties are propagated through cascades.
- mode
  Mode which will determine the execution mode of this flow. See Mode.parse for details.
# File 'lib/cascading/flow.rb', line 27
def initialize(name, parent, options = {})
  @sources, @sinks, @incoming_scopes, @outgoing_scopes, @listeners = {}, {}, {}, {}, []
  @properties = options[:properties] || {}
  @mode = Mode.parse(options[:mode])
  @flow_scope = Scope.flow_scope(name)
  super(name, parent)
  self.class.add(name, self)
end
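The side effect on the properties hash can be sketched in plain Ruby. MiniFlow below is a hypothetical stand-in, not the real Cascading::Flow: it copies only the properties assignment from the constructor and the body of set_spill_threshold, and leaves out the mode, scopes, and registration. The queue-name property is just an illustrative entry.

```ruby
# Hypothetical stand-in for Cascading::Flow; only the properties handling
# from the constructor is modeled.
class MiniFlow
  attr_reader :name, :properties

  def initialize(name, options = {})
    @name = name
    # Same expression as the real constructor: the caller's hash is kept by
    # reference, not copied.
    @properties = options[:properties] || {}
  end

  # Mirrors the body of Flow#set_spill_threshold.
  def set_spill_threshold(threshold)
    properties['cascading.spill.list.threshold'] = threshold.to_s
  end
end

external = { 'mapred.job.queue.name' => 'default' } # illustrative property
flow = MiniFlow.new('wordcount', :properties => external)
flow.set_spill_threshold(50_000)

# The modifier wrote through to the hash owned by the caller.
puts external['cascading.spill.list.threshold']
puts flow.properties.equal?(external)
```

Because the hash is shared rather than copied, a caller that reuses one hash for several flows would see every flow's modifications in it.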
Instance Attribute Details
#incoming_scopes ⇒ Object
Returns the value of attribute incoming_scopes.
# File 'lib/cascading/flow.rb', line 11
def incoming_scopes
  @incoming_scopes
end
#listeners ⇒ Object
Returns the value of attribute listeners.
# File 'lib/cascading/flow.rb', line 11
def listeners
  @listeners
end
#mode ⇒ Object (readonly)
Returns the value of attribute mode.
# File 'lib/cascading/flow.rb', line 12
def mode
  @mode
end
#outgoing_scopes ⇒ Object
Returns the value of attribute outgoing_scopes.
# File 'lib/cascading/flow.rb', line 11
def outgoing_scopes
  @outgoing_scopes
end
#properties ⇒ Object (readonly)
Returns the value of attribute properties.
# File 'lib/cascading/flow.rb', line 12
def properties
  @properties
end
#sinks ⇒ Object
Returns the value of attribute sinks.
# File 'lib/cascading/flow.rb', line 11
def sinks
  @sinks
end
#sources ⇒ Object
Returns the value of attribute sources.
# File 'lib/cascading/flow.rb', line 11
def sources
  @sources
end
Instance Method Details
#add_archive_to_distributed_cache(file) ⇒ Object
Adds the given path to the mapred.cache.archives list property.
# File 'lib/cascading/flow.rb', line 161
def add_archive_to_distributed_cache(file)
  add_to_distributed_cache(file, "mapred.cache.archives")
end
#add_file_to_distributed_cache(file) ⇒ Object
Adds the given path to the mapred.cache.files list property.
# File 'lib/cascading/flow.rb', line 156
def add_file_to_distributed_cache(file)
  add_to_distributed_cache(file, "mapred.cache.files")
end
#add_listener(listener) ⇒ Object
Appends a FlowListener to the list of listeners for this flow.
# File 'lib/cascading/flow.rb', line 166
def add_listener(listener)
  @listeners << listener
end
#assembly(name, &block) ⇒ Object
Builds a child Assembly in this Flow given a name and block.
An assembly’s name is important because it determines:
- The sources from which it will read, if any
- The name to be used in joins or unions downstream
- The name to be used to sink the output of the assembly downstream
Many assemblies may be built within a flow. The Assembly#branch method is used for creating nested assemblies and produces objects of the same type as this constructor.
Example:
flow 'wordcount', :mode => :local do
  assembly 'first_step' do
    ...
  end
  assembly 'second_step' do
    ...
  end
end
# File 'lib/cascading/flow.rb', line 57
def assembly(name, &block)
  raise "Could not build assembly '#{name}'; block required" unless block_given?
  assembly = Assembly.new(name, self, @outgoing_scopes)
  add_child(assembly)
  assembly.instance_eval(&block)
  assembly
end
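The block-handling pattern in this method (require a block, register the child, then instance_eval the block on it) can be tried in isolation. The sketch below uses hypothetical MiniFlow and MiniAssembly stand-ins and a made-up step operation; the real Assembly also receives the flow and its outgoing scopes, which are omitted here.

```ruby
# Hypothetical stand-ins; the real classes are Cascading::Flow and
# Cascading::Assembly, which do far more than this.
class MiniAssembly
  attr_reader :name, :steps

  def initialize(name)
    @name = name
    @steps = []
  end

  # Made-up operation, used only to show that the block runs with the
  # assembly as self.
  def step(label)
    @steps << label
  end
end

class MiniFlow
  attr_reader :children

  def initialize
    @children = {}
  end

  def assembly(name, &block)
    raise "Could not build assembly '#{name}'; block required" unless block_given?
    assembly = MiniAssembly.new(name)
    @children[name] = assembly
    # instance_eval makes bare calls inside the block resolve on the assembly.
    assembly.instance_eval(&block)
    assembly
  end
end

flow = MiniFlow.new
first = flow.assembly('first_step') { step :split_words }
puts first.steps.inspect

begin
  flow.assembly('second_step') # no block given
rescue RuntimeError => e
  puts e.message
end
```

One consequence of instance_eval is that self inside the block is the assembly, so instance variables of the code that defines the flow are not visible there; local variables still are.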
#complete ⇒ Object
Completes this Flow after connecting it. This results in execution of the c.f.Flow built from this Flow. Use this method when executing a top-level Flow.
# File 'lib/cascading/flow.rb', line 205
def complete
  begin
    flow = connect
    @listeners.each { |l| flow.addListener(l) }
    flow.complete
  rescue NativeException => e
    raise CascadingException.new(e, 'Error completing flow')
  end
end
#compress_output(codec, type) ⇒ Object
Property modifier that sets the codec and type of the compression for all sinks in this flow. Currently only supports o.a.h.i.c.DefaultCodec and o.a.h.i.c.GzipCodec, and the NONE, RECORD, or BLOCK compression types defined in o.a.h.i.SequenceFile.
codec may be a symbol such as :default or :gzip, and type may be a symbol such as :none, :record, or :block.
Example:
compress_output :default, :block
# File 'lib/cascading/flow.rb', line 134
def compress_output(codec, type)
  properties['mapred.output.compress'] = 'true'
  properties['mapred.output.compression.codec'] = case codec
    when :default then Java::OrgApacheHadoopIoCompress::DefaultCodec.java_class.name
    when :gzip then Java::OrgApacheHadoopIoCompress::GzipCodec.java_class.name
    else raise "Codec #{codec} not yet supported by cascading.jruby"
    end
  properties['mapred.output.compression.type'] = case type
    when :none then Java::OrgApacheHadoopIo::SequenceFile::CompressionType::NONE.to_s
    when :record then Java::OrgApacheHadoopIo::SequenceFile::CompressionType::RECORD.to_s
    when :block then Java::OrgApacheHadoopIo::SequenceFile::CompressionType::BLOCK.to_s
    else raise "Compression type '#{type}' not supported"
    end
end
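To see which entries a call such as compress_output :default, :block leaves in the properties, the mapping can be approximated without a JVM. In this sketch compression_properties is a hypothetical helper, and the class names are written out as strings on the assumption that java_class.name yields the fully qualified Hadoop class name. Unlike the real method, it validates both arguments before building anything.

```ruby
# Plain-Ruby sketch of the property mapping; class names are written out as
# strings instead of being read from the Java classes.
CODECS = {
  :default => 'org.apache.hadoop.io.compress.DefaultCodec',
  :gzip    => 'org.apache.hadoop.io.compress.GzipCodec',
}
TYPES = { :none => 'NONE', :record => 'RECORD', :block => 'BLOCK' }

# Hypothetical helper: returns the entries that compress_output would write
# into the flow's properties.
def compression_properties(codec, type)
  raise "Codec #{codec} not yet supported by cascading.jruby" unless CODECS.key?(codec)
  raise "Compression type '#{type}' not supported" unless TYPES.key?(type)
  {
    'mapred.output.compress'          => 'true',
    'mapred.output.compression.codec' => CODECS[codec],
    'mapred.output.compression.type'  => TYPES[type],
  }
end

compression_properties(:default, :block).each { |key, value| puts "#{key}=#{value}" }
```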
#connect ⇒ Object
Connects this Flow, producing a c.f.Flow without completing it (the Flow is not executed). This method is used by Cascade to connect its child Flows. To connect and complete a Flow, see Flow#complete.
# File 'lib/cascading/flow.rb', line 185
def connect
  puts "Connecting flow '#{name}' with properties:"
  properties.keys.sort.each do |key|
    puts "#{key}=#{properties[key]}"
  end

  # FIXME: why do I have to do this in 2.0 wip-255?
  Java::CascadingProperty::AppProps.setApplicationName(properties, name)
  Java::CascadingProperty::AppProps.setApplicationVersion(properties, '0.0.0')
  Java::CascadingProperty::AppProps.setApplicationJarClass(properties, Java::CascadingFlow::Flow.java_class)

  sources = make_tap_parameter(@sources, :head_pipe)
  sinks = make_tap_parameter(@sinks, :tail_pipe)
  pipes = make_pipes
  mode.connect_flow(properties, name, sources, sinks, pipes)
end
#debug_scope(name = nil) ⇒ Object
Prints information about the scope of this Flow at the point at which it is called by default, or for the child specified by the given name, if specified. This allows you to trace the propagation of field names through your job and is handy for debugging. See Scope for details.
# File 'lib/cascading/flow.rb', line 106
def debug_scope(name = nil)
  scope = scope(name)
  name ||= last_child.name
  puts "Scope for '#{name}':\n #{scope}"
end
#describe(offset = '') ⇒ Object
Produces a textual description of this Flow. The description details the structure of the Flow, its sources and sinks, and the input and output fields of each Assembly. The offset parameter allows for this describe to be nested within a calling context, which lets us indent the structural hierarchy of a job.
# File 'lib/cascading/flow.rb', line 84
def describe(offset = '')
  description = "#{offset}#{name}:flow\n"
  description += "#{sources.keys.map{ |source| "#{offset} #{source}:source :: #{incoming_scopes[source].values_fields.to_a.inspect}" }.join("\n")}\n"
  description += "#{child_names.map{ |child| children[child].describe("#{offset} ") }.join("\n")}\n"
  description += "#{sinks.keys.map{ |sink| "#{offset} #{sink}:sink :: #{outgoing_scopes[sink].values_fields.to_a.inspect}" }.join("\n")}"
  description
end
#emr_local_path_for_distributed_cache_file(file) ⇒ Object
Handles locating a file cached from S3 on local disk. TODO: remove
# File 'lib/cascading/flow.rb', line 171
def emr_local_path_for_distributed_cache_file(file)
  # NOTE this needs to be *appended* to the property mapred.local.dir
  if file =~ /^s3n?:\/\//
    # EMR
    "/taskTracker/archive/#{file.gsub(/^s3n?:\/\//, "")}"
  else
    # Local
    file
  end
end
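The path rewrite is easy to check on its own. The sketch below is a standalone copy of the method body, reading the else branch as returning the path unchanged; the bucket and file names are made up for illustration.

```ruby
# Standalone copy of the method body, so it can run without a Flow.
def emr_local_path_for_distributed_cache_file(file)
  if file =~ /^s3n?:\/\//
    # S3 paths (s3:// or s3n://) map under the task tracker's archive directory.
    "/taskTracker/archive/#{file.gsub(/^s3n?:\/\//, '')}"
  else
    # Anything else is treated as a local path and returned as-is.
    file
  end
end

# Bucket and file names are made up for illustration.
puts emr_local_path_for_distributed_cache_file('s3n://my-bucket/lookup/ids.txt')
puts emr_local_path_for_distributed_cache_file('s3://my-bucket/lookup/ids.txt')
puts emr_local_path_for_distributed_cache_file('/tmp/ids.txt')
```

Both S3 schemes produce /taskTracker/archive/my-bucket/lookup/ids.txt, which, per the NOTE in the source, is relative to mapred.local.dir rather than an absolute location.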
#scope(name = nil) ⇒ Object
Accesses the outgoing scope of this Flow at the point at which it is called by default, or for the child specified by the given name, if specified. This is useful for grabbing the values_fields at any point in the construction of the Flow. See Scope for details.
# File 'lib/cascading/flow.rb', line 96
def scope(name = nil)
  raise 'Must specify name if no children have been defined yet' unless name || last_child
  name ||= last_child.name
  @outgoing_scopes[name]
end
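The lookup rule (use the given name, otherwise fall back to the most recently added child) can be illustrated with a small model. MiniFlow and its add_child helper are hypothetical, and scopes are represented by plain arrays of field names rather than Scope objects.

```ruby
# Hypothetical stand-in modeling only the lookup in Flow#scope. Scopes are
# represented by arrays of field names rather than Scope objects.
class MiniFlow
  def initialize
    @outgoing_scopes = {}
    @child_names = []
  end

  # Hypothetical helper that records a child and its outgoing fields.
  def add_child(name, fields)
    @child_names << name
    @outgoing_scopes[name] = fields
  end

  def scope(name = nil)
    raise 'Must specify name if no children have been defined yet' unless name || @child_names.last
    name ||= @child_names.last
    @outgoing_scopes[name]
  end
end

flow = MiniFlow.new
flow.add_child('split', ['line', 'word'])
flow.add_child('count', ['word', 'count'])

puts flow.scope.inspect          # fields of the last child, 'count'
puts flow.scope('split').inspect # fields of the named child
```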
#set_spill_threshold(threshold) ⇒ Object
Set the cascading.spill.list.threshold property in this flow’s properties. See c.t.c.SpillableProps for details.
# File 'lib/cascading/flow.rb', line 151
def set_spill_threshold(threshold)
  properties['cascading.spill.list.threshold'] = threshold.to_s
end
#sink(name, tap) ⇒ Object
Creates a new sink for this flow, using the specified name and Cascading::Tap.
# File 'lib/cascading/flow.rb', line 75
def sink(name, tap)
  sinks[name] = tap
end
#sink_metadata ⇒ Object
Builds a map, keyed by sink name, of the sink metadata for each sink. Currently, this contains only the field names of each sink.
# File 'lib/cascading/flow.rb', line 114
def sink_metadata
  @sinks.keys.inject({}) do |sink_metadata, sink_name|
    raise "Cannot sink undefined assembly '#{sink_name}'" unless @outgoing_scopes[sink_name]
    sink_metadata[sink_name] = {
      :field_names => @outgoing_scopes[sink_name].values_fields.to_a,
    }
    sink_metadata
  end
end
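The shape of the returned map can be shown with stand-in data. In this sketch FakeScope is a hypothetical substitute for Scope that models only values_fields, and the sink name and field names are invented.

```ruby
# Hypothetical stand-in for Scope: only values_fields is modeled.
FakeScope = Struct.new(:values_fields)

sinks = { 'count' => :tap_placeholder } # the tap itself is never inspected
outgoing_scopes = { 'count' => FakeScope.new(['word', 'count']) }

sink_metadata = sinks.keys.inject({}) do |metadata, sink_name|
  raise "Cannot sink undefined assembly '#{sink_name}'" unless outgoing_scopes[sink_name]
  metadata[sink_name] = { :field_names => outgoing_scopes[sink_name].values_fields.to_a }
  metadata # inject needs the accumulator returned from the block
end

puts sink_metadata.inspect
```

A sink whose name has no matching outgoing scope, for example one declared before any assembly of that name exists, triggers the "Cannot sink undefined assembly" error instead of producing an entry.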
#source(name, tap) ⇒ Object
Creates a new source for this flow, using the specified name and Cascading::Tap.
# File 'lib/cascading/flow.rb', line 67
def source(name, tap)
  sources[name] = tap
  incoming_scopes[name] = Scope.source_scope(name, mode.source_tap(name, tap), @flow_scope)
  outgoing_scopes[name] = incoming_scopes[name]
end