Module: Cascading

Defined in:
lib/cascading.rb,
lib/cascading/tap.rb,
lib/cascading/base.rb,
lib/cascading/flow.rb,
lib/cascading/mode.rb,
lib/cascading/scope.rb,
lib/cascading/cascade.rb,
lib/cascading/assembly.rb,
lib/cascading/cascading.rb,
lib/cascading/expr_stub.rb,
lib/cascading/operations.rb,
lib/cascading/aggregations.rb,
lib/cascading/sub_assembly.rb,
lib/cascading/cascading_exception.rb

Defined Under Namespace

Modules: Operations, Registerable Classes: Aggregations, AmbiguousNodeNameException, Assembly, BaseTap, Cascade, CascadingException, ExprArgException, ExprStub, Flow, Mode, MultiTap, Node, Scope, SubAssembly, Tap

Constant Summary collapse

VERSION =

:stopdoc:

'0.0.10'
LIBPATH =
::File.expand_path(::File.dirname(__FILE__)) + ::File::SEPARATOR
PATH =
::File.dirname(LIBPATH) + ::File::SEPARATOR
CASCADING_HOME =
ENV['CASCADING_HOME']
HADOOP_HOME =
ENV['HADOOP_HOME']
JAVA_TYPE_MAP =
{
  :int => java.lang.Integer.java_class, :long => java.lang.Long.java_class,
  :bool => java.lang.Boolean.java_class, :double => java.lang.Double.java_class,
  :float => java.lang.Float.java_class, :string => java.lang.String.java_class,
}

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.libpath(*args) ⇒ Object

Returns the library path for the module. If any arguments are given, they will be joined to the end of the library path using File.join.



23
24
25
# File 'lib/cascading.rb', line 23

# Returns LIBPATH (this file's directory, with a trailing separator).
# When +args+ are given they are flattened and appended with File.join,
# e.g. libpath('cascading', 'flow.rb').
def self.libpath(*args)
  return LIBPATH if args.empty?

  ::File.join(LIBPATH, args.flatten)
end

.path(*args) ⇒ Object

Returns the path for the module. If any arguments are given, they will be joined to the end of the path using File.join.



31
32
33
# File 'lib/cascading.rb', line 31

# Returns PATH (the parent of LIBPATH, with a trailing separator).
# When +args+ are given they are flattened and appended with File.join.
def self.path(*args)
  return PATH if args.empty?

  ::File.join(PATH, args.flatten)
end

.require_all_jars(from = ::File.join(::File.dirname(__FILE__), "..", "jars")) ⇒ Object



35
36
37
38
39
40
41
# File 'lib/cascading.rb', line 35

# Requires every *.jar found (recursively) under +from+, in sorted path
# order. +from+ defaults to the "jars" directory one level above the
# directory containing this file.
#
# Returns the sorted array of jar paths that were required.
def self.require_all_jars(from = ::File.join(::File.dirname(__FILE__), "..", "jars"))
  pattern = ::File.expand_path(::File.join(from, '**', '*.jar'))
  jars = Dir.glob(pattern).sort
  jars.each { |jar| require jar }
end

.version ⇒ Object

Returns the version string for the library.



15
16
17
# File 'lib/cascading.rb', line 15

# Returns the version string for the library (the VERSION constant).
def self.version
  VERSION
end

Instance Method Details

#all_fields ⇒ Object



79
80
81
# File 'lib/cascading/cascading.rb', line 79

# Returns the cascading.tuple.Fields::ALL sentinel.
def all_fields
  tuple_fields = Java::CascadingTuple::Fields
  tuple_fields::ALL
end

#cascade(name, params = {}, &block) ⇒ Object

Builds a top-level cascade given a name and a block. Optionally accepts a :mode, as explained in Cascading::Cascade#initialize.



29
30
31
32
33
34
35
36
37
38
# File 'lib/cascading/cascading.rb', line 29

