Module: Cascading
- Defined in:
- lib/cascading/cascading.rb,
lib/cascading.rb,
lib/cascading/tap.rb,
lib/cascading/base.rb,
lib/cascading/flow.rb,
lib/cascading/mode.rb,
lib/cascading/scope.rb,
lib/cascading/cascade.rb,
lib/cascading/assembly.rb,
lib/cascading/expr_stub.rb,
lib/cascading/operations.rb,
lib/cascading/aggregations.rb,
lib/cascading/sub_assembly.rb,
lib/cascading/text_operations.rb,
lib/cascading/regex_operations.rb,
lib/cascading/filter_operations.rb,
lib/cascading/cascading_exception.rb,
lib/cascading/identity_operations.rb
Overview
The Cascading module contains all of the cascading.jruby DSL. Inserting the following into your script:
require 'rubygems'
require 'cascading'
includes this module at the top level, making all of its features available.
To build a dataflow like the one in the README.md or samples, start by looking at Cascade or Flow. These are the highest level structures you’ll use to put together your job.
Within a flow, you’ll connect sources to sinks by way of Assembly, which refers to “pipe assemblies” from Cascading. Within an Assembly, you’ll use functions and filters (see Operations, IdentityOperations, RegexOperations, FilterOperations, and TextOperations) as well as Assembly#group_by, Assembly#union, and Assembly#join. You can provide those last pipes with a block that can select operations from Aggregations.
Finally, you’ll want to address the execution of your job, whether testing locally or running remotely on a Hadoop cluster. See the Mode class for the available modes, and parameterize your script so that it runs in Cascading local mode on your own machine and in Hadoop mode when run from a jar produced with Jading.
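One way to parameterize a script by execution mode is sketched below in plain Ruby. The select_mode helper, the "--hadoop" flag, and the :hadoop symbol are assumptions made for illustration; only :mode => :local appears in the examples on this page, so check the Mode class for the symbols it actually accepts.

```ruby
# Hypothetical helper: picks an execution mode from command-line arguments.
# The "--hadoop" flag and the :hadoop symbol are assumptions, not library API.
def select_mode(args)
  args.include?('--hadoop') ? :hadoop : :local
end

mode = select_mode(ARGV)
puts "Running in #{mode} mode"
# The chosen mode would then be handed to the DSL, for example:
#   flow 'wordcount', :mode => mode do ... end
```

Keeping the decision in one small function means the rest of the script does not need to know where it is running.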
Defined Under Namespace
Modules: FilterOperations, IdentityOperations, Operations, RegexOperations, Registerable, TextOperations
Classes: Aggregations, AmbiguousNodeNameException, Assembly, BaseTap, Cascade, CascadingException, ExprArgException, ExprStub, Flow, Mode, MultiTap, Node, Scope, SubAssembly, Tap
Constant Summary
- VERSION =
'1.0.0'
- JAVA_TYPE_MAP =
Mapping that defines a convenient syntax for specifying Java classes, used in Janino expressions and elsewhere.
{
  :int => java.lang.Integer.java_class,
  :long => java.lang.Long.java_class,
  :bool => java.lang.Boolean.java_class,
  :double => java.lang.Double.java_class,
  :float => java.lang.Float.java_class,
  :string => java.lang.String.java_class,
}
Instance Method Summary
-
#all_fields ⇒ Object
Convenience method wrapping c.t.Fields::ALL.
-
#cascade(name, options = {}, &block) ⇒ Object
Builds a top-level Cascade given a name and a block.
-
#dedup_field_names(*names) ⇒ Object
Helper used by dedup_fields that operates on arrays of field names rather than fields objects.
-
#dedup_fields(*fields) ⇒ Object
Combines fields deduplicating them with trailing underscores as necessary.
-
#describe ⇒ Object
(also: #desc)
Produces a textual description of all Cascades in the global registry.
-
#difference_fields(base_fields, remove_fields) ⇒ Object
Computes fields formed by removing remove_fields from base_fields.
-
#expr(expression, options = {}) ⇒ Object
See ExprStub.expr.
-
#fields(fields) ⇒ Object
Utility method for creating Cascading c.t.Fields from a field name (string) or list of field names (array of strings).
-
#flow(name, options = {}, &block) ⇒ Object
Builds a top-level Flow given a name and block for applications built of flows with no cascades.
-
#last_grouping_fields ⇒ Object
Convenience method wrapping c.t.Fields::VALUES.
-
#local_properties(base_dir) ⇒ Object
Constructs properties to be passed to Flow#complete or Cascade#complete which will locate temporary Hadoop files in base_dir.
-
#multi_sink_tap(*taps) ⇒ Object
Convenience access to MultiTap.multi_sink_tap.
-
#multi_source_tap(*taps) ⇒ Object
Convenience access to MultiTap.multi_source_tap.
-
#sequence_file_scheme(*fields) ⇒ Object
Creates a c.s.h.SequenceFile scheme instance from the specified fields.
-
#tap(path, options = {}) ⇒ Object
Convenience constructor for a Tap that accepts the same options as that class’ constructor.
-
#text_line_scheme(*args_with_options) ⇒ Object
Creates a TextLine scheme (can be used in both Cascading local and Hadoop modes).
Instance Method Details
#all_fields ⇒ Object
Convenience method wrapping c.t.Fields::ALL
# File 'lib/cascading/cascading.rb', line 147
def all_fields
  Java::CascadingTuple::Fields::ALL
end
#cascade(name, options = {}, &block) ⇒ Object
Builds a top-level Cascade given a name and a block.
The named options are:
- properties: See Cascade#initialize
- mode: See Cascade#initialize
Example:
cascade 'wordcount', :mode => :local do
  flow 'first_step' do
    ...
  end
  flow 'second_step' do
    ...
  end
end
# File 'lib/cascading/cascading.rb', line 70
def cascade(name, options = {}, &block)
  raise "Could not build cascade '#{name}'; block required" unless block_given?
  raise 'Cascading::cascade does not accept the :properties param only the global $jobconf_properties' if options[:properties]

  options[:properties] = $jobconf_properties.dup if defined?($jobconf_properties) && $jobconf_properties

  cascade = Cascade.new(name, options)
  cascade.instance_eval(&block)
  cascade
end
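The builder pattern used by cascade (require a block, construct the object, instance_eval the block against it, return it) can be sketched with toy classes in plain Ruby. ToyCascade, ToyFlow, and toy_cascade are hypothetical stand-ins written for this illustration; they are not the library's Cascade and Flow classes and carry none of their behavior.

```ruby
# Toy stand-ins for Cascade and Flow; these are NOT the library classes.
class ToyFlow
  attr_reader :name

  def initialize(name)
    @name = name
  end
end

class ToyCascade
  attr_reader :name, :flows

  def initialize(name)
    @name = name
    @flows = []
  end

  # Inside the instance_eval'd block, self is the ToyCascade,
  # so a bare `flow` call resolves to this method.
  def flow(name, &block)
    child = ToyFlow.new(name)
    child.instance_eval(&block) if block
    @flows << child
    child
  end
end

# Same shape as the listing above: require a block, build, instance_eval, return.
def toy_cascade(name, &block)
  raise "Could not build cascade '#{name}'; block required" unless block_given?
  cascade = ToyCascade.new(name)
  cascade.instance_eval(&block)
  cascade
end

built = toy_cascade('wordcount') do
  flow('first_step') { }
  flow('second_step') { }
end
puts built.flows.map(&:name).inspect
```

Because the block is evaluated with the new object as self, nested calls need no explicit receiver, which is what gives the DSL its declarative look.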
#dedup_field_names(*names) ⇒ Object
Helper used by dedup_fields that operates on arrays of field names rather than fields objects.
Example:
left_names = ['a', 'b']
mid_names = ['a', 'c']
right_names = ['a', 'd']
deduped_names = dedup_field_names(left_names, mid_names, right_names)
# deduped_names == ['a', 'b', 'a_', 'c', 'a__', 'd']
# File 'lib/cascading/cascading.rb', line 185
def dedup_field_names(*names)
  names.inject([]) do |acc, arr|
    acc + arr.map{ |e| search_field_name(acc, e) }
  end
end
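The listing relies on a search_field_name helper that is not shown on this page. The sketch below supplies a hypothetical version of it, assuming it appends underscores until the name is unused, so that the example from the description can be run in plain Ruby.

```ruby
# Hypothetical stand-in for the library's search_field_name helper, which is
# not shown on this page: append underscores until the name is unused.
def search_field_name(taken, name)
  taken.include?(name) ? search_field_name(taken, "#{name}_") : name
end

# Same body as the listing above.
def dedup_field_names(*names)
  names.inject([]) do |acc, arr|
    acc + arr.map { |e| search_field_name(acc, e) }
  end
end

puts dedup_field_names(['a', 'b'], ['a', 'c'], ['a', 'd']).inspect
```

Note that the accumulator only grows between arrays, so with this logic a name repeated inside a single input array is not deduplicated against its sibling.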
#dedup_fields(*fields) ⇒ Object
Combines fields, deduplicating them with trailing underscores as necessary. This is used in joins so that the caller does not have to make field names unique before joining.
# File 'lib/cascading/cascading.rb', line 171
def dedup_fields(*fields)
  raise 'Can only be applied to declarators' unless fields.all?{ |f| f.is_declarator? }
  fields(dedup_field_names(*fields.map{ |f| f.to_a }))
end
#describe ⇒ Object Also known as: desc
Produces a textual description of all Cascades in the global registry. The description details the structure of the Cascades, the sources and sinks of each Flow, and the input and output fields of each Assembly.
NOTE: This will be moved to Job in a later version.
# File 'lib/cascading/cascading.rb', line 114
def describe
  Cascade.all.map{ |cascade| cascade.describe }.join("\n")
end
#difference_fields(base_fields, remove_fields) ⇒ Object
Computes fields formed by removing remove_fields from base_fields.
# File 'lib/cascading/cascading.rb', line 164
def difference_fields(base_fields, remove_fields)
  fields(base_fields.to_a - remove_fields.to_a)
end
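The core of this method is Ruby's array difference. A minimal sketch on plain arrays of names follows; difference_names is a hypothetical helper that skips the conversion to and from c.t.Fields.

```ruby
# Hypothetical plain-array analogue of difference_fields.
def difference_names(base_names, remove_names)
  # Array#- keeps the order of base_names and ignores names it does not contain.
  base_names - remove_names
end

puts difference_names(['id', 'name', 'score'], ['score', 'missing']).inspect
```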
#expr(expression, options = {}) ⇒ Object
See ExprStub.expr
# File 'lib/cascading/cascading.rb', line 120
def expr(expression, options = {})
  ExprStub.expr(expression, options)
end
#fields(fields) ⇒ Object
Utility method for creating Cascading c.t.Fields from a field name (string) or list of field names (array of strings). If the input fields is already a c.t.Fields or nil, it is passed through. This allows for flexible use of the method at multiple layers in the DSL.
Example:
cascading_fields = fields(['first', 'second', 'third'])
# cascading_fields.to_a == ['first', 'second', 'third']
# File 'lib/cascading/cascading.rb', line 132
def fields(fields)
  if fields.nil?
    return nil
  elsif fields.is_a? Java::CascadingTuple::Fields
    return fields
  elsif fields.is_a? ::Array
    if fields.size == 1
      return fields(fields[0])
    end
    raise "Fields cannot be nil: #{fields.inspect}" if fields.include?(nil)
  end
  return Java::CascadingTuple::Fields.new([fields].flatten.map{ |f| f.kind_of?(Fixnum) ? java.lang.Integer.new(f) : f }.to_java(java.lang.Comparable))
end
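The normalization rules in the listing (nil passes through, a one-element array is unwrapped, nil entries are rejected) can be tried without JRuby using the sketch below. normalize_field_names is a hypothetical helper that returns a plain array of names; it leaves out the c.t.Fields pass-through and the integer conversion, which need the Java classes.

```ruby
# Hypothetical pure-Ruby analogue of the normalization in fields; it returns
# an array of names instead of a c.t.Fields instance.
def normalize_field_names(names)
  return nil if names.nil?
  if names.is_a?(Array)
    # A one-element array is treated the same as its single element.
    return normalize_field_names(names[0]) if names.size == 1
    raise "Fields cannot be nil: #{names.inspect}" if names.include?(nil)
  end
  [names].flatten
end

puts normalize_field_names('first').inspect
puts normalize_field_names(['first', 'second', 'third']).inspect
```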
#flow(name, options = {}, &block) ⇒ Object
Builds a top-level Flow given a name and block for applications built of flows with no cascades.
The named options are:
- properties: See Flow#initialize
- mode: See Flow#initialize
Example:
flow 'wordcount', :mode => :local do
  assembly 'first_step' do
    ...
  end
  assembly 'second_step' do
    ...
  end
end
# File 'lib/cascading/cascading.rb', line 98
def flow(name, options = {}, &block)
  raise "Could not build flow '#{name}'; block required" unless block_given?
  raise 'Cascading::flow does not accept the :properties param only the global $jobconf_properties' if options[:properties]

  options[:properties] = $jobconf_properties.dup if defined?($jobconf_properties) && $jobconf_properties

  flow = Flow.new(name, nil, options)
  flow.instance_eval(&block)
  flow
end
#last_grouping_fields ⇒ Object
Convenience method wrapping c.t.Fields::VALUES
# File 'lib/cascading/cascading.rb', line 152
def last_grouping_fields
  Java::CascadingTuple::Fields::VALUES
end
#local_properties(base_dir) ⇒ Object
Constructs properties to be passed to Flow#complete or Cascade#complete which will locate temporary Hadoop files in base_dir. It is necessary to pass these properties only when executing scripts in Hadoop local mode via JRuby’s main method, which confuses Cascading’s attempt to find the containing jar. When using Cascading local mode, these are unnecessary.
# File 'lib/cascading/cascading.rb', line 264
def local_properties(base_dir)
  dirs = {
    'test.build.data' => "#{base_dir}/build",
    'hadoop.tmp.dir' => "#{base_dir}/tmp",
    'hadoop.log.dir' => "#{base_dir}/log",
  }
  dirs.each{ |key, dir| `mkdir -p #{dir}` }

  job_conf = Java::OrgApacheHadoopMapred::JobConf.new
  job_conf.jar = dirs['test.build.data']
  dirs.each{ |key, dir| job_conf.set(key, dir) }

  job_conf.num_map_tasks = 1
  job_conf.num_reduce_tasks = 1

  properties = java.util.HashMap.new
  Java::CascadingFlowHadoopPlanner::HadoopPlanner.copy_job_conf(properties, job_conf)
  properties
end
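The directory layout that local_properties creates under base_dir can be inspected without Hadoop. The sketch below covers only that part: local_dirs is a hypothetical helper, no JobConf is built, and FileUtils.mkdir_p is swapped in for the shelled-out mkdir -p.

```ruby
require 'fileutils'
require 'tmpdir'

# Sketch of the directory layout only; the keys come from the listing above,
# but no JobConf is created and nothing here touches Hadoop.
def local_dirs(base_dir)
  dirs = {
    'test.build.data' => "#{base_dir}/build",
    'hadoop.tmp.dir'  => "#{base_dir}/tmp",
    'hadoop.log.dir'  => "#{base_dir}/log",
  }
  # FileUtils.mkdir_p is used here instead of shelling out to mkdir -p.
  dirs.each_value { |dir| FileUtils.mkdir_p(dir) }
  dirs
end

Dir.mktmpdir do |base_dir|
  local_dirs(base_dir).each do |key, dir|
    puts "#{key} => #{dir} (exists: #{File.directory?(dir)})"
  end
end
```

Using a temporary directory keeps the example from leaving files behind; a real script would pass a stable base_dir instead.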
#multi_sink_tap(*taps) ⇒ Object
Convenience access to MultiTap.multi_sink_tap. This constructor is more “DSL-like” because it allows you to pass taps directly as actual args rather than in an array:
multi_sink_tap tap1, tap2, tap3, ..., tapn
See MultiTap.multi_sink_tap for more details.
# File 'lib/cascading/cascading.rb', line 249
def multi_sink_tap(*taps)
  MultiTap.multi_sink_tap(taps)
end
#multi_source_tap(*taps) ⇒ Object
Convenience access to MultiTap.multi_source_tap. This constructor is more “DSL-like” because it allows you to pass taps directly as actual args rather than in an array:
multi_source_tap tap1, tap2, tap3, ..., tapn
See MultiTap.multi_source_tap for more details.
# File 'lib/cascading/cascading.rb', line 239
def multi_source_tap(*taps)
  MultiTap.multi_source_tap(taps)
end
#sequence_file_scheme(*fields) ⇒ Object
Creates a c.s.h.SequenceFile scheme instance from the specified fields. A local SequenceFile scheme is not provided by Cascading, so this scheme cannot be used in Cascading local mode.
# File 'lib/cascading/cascading.rb', line 226
def sequence_file_scheme(*fields)
  {
    :local_scheme => nil,
    :hadoop_scheme => Java::CascadingSchemeHadoop::SequenceFile.new(fields.empty? ? all_fields : fields(fields)),
  }
end
#tap(path, options = {}) ⇒ Object
Convenience constructor for a Tap that accepts the same options as that class’ constructor. See Tap for more details.
# File 'lib/cascading/cascading.rb', line 255
def tap(path, options = {})
  Tap.new(path, options)
end
#text_line_scheme(*args_with_options) ⇒ Object
Creates a TextLine scheme (can be used in both Cascading local and Hadoop modes). Positional args are used as the source fields if :source_fields is not provided.
The named options are:
- source_fields: Fields to be read from a source with this scheme. Defaults to ['offset', 'line'].
- sink_fields: Fields to be written to a sink with this scheme. Defaults to all_fields.
- compression: A symbol, either :enable or :disable, that governs the TextLine scheme’s compression. Defaults to the default TextLine compression (only applies to c.s.h.TextLine).
# File 'lib/cascading/cascading.rb', line 207
def text_line_scheme(*args_with_options)
  options, source_fields = args_with_options.extract_options!, args_with_options
  source_fields = fields(options[:source_fields] || (source_fields.empty? ? ['offset', 'line'] : source_fields))
  sink_fields = fields(options[:sink_fields]) || all_fields
  sink_compression = case options[:compression]
    when :enable then Java::CascadingSchemeHadoop::TextLine::Compress::ENABLE
    when :disable then Java::CascadingSchemeHadoop::TextLine::Compress::DISABLE
    else Java::CascadingSchemeHadoop::TextLine::Compress::DEFAULT
  end

  {
    :local_scheme => Java::CascadingSchemeLocal::TextLine.new(source_fields, sink_fields),
    :hadoop_scheme => Java::CascadingSchemeHadoop::TextLine.new(source_fields, sink_fields, sink_compression),
  }
end
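The argument handling described for text_line_scheme (a trailing options hash, positional names as a fallback for :source_fields, and documented defaults) can be sketched in plain Ruby. parse_text_line_args is a hypothetical helper for illustration; it returns plain arrays and symbols rather than scheme objects, and the :all_fields and :default placeholders stand in for the Java values.

```ruby
# Hypothetical helper mirroring the documented argument handling; it returns
# plain arrays and symbols rather than Cascading scheme objects.
def parse_text_line_args(*args_with_options)
  # A trailing Hash is treated as the named options; the rest are field names.
  options = args_with_options.last.is_a?(Hash) ? args_with_options.pop : {}
  positional = args_with_options

  source_fields = options[:source_fields] ||
                  (positional.empty? ? ['offset', 'line'] : positional)
  sink_fields = options[:sink_fields] || :all_fields # placeholder for all_fields
  compression = [:enable, :disable].include?(options[:compression]) ? options[:compression] : :default

  { :source_fields => source_fields, :sink_fields => sink_fields, :compression => compression }
end

puts parse_text_line_args.inspect
puts parse_text_line_args('line', :compression => :enable).inspect
```

As in the description, an explicit :source_fields option takes precedence over positional names.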