Module: Cascading

Defined in:
lib/cascading/cascading.rb,
lib/cascading.rb,
lib/cascading/tap.rb,
lib/cascading/base.rb,
lib/cascading/flow.rb,
lib/cascading/mode.rb,
lib/cascading/scope.rb,
lib/cascading/cascade.rb,
lib/cascading/assembly.rb,
lib/cascading/expr_stub.rb,
lib/cascading/operations.rb,
lib/cascading/aggregations.rb,
lib/cascading/sub_assembly.rb,
lib/cascading/text_operations.rb,
lib/cascading/regex_operations.rb,
lib/cascading/filter_operations.rb,
lib/cascading/cascading_exception.rb,
lib/cascading/identity_operations.rb

Overview

The Cascading module contains all of the cascading.jruby DSL. Inserting the following into your script:

require 'rubygems'
require 'cascading'

includes this module at the top level, making all of its features available.

To build a dataflow like the one in README.md or the samples, start with Cascade or Flow. These are the highest-level structures you’ll use to assemble your job.

Within a flow, you’ll connect sources to sinks by way of an Assembly, which corresponds to a “pipe assembly” in Cascading. Within an Assembly, you’ll use functions and filters (see Operations, IdentityOperations, RegexOperations, FilterOperations, and TextOperations) as well as Assembly#group_by, Assembly#union, and Assembly#join. Those last three accept a block in which you can select operations from Aggregations.

Finally, you’ll need to decide how your job executes: locally for testing, or remotely on a Hadoop cluster. See the Mode class for the available modes, and parameterize your script so that it runs in Cascading local mode locally and in Hadoop mode when run from a jar produced with Jading.

Defined Under Namespace

Modules: FilterOperations, IdentityOperations, Operations, RegexOperations, Registerable, TextOperations

Classes: Aggregations, AmbiguousNodeNameException, Assembly, BaseTap, Cascade, CascadingException, ExprArgException, ExprStub, Flow, Mode, MultiTap, Node, Scope, SubAssembly, Tap

Constant Summary

VERSION = '1.0.0'

JAVA_TYPE_MAP =

Mapping that defines a convenient syntax for specifying Java classes, used in Janino expressions and elsewhere.

{
  :int => java.lang.Integer.java_class, :long => java.lang.Long.java_class,
  :bool => java.lang.Boolean.java_class, :double => java.lang.Double.java_class,
  :float => java.lang.Float.java_class, :string => java.lang.String.java_class,
}


Instance Method Details

#all_fields ⇒ Object

Convenience method wrapping c.t.Fields::ALL



# File 'lib/cascading/cascading.rb', line 147

def all_fields
  Java::CascadingTuple::Fields::ALL
end

#cascade(name, options = {}, &block) ⇒ Object

Builds a top-level Cascade given a name and a block.

The named options are:

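properties

Not accepted: passing it raises an error. If the global $jobconf_properties is set, a copy of it is passed to Cascade#initialize instead.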

mode

See Cascade#initialize

Example:

cascade 'wordcount', :mode => :local do
  flow 'first_step' do
    # ...
  end

  flow 'second_step' do
    # ...
  end
end


# File 'lib/cascading/cascading.rb', line 70

def cascade(name, options = {}, &block)
  raise "Could not build cascade '#{name}'; block required" unless block_given?
  raise 'Cascading::cascade does not accept the :properties param only the global $jobconf_properties' if options[:properties]

  options[:properties] = $jobconf_properties.dup if defined?($jobconf_properties) && $jobconf_properties

  cascade = Cascade.new(name, options)
  cascade.instance_eval(&block)
  cascade
end
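The property handling above is easy to miss: an explicit :properties option is rejected, and a copy of the global $jobconf_properties is used instead. The following is a minimal pure-Ruby sketch of that logic, assuming plain Hashes in place of Java property maps. build_options is a hypothetical name for illustration, not part of cascading.jruby, and the property key is only an example value.

```ruby
# Hypothetical sketch of the option handling in cascade (and flow).
# build_options is illustrative only; it is not a cascading.jruby method.
def build_options(name, options = {})
  if options[:properties]
    raise "Cannot build '#{name}'; :properties is not accepted, set $jobconf_properties instead"
  end
  # Copy the global properties so that later changes to the options
  # do not leak back into $jobconf_properties.
  if defined?($jobconf_properties) && $jobconf_properties
    options[:properties] = $jobconf_properties.dup
  end
  options
end

$jobconf_properties = { 'mapred.reduce.tasks' => '4' } # example value
opts = build_options('wordcount', { :mode => :local })
opts[:properties]['mapred.reduce.tasks'] = '8'
puts $jobconf_properties['mapred.reduce.tasks'] # prints 4
```

The dup matters because each cascade or flow may adjust its own properties without affecting the others built from the same global.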

#dedup_field_names(*names) ⇒ Object

Helper used by dedup_fields that operates on arrays of field names rather than on c.t.Fields objects.

Example:

left_names = ['a', 'b']
mid_names = ['a', 'c']
right_names = ['a', 'd']
deduped_names = dedup_field_names(left_names, mid_names, right_names)
# deduped_names == ['a', 'b', 'a_', 'c', 'a__', 'd']


# File 'lib/cascading/cascading.rb', line 185

def dedup_field_names(*names)
  names.inject([]) do |acc, arr|
    acc + arr.map{ |e| search_field_name(acc, e) }
  end
end
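The source of search_field_name is not shown on this page. The sketch below assumes it appends underscores to a name until the name no longer appears among the names accumulated so far; that helper is a hypothetical stand-in. With it, the inject loop reproduces the documented example on plain Ruby arrays.

```ruby
# Hypothetical stand-in for search_field_name (its real source is not shown):
# append underscores until the candidate no longer collides with names.
def search_field_name(names, candidate)
  names.include?(candidate) ? search_field_name(names, "#{candidate}_") : candidate
end

# Same shape as dedup_field_names above, on plain arrays of strings.
def dedup_field_names(*names)
  names.inject([]) do |acc, arr|
    acc + arr.map { |e| search_field_name(acc, e) }
  end
end

p dedup_field_names(['a', 'b'], ['a', 'c'], ['a', 'd'])
# prints ["a", "b", "a_", "c", "a__", "d"]
```

Because acc only grows between arrays, under this sketch duplicate names within a single array are not renamed; deduplication happens across the arrays being combined, as in a join.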

#dedup_fields(*fields) ⇒ Object

Combines fields, deduplicating names by appending trailing underscores as necessary. This is used in joins so that callers need not make field names unique before joining. All arguments must be declarators; otherwise an error is raised.



# File 'lib/cascading/cascading.rb', line 171

def dedup_fields(*fields)
  raise 'Can only be applied to declarators' unless fields.all?{ |f| f.is_declarator? }
  fields(dedup_field_names(*fields.map{ |f| f.to_a }))
end

#describe ⇒ Object

Also known as: desc

Produces a textual description of all Cascades in the global registry. The description details the structure of the Cascades, the sources and sinks of each Flow, and the input and output fields of each Assembly.

NOTE: will be moved to Job in a later version.



# File 'lib/cascading/cascading.rb', line 114

def describe
  Cascade.all.map{ |cascade| cascade.describe }.join("\n")
end

#difference_fields(base_fields, remove_fields) ⇒ Object

Computes fields formed by removing remove_fields from base_fields. Operates only on named fields, not positional fields.

Example:

base_fields = fields(['a', 'b', 'c'])
remove_fields = fields(['b'])
result_fields = difference_fields(base_fields, remove_fields)
# result_fields.to_a == ['a', 'c']


# File 'lib/cascading/cascading.rb', line 164

def difference_fields(base_fields, remove_fields)
  fields(base_fields.to_a - remove_fields.to_a)
end
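difference_fields delegates to Ruby's Array#- on the field names. The snippet below shows those semantics on plain string arrays rather than c.t.Fields objects, as a small illustration of what the method inherits.

```ruby
# Array#- semantics that difference_fields relies on, shown on plain
# arrays of names instead of c.t.Fields objects.
base_names   = ['a', 'b', 'c', 'b']
remove_names = ['b', 'z']

result = base_names - remove_names
p result # prints ["a", "c"]
```

Every occurrence of a removed name is dropped, names absent from the base are ignored, and the order of the base names is preserved.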

#expr(expression, options = {}) ⇒ Object

See ExprStub.expr



# File 'lib/cascading/cascading.rb', line 120

def expr(expression, options = {})
  ExprStub.expr(expression, options)
end

#fields(fields) ⇒ Object

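Utility method for creating a Cascading c.t.Fields from a field name (string) or a list of field names (array of strings). If the input is already a c.t.Fields, or is nil, it is passed through unchanged; a one-element array is treated as its single element, and an array containing nil raises an error. Integer elements are converted to java.lang.Integer, producing positional fields. This allows the method to be used flexibly at multiple layers of the DSL.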

Example:

cascading_fields = fields(['first', 'second', 'third'])
# cascading_fields.to_a == ['first', 'second', 'third']


# File 'lib/cascading/cascading.rb', line 132

def fields(fields)
  if fields.nil?
    return nil
  elsif fields.is_a? Java::CascadingTuple::Fields
    return fields
  elsif fields.is_a? ::Array
    if fields.size == 1
      return fields(fields[0])
    end
    raise "Fields cannot be nil: #{fields.inspect}" if fields.include?(nil)
  end
  return Java::CascadingTuple::Fields.new([fields].flatten.map{ |f| f.kind_of?(Fixnum) ? java.lang.Integer.new(f) : f }.to_java(java.lang.Comparable))
end
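The branching in fields can be hard to follow because of the recursive unwrapping of one-element arrays. Here is a hypothetical pure-Ruby mirror of that normalization, assuming it is enough to return a plain array of names; the c.t.Fields passthrough and the Java Integer conversion are omitted because they require JRuby and Cascading.

```ruby
# Hypothetical mirror of the argument normalization in fields. It returns
# a plain array instead of a c.t.Fields and skips the Java-specific steps.
def normalize_field_names(names)
  return nil if names.nil?
  if names.is_a?(Array)
    # A one-element array is unwrapped and handled like its only element.
    return normalize_field_names(names[0]) if names.size == 1
    raise ArgumentError, "Fields cannot be nil: #{names.inspect}" if names.include?(nil)
  end
  # A bare name becomes a one-element list; nested arrays are flattened.
  [names].flatten
end

p normalize_field_names(['first', 'second', 'third']) # prints ["first", "second", "third"]
p normalize_field_names('line')                        # prints ["line"]
```

One consequence of the unwrapping is that [nil] is treated like nil and passes through, while a longer array containing nil raises.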

#flow(name, options = {}, &block) ⇒ Object

Builds a top-level Flow from a name and a block, for applications consisting of flows without a cascade.

The named options are:

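properties

Not accepted: passing it raises an error. If the global $jobconf_properties is set, a copy of it is passed to Flow#initialize instead.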

mode

See Flow#initialize

Example:

flow 'wordcount', :mode => :local do
  assembly 'first_step' do
    # ...
  end

  assembly 'second_step' do
    # ...
  end
end


# File 'lib/cascading/cascading.rb', line 98

def flow(name, options = {}, &block)
  raise "Could not build flow '#{name}'; block required" unless block_given?
  raise 'Cascading::flow does not accept the :properties param only the global $jobconf_properties' if options[:properties]

  options[:properties] = $jobconf_properties.dup if defined?($jobconf_properties) && $jobconf_properties

  flow = Flow.new(name, nil, options)
  flow.instance_eval(&block)
  flow
end
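Both cascade and flow build an object and then instance_eval the block against it, which is why bare calls such as assembly inside the block work. The following is a minimal sketch of that builder pattern in plain Ruby; MiniFlow and mini_flow are hypothetical classes for illustration and are far simpler than Flow.

```ruby
# Hypothetical classes illustrating the instance_eval builder pattern used
# by cascade and flow. Inside the block, self is the new MiniFlow, so a bare
# `assembly` call is a method call on that object.
class MiniFlow
  attr_reader :name, :assemblies

  def initialize(name)
    @name = name
    @assemblies = []
  end

  def assembly(name)
    @assemblies << name
  end
end

def mini_flow(name, &block)
  raise "Could not build flow '#{name}'; block required" unless block_given?
  flow = MiniFlow.new(name)
  flow.instance_eval(&block)
  flow
end

f = mini_flow('wordcount') do
  assembly 'first_step'
  assembly 'second_step'
end
p f.assemblies # prints ["first_step", "second_step"]
```

A trade-off of instance_eval is that, inside the block, local variables from the enclosing scope remain visible but the caller's own methods and instance variables are not, since self has changed.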

#last_grouping_fields ⇒ Object

Convenience method wrapping c.t.Fields::VALUES



# File 'lib/cascading/cascading.rb', line 152

def last_grouping_fields
  Java::CascadingTuple::Fields::VALUES
end

#local_properties(base_dir) ⇒ Object

Constructs properties, to be passed to Flow#complete or Cascade#complete, that locate temporary Hadoop files under base_dir. As a side effect, it creates the build, tmp, and log directories under base_dir, and the resulting job configuration uses one map task and one reduce task. These properties are needed only when executing scripts in Hadoop local mode via JRuby’s main method, which confuses Cascading’s attempt to find the containing jar. In Cascading local mode they are unnecessary.



# File 'lib/cascading/cascading.rb', line 264

def local_properties(base_dir)
  dirs = {
    'test.build.data' => "#{base_dir}/build",
    'hadoop.tmp.dir' => "#{base_dir}/tmp",
    'hadoop.log.dir' => "#{base_dir}/log",
  }
  dirs.each{ |key, dir| `mkdir -p #{dir}` }

  job_conf = Java::OrgApacheHadoopMapred::JobConf.new
  job_conf.jar = dirs['test.build.data']
  dirs.each{ |key, dir| job_conf.set(key, dir) }

  job_conf.num_map_tasks = 1
  job_conf.num_reduce_tasks = 1

  properties = java.util.HashMap.new
  Java::CascadingFlowHadoopPlanner::HadoopPlanner.copy_job_conf(properties, job_conf)
  properties
end
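The directory layout that local_properties prepares can be reproduced without Hadoop. The following is a hypothetical sketch (local_dirs is an illustrative name) that builds the same three keys and creates the directories with FileUtils.mkdir_p rather than shelling out to `mkdir -p`; the JobConf and HadoopPlanner steps are omitted because they need Hadoop on the classpath.

```ruby
require 'fileutils'
require 'tmpdir'

# Hypothetical sketch of the directory setup in local_properties.
# FileUtils.mkdir_p replaces the backtick `mkdir -p` call.
def local_dirs(base_dir)
  dirs = {
    'test.build.data' => File.join(base_dir, 'build'),
    'hadoop.tmp.dir'  => File.join(base_dir, 'tmp'),
    'hadoop.log.dir'  => File.join(base_dir, 'log'),
  }
  dirs.each_value { |dir| FileUtils.mkdir_p(dir) }
  dirs
end

Dir.mktmpdir do |base|
  dirs = local_dirs(base)
  p dirs.values.all? { |d| File.directory?(d) } # prints true
end
```

Using FileUtils also avoids shell word splitting, so a base_dir containing spaces would still produce the intended directories.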

#multi_sink_tap(*taps) ⇒ Object

Convenience access to MultiTap.multi_sink_tap. This constructor is more “DSL-like” because it lets you pass taps directly as arguments rather than in an array:

multi_sink_tap tap1, tap2, tap3, ..., tapn

See MultiTap.multi_sink_tap for more details.



# File 'lib/cascading/cascading.rb', line 249

def multi_sink_tap(*taps)
  MultiTap.multi_sink_tap(taps)
end

#multi_source_tap(*taps) ⇒ Object

Convenience access to MultiTap.multi_source_tap. This constructor is more “DSL-like” because it lets you pass taps directly as arguments rather than in an array:

multi_source_tap tap1, tap2, tap3, ..., tapn

See MultiTap.multi_source_tap for more details.



# File 'lib/cascading/cascading.rb', line 239

def multi_source_tap(*taps)
  MultiTap.multi_source_tap(taps)
end
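Both multi-tap helpers work because the *taps splat collects all positional arguments into one array before forwarding it. The sketch below shows this with a hypothetical FakeMultiTap that simply returns what it receives; it stands in for MultiTap only to make the forwarding visible.

```ruby
# Hypothetical stand-in for MultiTap: it returns the array it receives.
class FakeMultiTap
  def self.multi_source_tap(taps)
    raise ArgumentError, 'expected an array of taps' unless taps.is_a?(Array)
    taps
  end
end

# The *taps splat gathers every positional argument into one array,
# which is what lets callers omit the brackets.
def multi_source_tap(*taps)
  FakeMultiTap.multi_source_tap(taps)
end

p multi_source_tap(:tap1, :tap2, :tap3) # prints [:tap1, :tap2, :tap3]
```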

#sequence_file_scheme(*fields) ⇒ Object

Creates a c.s.h.SequenceFile scheme from the specified fields (all_fields if none are given). Cascading does not provide a local SequenceFile scheme, so this scheme cannot be used in Cascading local mode.



# File 'lib/cascading/cascading.rb', line 226

def sequence_file_scheme(*fields)
  {
    :local_scheme => nil,
    :hadoop_scheme => Java::CascadingSchemeHadoop::SequenceFile.new(fields.empty? ? all_fields : fields(fields)),
  }
end
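The scheme helpers return a Hash with :local_scheme and :hadoop_scheme entries, and a nil :local_scheme signals that the scheme is unavailable in Cascading local mode. The helper below is a hypothetical illustration of how a caller might select a scheme for a mode and fail early; scheme_for and the :local/:hadoop symbols are assumptions, and this is not how the Mode class is implemented.

```ruby
# Hypothetical helper: pick the scheme for a mode from the Hash returned by
# sequence_file_scheme or text_line_scheme, failing if it is unavailable.
def scheme_for(mode, schemes)
  key = mode == :local ? :local_scheme : :hadoop_scheme
  scheme = schemes[key]
  raise ArgumentError, "No #{key} available for mode #{mode.inspect}" if scheme.nil?
  scheme
end

# A string stands in for the Java SequenceFile scheme object.
sequence_like = { :local_scheme => nil, :hadoop_scheme => 'SequenceFile' }
p scheme_for(:hadoop, sequence_like) # prints "SequenceFile"
begin
  scheme_for(:local, sequence_like)
rescue ArgumentError => e
  puts e.message # prints No local_scheme available for mode :local
end
```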

#tap(path, options = {}) ⇒ Object

Convenience constructor for a Tap that accepts the same options as the Tap constructor. See Tap for more details.



# File 'lib/cascading/cascading.rb', line 255

def tap(path, options = {})
  Tap.new(path, options)
end

#text_line_scheme(*args_with_options) ⇒ Object

Creates a TextLine scheme that can be used in both Cascading local and Hadoop modes. If :source_fields is not provided, any positional arguments are used as the source fields.

The named options are:

source_fields

Fields to be read from a source with this scheme. Defaults to ['offset', 'line'].

sink_fields

Fields to be written to a sink with this scheme. Defaults to all_fields.

compression

A symbol, either :enable or :disable, that governs the TextLine scheme’s compression. If omitted, TextLine’s default compression setting is used. This option applies only to c.s.h.TextLine; the local scheme ignores it.



# File 'lib/cascading/cascading.rb', line 207

def text_line_scheme(*args_with_options)
  options, source_fields = args_with_options.extract_options!, args_with_options
  source_fields = fields(options[:source_fields] || (source_fields.empty? ? ['offset', 'line'] : source_fields))
  sink_fields = fields(options[:sink_fields]) || all_fields
  sink_compression = case options[:compression]
    when :enable  then Java::CascadingSchemeHadoop::TextLine::Compress::ENABLE
    when :disable then Java::CascadingSchemeHadoop::TextLine::Compress::DISABLE
    else Java::CascadingSchemeHadoop::TextLine::Compress::DEFAULT
  end

  {
    :local_scheme => Java::CascadingSchemeLocal::TextLine.new(source_fields, sink_fields),
    :hadoop_scheme => Java::CascadingSchemeHadoop::TextLine.new(source_fields, sink_fields, sink_compression),
  }
end
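The argument handling in text_line_scheme combines positional fields, named options, and defaults. Below is a hypothetical pure-Ruby sketch of that precedence. It approximates extract_options! by popping a trailing Hash and maps the compression symbols to the names of the TextLine::Compress constants rather than the Java constants themselves; parse_text_line_args is an illustrative name.

```ruby
# Hypothetical sketch of text_line_scheme's argument handling. A trailing
# Hash is treated as the options, as extract_options! does; symbols stand
# in for the Java field and compression objects.
def parse_text_line_args(*args)
  options = args.last.is_a?(Hash) ? args.pop : {}
  # :source_fields wins; otherwise positional args; otherwise the default.
  source = options[:source_fields] || (args.empty? ? ['offset', 'line'] : args)
  sink = options[:sink_fields] || :all_fields
  compression = case options[:compression]
                when :enable  then :ENABLE
                when :disable then :DISABLE
                else :DEFAULT
                end
  { source_fields: source, sink_fields: sink, compression: compression }
end

p parse_text_line_args('line', compression: :enable)[:compression] # prints :ENABLE
p parse_text_line_args[:source_fields] # prints ["offset", "line"]
```

Note that when :source_fields is given, any positional arguments are silently ignored.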