# Builds a top-level cascade named +name+ by instance_eval-ing +block+
# against a new Cascade.
#
# params - options forwarded to Cascade.new (e.g. :mode). Must not contain
#          :properties; those come only from the global $jobconf_properties,
#          which is copied in when set.
#
# Returns the Cascade.
# Raises RuntimeError if no block is given or params contains :properties.
def cascade(name, params = {}, &block)
  raise "Could not build cascade '#{name}'; block required" unless block_given?
  raise 'Cascading::cascade does not accept the :properties param only the global $jobconf_properties' if params[:properties]

  # Merge into a copy rather than assigning into the caller's hash: mutating
  # it would make a second call with the same options hash trip the
  # :properties guard above.
  params = params.merge(:properties => $jobconf_properties.dup) if $jobconf_properties

  cascade = Cascade.new(name, params)
  cascade.instance_eval(&block)
  cascade
end

#copy_fields(fields) ⇒ Object



91
92
93
# File 'lib/cascading/cascading.rb', line 91

# Calls +fields+.select with all_fields and returns the result.
def copy_fields(fields)
  everything = all_fields
  fields.select(everything)
end

#dedup_field_names(*names) ⇒ Object



100
101
102
103
104
# File 'lib/cascading/cascading.rb', line 100

# Concatenates several lists of field names. Any name that collides with
# one already emitted is renamed by appending underscores (via
# search_field_name). Each name is checked against everything emitted so
# far, including earlier names from the same list, so the result contains
# no duplicates. (Checking only against previous lists let
# ['a'], ['a', 'a_'] produce 'a_' twice.)
#
# names - one or more arrays of field names.
#
# Returns a flat array of unique names, in input order.
def dedup_field_names(*names)
  names.each_with_object([]) do |list, emitted|
    list.each { |name| emitted << search_field_name(emitted, name) }
  end
end

#dedup_fields(*fields) ⇒ Object



95
96
97
98
# File 'lib/cascading/cascading.rb', line 95

# Merges several declarator Fields into one, renaming colliding names via
# dedup_field_names.
#
# Raises RuntimeError unless every argument answers is_declarator? truthily.
def dedup_fields(*fields)
  unless fields.all?(&:is_declarator?)
    raise 'Can only be applied to declarators'
  end

  name_lists = fields.map(&:to_a)
  fields(dedup_field_names(*name_lists))
end

#describe ⇒ Object (also known as: desc)



54
55
56
# File 'lib/cascading/cascading.rb', line 54

# Returns the description of every cascade in Cascade.all, one per line.
def describe
  Cascade.all.map(&:describe).join("\n")
end

#difference_fields(*fields) ⇒ Object



87
88
89
# File 'lib/cascading/cascading.rb', line 87

# Returns a Fields containing the names in the first argument that do not
# appear in any of the remaining arguments (order of the first preserved).
#
# With no arguments this returns an empty Fields, matching union_fields,
# instead of crashing on nil (`[][1..-1]` is nil in Ruby).
def difference_fields(*fields)
  first, *rest = fields
  fields(rest.inject(first.to_a) { |acc, arr| acc - arr.to_a })
end

#expr(expression, params = {}) ⇒ Object

See ExprStub.expr



60
61
62
# File 'lib/cascading/cascading.rb', line 60

# Delegates to ExprStub.expr, passing +expression+ and +params+ unchanged.
def expr(expression, params = {})
  ExprStub.expr(expression, params)
end

#fields(fields) ⇒ Object

Creates a cascading.tuple.Fields instance from a string or an array of strings.



65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/cascading/cascading.rb', line 65

# Creates a cascading.tuple.Fields instance from a field name, a position,
# or an (optionally nested) array of them.
#
# fields - nil; an existing cascading.tuple.Fields (returned unchanged); a
#          single name/position; or an array of names/positions. A
#          one-element array is unwrapped and converted recursively.
#
# Integer positions are boxed as java.lang.Integer before being passed to
# the Fields constructor; other elements are passed through as-is.
#
# Returns nil when given nil, otherwise a cascading.tuple.Fields.
# Raises RuntimeError if an array contains nil at any nesting depth.
def fields(fields)
  case fields
  when nil
    return nil
  when Java::CascadingTuple::Fields
    return fields
  when ::Array
    return fields(fields.first) if fields.size == 1
    # Check after flattening so nested nils are rejected too, rather than
    # reaching the Java constructor as null entries.
    raise "Fields cannot be nil: #{fields.inspect}" if fields.flatten.include?(nil)
  end

  # Integer rather than Fixnum: Fixnum is deprecated since Ruby 2.4 and was
  # removed in Ruby 3.2, where referencing it raises NameError.
  elements = [fields].flatten.map { |f| f.is_a?(Integer) ? java.lang.Integer.new(f) : f }
  Java::CascadingTuple::Fields.new(elements.to_java(java.lang.Comparable))
