Module: Cascading
- Defined in:
- lib/cascading.rb,
lib/cascading/tap.rb,
lib/cascading/base.rb,
lib/cascading/flow.rb,
lib/cascading/mode.rb,
lib/cascading/scope.rb,
lib/cascading/cascade.rb,
lib/cascading/assembly.rb,
lib/cascading/cascading.rb,
lib/cascading/expr_stub.rb,
lib/cascading/operations.rb,
lib/cascading/aggregations.rb,
lib/cascading/sub_assembly.rb,
lib/cascading/cascading_exception.rb
Defined Under Namespace
Modules: Operations, Registerable
Classes: Aggregations, AmbiguousNodeNameException, Assembly, BaseTap, Cascade, CascadingException, ExprArgException, ExprStub, Flow, Mode, MultiTap, Node, Scope, SubAssembly, Tap
Constant Summary collapse
- VERSION =
:stopdoc:
'0.0.10'
- LIBPATH =
::File.expand_path(::File.dirname(__FILE__)) + ::File::SEPARATOR
- PATH =
::File.dirname(LIBPATH) + ::File::SEPARATOR
- CASCADING_HOME =
ENV['CASCADING_HOME']
- HADOOP_HOME =
ENV['HADOOP_HOME']
- JAVA_TYPE_MAP =
{ :int => java.lang.Integer.java_class, :long => java.lang.Long.java_class, :bool => java.lang.Boolean.java_class, :double => java.lang.Double.java_class, :float => java.lang.Float.java_class, :string => java.lang.String.java_class, }
Class Method Summary collapse
-
.libpath(*args) ⇒ Object
Returns the library path for the module.
-
.path(*args) ⇒ Object
Returns the path for the module.
- .require_all_jars(from = ::File.join(::File.dirname(__FILE__), "..", "jars")) ⇒ Object
-
.version ⇒ Object
Returns the version string for the library.
Instance Method Summary collapse
- #all_fields ⇒ Object
-
#cascade(name, params = {}, &block) ⇒ Object
Builds a top-level cascade given a name and a block.
- #copy_fields(fields) ⇒ Object
- #dedup_field_names(*names) ⇒ Object
- #dedup_fields(*fields) ⇒ Object
- #describe ⇒ Object (also: #desc)
- #difference_fields(*fields) ⇒ Object
-
#expr(expression, params = {}) ⇒ Object
See ExprStub.expr.
-
#fields(fields) ⇒ Object
Creates a cascading.tuple.Fields instance from a string or an array of strings.
-
#flow(name, params = {}, &block) ⇒ Object
Builds a top-level flow given a name and block for applications built of flows with no cascades.
- #last_grouping_fields ⇒ Object
-
#local_properties(base_dir) ⇒ Object
Constructs properties to be passed to Flow#complete or Cascade#complete which will locate temporary Hadoop files in base_dir.
- #multi_sink_tap(*taps) ⇒ Object
- #multi_source_tap(*taps) ⇒ Object
- #results_fields ⇒ Object
- #search_field_name(names, candidate) ⇒ Object
-
#sequence_file_scheme(*fields) ⇒ Object
Creates a c.s.h.SequenceFile scheme instance from the specified fields.
-
#tap(path, params = {}) ⇒ Object
Creates a Cascading::Tap given a path and optional :scheme and :sink_mode.
-
#text_line_scheme(*args) ⇒ Object
Creates a TextLine scheme (can be used in both Cascading local and hadoop modes).
- #union_fields(*fields) ⇒ Object
Class Method Details
.libpath(*args) ⇒ Object
Returns the library path for the module. If any arguments are given, they will be joined to the end of the library path using File.join.
23 24 25 |
# File 'lib/cascading.rb', line 23 def self.libpath( *args ) args.empty? ? LIBPATH : ::File.join(LIBPATH, args.flatten) end |
.path(*args) ⇒ Object
Returns the path for the module. If any arguments are given, they will be joined to the end of the path using File.join.
31 32 33 |
# File 'lib/cascading.rb', line 31 def self.path( *args ) args.empty? ? PATH : ::File.join(PATH, args.flatten) end |
.require_all_jars(from = ::File.join(::File.dirname(__FILE__), "..", "jars")) ⇒ Object
35 36 37 38 39 40 41 |
# File 'lib/cascading.rb', line 35 def self.require_all_jars(from = ::File.join(::File.dirname(__FILE__), "..", "jars")) search_me = ::File.expand_path( ::File.join(from, '**', '*.jar')) Dir.glob(search_me).sort.each do |jar| require jar end end |
.version ⇒ Object
Returns the version string for the library.
15 16 17 |
# File 'lib/cascading.rb', line 15 def self.version VERSION end |
Instance Method Details
#all_fields ⇒ Object
79 80 81 |
# File 'lib/cascading/cascading.rb', line 79 def all_fields Java::CascadingTuple::Fields::ALL end |
#cascade(name, params = {}, &block) ⇒ Object
Builds a top-level cascade given a name and a block. Optionally accepts a :mode, as explained in Cascading::Cascade#initialize.
29 30 31 32 33 34 35 36 37 38 |
# File 'lib/cascading/cascading.rb', line 29 def cascade(name, params = {}, &block) raise "Could not build cascade '#{name}'; block required" unless block_given? raise 'Cascading::cascade does not accept the :properties param only the global $jobconf_properties' if params[:properties] params[:properties] = $jobconf_properties.dup if $jobconf_properties cascade = Cascade.new(name, params) cascade.instance_eval(&block) cascade end |
#copy_fields(fields) ⇒ Object
91 92 93 |
# File 'lib/cascading/cascading.rb', line 91 def copy_fields(fields) fields.select(all_fields) end |
#dedup_field_names(*names) ⇒ Object
100 101 102 103 104 |
# File 'lib/cascading/cascading.rb', line 100 def dedup_field_names(*names) names.inject([]) do |acc, arr| acc + arr.map{ |e| search_field_name(acc, e) } end end |
#dedup_fields(*fields) ⇒ Object
95 96 97 98 |
# File 'lib/cascading/cascading.rb', line 95 def dedup_fields(*fields) raise 'Can only be applied to declarators' unless fields.all?{ |f| f.is_declarator? } fields(dedup_field_names(*fields.map{ |f| f.to_a })) end |
#describe ⇒ Object Also known as: desc
54 55 56 |
# File 'lib/cascading/cascading.rb', line 54 def describe Cascade.all.map{ |cascade| cascade.describe }.join("\n") end |
#difference_fields(*fields) ⇒ Object
87 88 89 |
# File 'lib/cascading/cascading.rb', line 87 def difference_fields(*fields) fields(fields[1..-1].inject(fields.first.to_a){ |acc, arr| acc - arr.to_a }) end |
#expr(expression, params = {}) ⇒ Object
See ExprStub.expr
60 61 62 |
# File 'lib/cascading/cascading.rb', line 60 def expr(expression, params = {}) ExprStub.expr(expression, params) end |
#fields(fields) ⇒ Object
Creates a cascading.tuple.Fields instance from a string or an array of strings.
65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/cascading/cascading.rb', line 65 def fields(fields) if fields.nil? return nil elsif fields.is_a? Java::CascadingTuple::Fields return fields elsif fields.is_a? ::Array if fields.size == 1 return fields(fields[0]) end raise "Fields cannot be nil: #{fields.inspect}" if fields.include?(nil) end return Java::CascadingTuple::Fields.new([fields].flatten.map{ |f| f.kind_of?(Fixnum) ? java.lang.Integer.new(f) : f }.to_java(java.lang.Comparable)) end |
#flow(name, params = {}, &block) ⇒ Object
Builds a top-level flow given a name and block for applications built of flows with no cascades. Optionally accepts a :mode, as explained in Cascading::Flow#initialize.
43 44 45 46 47 48 49 50 51 52 |
# File 'lib/cascading/cascading.rb', line 43 def flow(name, params = {}, &block) raise "Could not build flow '#{name}'; block required" unless block_given? raise 'Cascading::flow does not accept the :properties param only the global $jobconf_properties' if params[:properties] params[:properties] = $jobconf_properties.dup if $jobconf_properties flow = Flow.new(name, nil, params) flow.instance_eval(&block) flow end |
#last_grouping_fields ⇒ Object
110 111 112 |
# File 'lib/cascading/cascading.rb', line 110 def last_grouping_fields Java::CascadingTuple::Fields::VALUES end |
#local_properties(base_dir) ⇒ Object
Constructs properties to be passed to Flow#complete or Cascade#complete which will locate temporary Hadoop files in base_dir. It is necessary to pass these properties only when executing scripts in Hadoop local mode via JRuby’s main method, which confuses Cascading’s attempt to find the containing jar. When using Cascading local mode, these are unnecessary.
174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
# File 'lib/cascading/cascading.rb', line 174 def local_properties(base_dir) dirs = { 'test.build.data' => "#{base_dir}/build", 'hadoop.tmp.dir' => "#{base_dir}/tmp", 'hadoop.log.dir' => "#{base_dir}/log", } dirs.each{ |key, dir| `mkdir -p #{dir}` } job_conf = Java::OrgApacheHadoopMapred::JobConf.new job_conf.jar = dirs['test.build.data'] dirs.each{ |key, dir| job_conf.set(key, dir) } job_conf.num_map_tasks = 1 job_conf.num_reduce_tasks = 1 properties = java.util.HashMap.new Java::CascadingFlowHadoopPlanner::HadoopPlanner.copy_job_conf(properties, job_conf) properties end |
#multi_sink_tap(*taps) ⇒ Object
160 161 162 |
# File 'lib/cascading/cascading.rb', line 160 def multi_sink_tap(*taps) MultiTap.multi_sink_tap(taps) end |
#multi_source_tap(*taps) ⇒ Object
156 157 158 |
# File 'lib/cascading/cascading.rb', line 156 def multi_source_tap(*taps) MultiTap.multi_source_tap(taps) end |
#results_fields ⇒ Object
114 115 116 |
# File 'lib/cascading/cascading.rb', line 114 def results_fields Java::CascadingTuple::Fields::RESULTS end |
#search_field_name(names, candidate) ⇒ Object
106 107 108 |
# File 'lib/cascading/cascading.rb', line 106 def search_field_name(names, candidate) names.include?(candidate) ? search_field_name(names, "#{candidate}_") : candidate end |
#sequence_file_scheme(*fields) ⇒ Object
Creates a c.s.h.SequenceFile scheme instance from the specified fields. A local SequenceFile scheme is not provided by Cascading, so this scheme cannot be used in Cascading local mode.
149 150 151 152 153 154 |
# File 'lib/cascading/cascading.rb', line 149 def sequence_file_scheme(*fields) { :local_scheme => nil, :hadoop_scheme => Java::CascadingSchemeHadoop::SequenceFile.new(fields.empty? ? all_fields : fields(fields)), } end |
#tap(path, params = {}) ⇒ Object
Creates a Cascading::Tap given a path and optional :scheme and :sink_mode.
165 166 167 |
# File 'lib/cascading/cascading.rb', line 165 def tap(path, params = {}) Tap.new(path, params) end |
#text_line_scheme(*args) ⇒ Object
Creates a TextLine scheme (can be used in both Cascading local and hadoop modes). Positional args are used if :source_fields is not provided.
The named options are:
- :source_fields: a string or array of strings. Specifies the fields to be read from a source with this scheme. Defaults to ['offset', 'line'].
- :sink_fields: a string or array of strings. Specifies the fields to be written to a sink with this scheme. Defaults to all_fields.
- :compression: a symbol, either :enable or :disable, that governs the TextLine scheme's compression. Defaults to the default TextLine compression (only applies to c.s.h.TextLine).
130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
# File 'lib/cascading/cascading.rb', line 130 def text_line_scheme(*args) options = args.extract_options! source_fields = fields(options[:source_fields] || (args.empty? ? ['offset', 'line'] : args)) sink_fields = fields(options[:sink_fields]) || all_fields sink_compression = case options[:compression] when :enable then Java::CascadingSchemeHadoop::TextLine::Compress::ENABLE when :disable then Java::CascadingSchemeHadoop::TextLine::Compress::DISABLE else Java::CascadingSchemeHadoop::TextLine::Compress::DEFAULT end { :local_scheme => Java::CascadingSchemeLocal::TextLine.new(source_fields, sink_fields), :hadoop_scheme => Java::CascadingSchemeHadoop::TextLine.new(source_fields, sink_fields, sink_compression), } end |