end

#flow(name, params = {}, &block) ⇒ Object

Builds a top-level flow given a name and block for applications built of flows with no cascades. Optionally accepts a :mode, as explained in Cascading::Flow#initialize.



43
44
45
46
47
48
49
50
51
52
# File 'lib/cascading/cascading.rb', line 43

# Builds a top-level flow (one with no parent cascade) named +name+ by
# instance_eval-ing +block+ against a new Flow.
#
# params - options forwarded to Flow.new (e.g. :mode). Must not contain
#          :properties; those come only from the global $jobconf_properties,
#          which is copied in when set.
#
# Returns the Flow.
# Raises RuntimeError if no block is given or params contains :properties.
def flow(name, params = {}, &block)
  raise "Could not build flow '#{name}'; block required" unless block_given?
  raise 'Cascading::flow does not accept the :properties param only the global $jobconf_properties' if params[:properties]

  # Merge into a copy rather than assigning into the caller's hash: mutating
  # it would make a second call with the same options hash trip the
  # :properties guard above.
  params = params.merge(:properties => $jobconf_properties.dup) if $jobconf_properties

  flow = Flow.new(name, nil, params)
  flow.instance_eval(&block)
  flow
end

#last_grouping_fields ⇒ Object



110
111
112
# File 'lib/cascading/cascading.rb', line 110

# Returns the cascading.tuple.Fields::VALUES sentinel.
def last_grouping_fields
  tuple_fields = Java::CascadingTuple::Fields
  tuple_fields::VALUES
end

#local_properties(base_dir) ⇒ Object

Constructs properties to be passed to Flow#complete or Cascade#complete which will locate temporary Hadoop files in base_dir. It is necessary to pass these properties only when executing scripts in Hadoop local mode via JRuby’s main method, which confuses Cascading’s attempt to find the containing jar. When using Cascading local mode, these are unnecessary.



174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/cascading/cascading.rb', line 174

# Constructs a java.util.HashMap of Hadoop properties that place Hadoop's
# build, tmp and log directories under +base_dir+ (creating them first).
# Intended for Flow#complete / Cascade#complete when running in Hadoop
# local mode via JRuby's main method.
#
# base_dir - directory under which build/, tmp/ and log/ are created.
#
# Returns the properties map.
# Raises a SystemCallError subclass if a directory cannot be created.
def local_properties(base_dir)
  require 'fileutils'

  dirs = {
    'test.build.data' => "#{base_dir}/build",
    'hadoop.tmp.dir' => "#{base_dir}/tmp",
    'hadoop.log.dir' => "#{base_dir}/log",
  }
  # FileUtils instead of shelling out to `mkdir -p`: there is no shell
  # parsing, so paths with spaces or shell metacharacters are created
  # correctly, and a failure raises instead of being silently ignored.
  dirs.each_value { |dir| FileUtils.mkdir_p(dir) }

  job_conf = Java::OrgApacheHadoopMapred::JobConf.new
  # NOTE(review): the job jar is pointed at the build directory, presumably
  # to stop Cascading from searching for a containing jar — confirm.
  job_conf.jar = dirs['test.build.data']
  dirs.each { |key, dir| job_conf.set(key, dir) }

  job_conf.num_map_tasks = 1
  job_conf.num_reduce_tasks = 1

  properties = java.util.HashMap.new
  Java::CascadingFlowHadoopPlanner::HadoopPlanner.copy_job_conf(properties, job_conf)
  properties
end

#multi_sink_tap(*taps) ⇒ Object



160
161
162
# File 'lib/cascading/cascading.rb', line 160

# Collects the given taps into an array and hands it to
# MultiTap.multi_sink_tap.
def multi_sink_tap(*taps)
  MultiTap.multi_sink_tap(taps)
end

#multi_source_tap(*taps) ⇒ Object



156
157
158
# File 'lib/cascading/cascading.rb', line 156

# Collects the given taps into an array and hands it to
# MultiTap.multi_source_tap.
def multi_source_tap(*taps)
  MultiTap.multi_source_tap(taps)
end

#results_fields ⇒ Object



114
115
116
# File 'lib/cascading/cascading.rb', line 114

# Returns the cascading.tuple.Fields::RESULTS sentinel.
def results_fields
  tuple_fields = Java::CascadingTuple::Fields
  tuple_fields::RESULTS
end

#search_field_name(names, candidate) ⇒ Object



106
107
108
# File 'lib/cascading/cascading.rb', line 106

# Returns +candidate+ unchanged if it is not in +names+. Otherwise it
# appends underscores ("x" -> "x_" -> "x__" ...) until the result is not
# in +names+, and returns that string.
def search_field_name(names, candidate)
  candidate = "#{candidate}_" while names.include?(candidate)
  candidate
end

#sequence_file_scheme(*fields) ⇒ Object

Creates a cascading.scheme.hadoop.SequenceFile (c.s.h.SequenceFile) scheme instance from the specified fields, or from all fields if none are given. Cascading does not provide a local SequenceFile scheme, so this scheme cannot be used in Cascading local mode.



149
150
151
152
153
154
# File 'lib/cascading/cascading.rb', line 149

# Builds a scheme hash for a Hadoop SequenceFile over the given fields
# (all_fields when none are given). :local_scheme is nil because no local
# SequenceFile scheme is available.
def sequence_file_scheme(*fields)
  declared = if fields.empty?
               all_fields
             else
               fields(fields)
             end
  hadoop = Java::CascadingSchemeHadoop::SequenceFile.new(declared)
  { :local_scheme => nil, :hadoop_scheme => hadoop }
end

#tap(path, params = {}) ⇒ Object

Creates a Cascading::Tap given a path and optional :scheme and :sink_mode.



165
166
167
# File 'lib/cascading/cascading.rb', line 165

# Creates a Tap for +path+. +params+ (e.g. :scheme, :sink_mode) are passed
# straight through to Tap.new.
#
# NOTE(review): because this method is named +tap+, any scope it is mixed
# into loses access to Kernel#tap (the block-taking Object#tap) through
# implicit calls. Confirm that nothing in such scopes relies on Kernel#tap.
def tap(path, params = {})
  Tap.new(path, params)
end

#text_line_scheme(*args) ⇒ Object

Creates a TextLine scheme (usable in both Cascading local and Hadoop modes). Positional args are used as the source fields if :source_fields is not provided.

The named options are:

  • :source_fields a string or array of strings. Specifies the fields to be read from a source with this scheme. Defaults to [‘offset’, ‘line’].

  • :sink_fields a string or array of strings. Specifies the fields to be written to a sink with this scheme. Defaults to all_fields.

  • :compression a symbol, either :enable or :disable, that governs the TextLine scheme’s compression. Defaults to the default TextLine compression (only applies to cascading.scheme.hadoop.TextLine).



130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# File 'lib/cascading/cascading.rb', line 130

# Builds a scheme hash holding both a local and a Hadoop TextLine scheme.
#
# args - positional source field names, optionally followed by an options
#        hash with :source_fields (overrides the positional names; default
#        ['offset', 'line']), :sink_fields (default all_fields) and
#        :compression (:enable / :disable; anything else uses the default
#        Hadoop TextLine compression).
#
# Returns { :local_scheme => ..., :hadoop_scheme => ... }.
def text_line_scheme(*args)
  options = args.extract_options!
  positional = args.empty? ? ['offset', 'line'] : args
  source_fields = fields(options[:source_fields] || positional)
  sink_fields = fields(options[:sink_fields]) || all_fields

  compress = Java::CascadingSchemeHadoop::TextLine::Compress
  compression_for = { :enable => compress::ENABLE, :disable => compress::DISABLE }
  sink_compression = compression_for.fetch(options[:compression], compress::DEFAULT)

  local = Java::CascadingSchemeLocal::TextLine.new(source_fields, sink_fields)
  hadoop = Java::CascadingSchemeHadoop::TextLine.new(source_fields, sink_fields, sink_compression)
  { :local_scheme => local, :hadoop_scheme => hadoop }
end

#union_fields(*fields) ⇒ Object



83
84
85
# File 'lib/cascading/cascading.rb', line 83

# Returns a Fields holding every name from all arguments, keeping the first
# occurrence of each name in argument order and dropping duplicates.
def union_fields(*fields)
  fields(fields.flat_map(&:to_a).uniq)
end