Class: Cascading::Assembly

Inherits:
Node
  • Object
show all
Includes:
Operations
Defined in:
lib/cascading/assembly.rb

Instance Attribute Summary collapse

Attributes inherited from Node

#child_names, #children, #last_child, #name, #parent

Instance Method Summary collapse

Methods included from Operations

#aggregator_function, #coerce_to_java, #date_formatter, #date_parser, #expression_filter, #expression_function, #field_joiner, #first_function, #identity, #insert_function, #last_function, #max_function, #min_function, #regex_filter, #regex_generator, #regex_parser, #regex_replace, #regex_split_generator, #regex_splitter, #to_java_comparable_array

Methods inherited from Node

#add_child, #find_child, #qualified_name, #root

Constructor Details

#initialize(name, parent, outgoing_scopes = {}) ⇒ Assembly

Returns a new instance of Assembly.



13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# File 'lib/cascading/assembly.rb', line 13

# Creates an assembly named +name+ under +parent+.
#
# When +parent+ is an Assembly, the new assembly's head pipe is chained onto
# the parent's tail pipe, and its scope starts as a copy of the parent's
# current scope (renamed to +name+). Otherwise (the code comment below says
# the parent is then a Flow) a fresh head Pipe is created and, unless a scope
# is already registered under +name+, an empty scope is registered.
#
# name            - name of the assembly and of its head pipe.
# parent          - enclosing Assembly or (presumably) Flow.
# outgoing_scopes - Hash of assembly name => scope. It is stored by reference
#                   and mutated here to register this assembly's scope.
def initialize(name, parent, outgoing_scopes = {})
  super(name, parent)

  # Must be assigned before #scope is called below: #scope reads
  # @outgoing_scopes[name].
  @outgoing_scopes = outgoing_scopes
  if parent.kind_of?(Assembly)
    @head_pipe = Java::CascadingPipe::Pipe.new(name, parent.tail_pipe)
    # Copy to allow destructive update of name
    @outgoing_scopes[name] = parent.scope.copy
    scope.scope.name = name
  else # Parent is a Flow
    @head_pipe = Java::CascadingPipe::Pipe.new(name)
    @outgoing_scopes[name] ||= Scope.empty_scope(name)
  end
  # A new assembly has a single pipe, which is both head and tail.
  @tail_pipe = head_pipe
  @incoming_scopes = [scope]
end

Instance Attribute Details

#head_pipeObject (readonly)

Returns the value of attribute head_pipe.



11
12
13
# File 'lib/cascading/assembly.rb', line 11

# The first pipe of this assembly, as created by the constructor.
# Returns nil if it was never assigned.
def head_pipe
  @head_pipe if defined?(@head_pipe)
end

#tail_pipeObject (readonly)

Returns the value of attribute tail_pipe.



11
12
13
# File 'lib/cascading/assembly.rb', line 11

# The most recently appended pipe of this assembly.
# Returns nil if it was never assigned.
def tail_pipe
  @tail_pipe if defined?(@tail_pipe)
end

Instance Method Details

#assert(*args) ⇒ Object



362
363
364
365
366
367
368
369
# File 'lib/cascading/assembly.rb', line 362

# Appends an Each pipe that applies the assertion given as the first
# positional argument.
#
# Options:
#   :level - the AssertionLevel to use (defaults to AssertionLevel::STRICT).
def assert(*args)
  opts = args.extract_options!
  the_assertion = args[0]
  level = opts[:level]
  level ||= Java::CascadingOperation::AssertionLevel::STRICT

  make_pipe(Java::CascadingPipe::Each, [tail_pipe, level, the_assertion])
end

#assert_not_null(*args) ⇒ Object

Builds a pipe that asserts that none of the fields in the tuple are null.



399
400
401
402
403
# File 'lib/cascading/assembly.rb', line 399

# Appends an assertion that none of the tuple's fields are null.
# Any trailing options (e.g. :level) are forwarded to #assert.
def assert_not_null(*args)
  opts = args.extract_options!
  assert(Java::CascadingOperationAssertion::AssertNotNull.new, opts)
end

#assert_size_equals(*args) ⇒ Object

Builds a pipe that asserts that the size of the tuple equals the size specified as a parameter.

The method accepts a single unnamed argument: a number indicating the expected size.



392
393
394
395
396
# File 'lib/cascading/assembly.rb', line 392

# Appends an assertion that the tuple has exactly args.first fields.
# Any trailing options (e.g. :level) are forwarded to #assert.
def assert_size_equals(*args)
  opts = args.extract_options!
  expected_size = args.first
  size_check = Java::CascadingOperationAssertion::AssertSizeEquals.new(expected_size)
  assert(size_check, opts)
end

#branch(name, &block) ⇒ Object

Builds a new branch.



231
232
233
234
235
236
237
# File 'lib/cascading/assembly.rb', line 231

# Builds a child assembly named +name+ that shares this assembly's outgoing
# scopes, registers it as a child, evaluates +block+ in the child's context,
# and returns the child.
#
# Raises RuntimeError when no block is given.
def branch(name, &block)
  unless block_given?
    raise "Could not build branch '#{name}'; block required"
  end

  Assembly.new(name, self, @outgoing_scopes).tap do |child|
    add_child(child)
    child.instance_eval(&block)
  end
end

#cast(type_map) ⇒ Object



342
343
344
345
346
347
348
# File 'lib/cascading/assembly.rb', line 342

# Casts fields to new types with an Identity function.
#
# +type_map+ maps field name => a key of JAVA_TYPE_MAP. Names are processed
# in sorted order, so the argument fields and the Java class array line up
# position by position.
def cast(type_map)
  sorted_names = type_map.keys.sort
  type_keys = sorted_names.map { |field_name| type_map[field_name] }
  java_types = JAVA_TYPE_MAP.values_at(*type_keys)
  cast_fields = fields(sorted_names)
  java_classes = java_types.to_java(java.lang.Class)
  identity = Java::CascadingOperation::Identity.new(cast_fields, java_classes)
  each cast_fields, :function => identity
end

#copy(*args) ⇒ Object



350
351
352
353
354
355
# File 'lib/cascading/assembly.rb', line 350

# Copies the +from+ fields (args[0], default all fields) into the +into+
# fields (args[1] or :into, default all fields) and keeps every field in
# the output.
def copy(*args)
  opts = args.extract_options!
  source = args[0]
  target = args[1]
  source ||= all_fields
  target ||= opts[:into] || all_fields

  source_fields = fields(source)
  identity = Java::CascadingOperation::Identity.new(fields(target))
  each source_fields, :function => identity, :output => all_fields
end

#debug(*args) ⇒ Object

Builds a debugging pipe.

Without arguments, it generates a simple debug pipe that prints all tuples to the standard output.

The other named options are:

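  • :print_fields a boolean passed to Cascading's Debug filter (defaults to true). When true, the field names are printed every :fields_interval tuples (10 by default). The :tuple_interval option (default 1) controls how often tuples are printed.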



379
380
381
382
383
384
385
386
387
# File 'lib/cascading/assembly.rb', line 379

# Builds a debugging pipe that prints tuples to standard output using
# Cascading's Debug filter.
#
# Options:
#   :print_fields    - boolean passed to the Debug constructor (defaults to
#                      true). An explicit false is respected.
#   :tuple_interval  - print every Nth tuple (defaults to 1).
#   :fields_interval - print the field names every Nth tuple (defaults to 10).
def debug(*args)
  options = args.extract_options!
  # `options[:print_fields] || true` would turn an explicit false into true,
  # so only fall back to the default when the key is absent.
  print_fields = options.has_key?(:print_fields) ? options[:print_fields] : true
  # An explicit nil selects Debug's no-argument constructor.
  parameters = [print_fields].compact
  debug = Java::CascadingOperation::Debug.new(*parameters)
  debug.print_tuple_every = options[:tuple_interval] || 1
  debug.print_fields_every = options[:fields_interval] || 10
  each(all_fields, :filter => debug)
end

#debug_scopeObject



47
48
49
# File 'lib/cascading/assembly.rb', line 47

# Prints this assembly's current scope to standard output (debugging aid).
# Returns nil.
def debug_scope
  banner = "Current scope for '#{name}':\n  #{scope}\n----------\n"
  puts banner
end

#describe(offset = '') ⇒ Object



30
31
32
33
34
35
36
# File 'lib/cascading/assembly.rb', line 30

# Returns a textual description of this assembly and, recursively, of its
# children (each nested two more spaces). The first line has the form
# "<offset><name>:assembly :: <incoming fields> -> <outgoing fields>".
# The incoming field lists are wrapped in parentheses unless there is
# exactly one incoming scope.
def describe(offset = '')
  incoming = @incoming_scopes.map { |s| s.values_fields.to_a.inspect }.join(', ')
  incoming = "(#{incoming})" if @incoming_scopes.size != 1

  lines = ["#{offset}#{name}:assembly :: #{incoming} -> #{scope.values_fields.to_a.inspect}"]
  unless children.empty?
    nested = child_names.map { |child| children[child].describe("#{offset}  ") }
    lines << nested.join("\n")
  end
  lines.join("\n")
end

#discard(*args) ⇒ Object

Removes the specified fields from the current assembly. – Example:

discard "field1", "field2"


323
324
325
326
327
# File 'lib/cascading/assembly.rb', line 323

# Drops the given fields from the current assembly by projecting onto every
# other field of the current scope.
def discard(*args)
  dropped = fields(args)
  kept = difference_fields(scope.values_fields, dropped).to_a
  project(*kept)
end

#distinct(*args) ⇒ Object

Builds a pipe that returns distinct tuples based on the provided fields.

The method accepts an optional unnamed argument specifying the fields to base the distinct on (all fields, by default). Note: the current implementation always raises an error.



667
668
669
670
671
672
# File 'lib/cascading/assembly.rb', line 667

# Builds a pipe that returns distinct tuples based on the provided fields
# (args[0], defaulting to all fields).
#
# Currently disabled: this method always raises. The earlier draft body
# (group_by on the fields followed by pass) came after the unconditional
# raise and could never run. It has been removed rather than kept as dead
# code; that draft also shadowed the #fields helper with a local variable.
#
# Raises RuntimeError, always.
def distinct(*args)
  raise "Distinct is badly broken"
end

#each(*args) ⇒ Object

Builds a basic each pipe, and adds it to the current assembly. – Example:

each 'line', :function => regex_splitter(['name', 'val1', 'val2', 'id'], :pattern => /[.,]*\s+/), :output => ['id', 'name', 'val1', 'val2']


294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# File 'lib/cascading/assembly.rb', line 294

# Appends an Each pipe to this assembly and returns it.
#
# Positional args name the argument fields. Options:
#   :function - a Cascading Function to apply.
#   :filter   - a Cascading Filter to apply (takes precedence over :function).
#   :output   - output selector fields (not allowed together with :filter).
#
# Raises if :output is combined with :filter, or if the operation kind does
# not match the option it was passed under.
def each(*args)
  opts = args.extract_options!
  argument_fields = fields(args)
  output_selector = fields(opts[:output])

  operation = opts[:filter] || opts[:function]
  if opts[:filter] && opts[:output]
    raise 'c.p.Each does not support applying an output selector to a c.o.Filter'
  end

  pipe = make_pipe(Java::CascadingPipe::Each,
                   [tail_pipe, argument_fields, operation, output_selector].compact)
  raise ':function specified but c.o.Filter provided' if opts[:function] && pipe.is_filter
  raise ':filter specified but c.o.Function provided' if opts[:filter] && pipe.is_function

  pipe
end

#eval_expression(*args) ⇒ Object

Builds a pipe that evaluates the specified Janino expression and inserts the result in a new field in the tuple.

The named options are:

  • :from a string or array of strings. Specifies the input fields.

  • :expression a string. The Janino expression. It can also be given as the first unnamed argument.

  • :into a string. Specifies the name of the field to insert with the result of the evaluation.

  • :parameters a hash. Specifies the type mapping for the parameters. See Cascading::Operations.expression_function.



651
652
653
654
655
656
657
658
659
660
661
# File 'lib/cascading/assembly.rb', line 651

# Evaluates a Janino expression into a new field.
#
# Options (all removed from the hash before it reaches expression_function):
#   :into   - name of the result field.
#   :from   - input fields (default all fields).
#   :output - output selector (default all fields).
# :expression and :parameters default to the first and second positional
# arguments when not given as options.
def eval_expression(*args)
  opts = args.extract_options!

  target = opts.delete(:into)
  input = opts.delete(:from) || all_fields
  selector = opts.delete(:output) || all_fields
  opts[:expression] = args.shift unless opts[:expression]
  opts[:parameters] = args.shift unless opts[:parameters]

  function = expression_function(target, opts)
  each input, :function => function, :output => selector
end

#filter(*args) ⇒ Object

Builds a pipe that filters the tuples based on an expression or a pattern (but not both!).

The first unnamed argument, if provided, is a filtering expression (using the Janino syntax).

The named options are:

  • :pattern a string. Specifies a regular expression pattern used to filter the tuples. If this

option is provided, then the filter is regular expression-based. This is incompatible with the expression option.

  • :expression a string. Specifies a Janino expression used to filter the tuples. This option has the

same effect as providing it as the first unnamed argument. If this option is provided, then the filter is Janino expression-based. This is incompatible with the pattern option.

  • :validate a boolean. Passed into Cascading#expr to enable or disable

expression validation. Defaults to true.

  • :validate_with a hash. Actual arguments used by Cascading#expr for

expression validation. Defaults to {}.



564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
# File 'lib/cascading/assembly.rb', line 564

# Builds a pipe that filters tuples using either a Janino expression or a
# regular expression pattern (exactly one of the two).
#
# The first positional argument, if any, is the expression.
# Options:
#   :from          - input fields (default all fields).
#   :expression    - Janino expression (same as the first positional arg).
#   :pattern       - regex pattern; remaining options go to regex_filter.
#   :validate      - passed to #expr (default true).
#   :validate_with - passed to #expr (default {}).
#
# Raises RuntimeError if neither or both of expression/:pattern are given.
# Previously, passing neither silently built nothing and returned nil, and
# passing both silently ignored :pattern.
def filter(*args)
  options = args.extract_options!
  from = options.delete(:from) || all_fields
  expression = options.delete(:expression) || args.shift
  regex = options.delete(:pattern)
  validate = options.has_key?(:validate) ? options.delete(:validate) : true
  validate_with = options.has_key?(:validate_with) ? options.delete(:validate_with) : {}

  raise 'filter requires an expression or a :pattern' unless expression || regex
  raise 'filter accepts an expression or a :pattern, but not both' if expression && regex

  if expression
    stub = expr(expression, { :validate => validate, :validate_with => validate_with })
    types, expression = stub.types, stub.expression

    stub.validate_scope(scope)
    each from, :filter => expression_filter(
      :parameters => types,
      :expression => expression
    )
  else
    each from, :filter => regex_filter(regex, options)
  end
end

#filter_not_null(*args) ⇒ Object Also known as: where_null



592
593
594
595
# File 'lib/cascading/assembly.rb', line 592

# Applies Cascading's FilterNotNull filter to the given argument fields.
# A trailing options hash is removed from the arguments and ignored.
def filter_not_null(*args)
  args.extract_options!
  not_null = Java::CascadingOperationFilter::FilterNotNull.new
  each(args, :filter => not_null)
end

#filter_null(*args) ⇒ Object Also known as: reject_null



586
587
588
589
# File 'lib/cascading/assembly.rb', line 586

# Applies Cascading's FilterNull filter to the given argument fields.
# A trailing options hash is removed from the arguments and ignored.
def filter_null(*args)
  args.extract_options!
  null_filter = Java::CascadingOperationFilter::FilterNull.new
  each(args, :filter => null_filter)
end

#format_date(*args) ⇒ Object

Builds a pipe that formats a date using a specified format pattern.

The unnamed argument specifies the field to format.

The named options are:

  • :into a string. It specifies the receiving field. By default, it will be named after

the input argument.

  • :pattern a string. Specifies the date format.

  • :timezone a string. Specifies the timezone (defaults to UTC).

  • :output a string or array of strings. Specifies the outgoing fields (all fields will be output by default)



497
498
499
500
501
502
503
504
# File 'lib/cascading/assembly.rb', line 497

# Formats the date field args[0] into a new field.
#
# Options:
#   :into     - result field (default "<field>_formatted").
#   :pattern  - date format (default "yyyy/MM/dd").
#   :timezone - passed through to date_formatter.
#   :output   - output selector (default all fields).
def format_date(*args)
  opts = args.extract_options!
  source = args[0]
  target = opts[:into] || "#{source}_formatted"
  date_format = opts[:pattern] || "yyyy/MM/dd"
  selector = opts[:output] || all_fields

  formatter = date_formatter(target, date_format, opts[:timezone])
  each source, :function => formatter, :output => selector
end

#group_by(*args, &block) ⇒ Object

Builds a new GroupBy pipe that groups on the fields given in args. Any block passed to this method should contain only Everies.



241
242
243
244
245
246
247
248
249
# File 'lib/cascading/assembly.rb', line 241

# Groups the tail pipe on the positional fields and hands the GroupBy to
# apply_aggregations together with +block+ (which should contain only
# Everies).
#
# Options:
#   :sort_by - secondary sort fields.
#   :reverse - reverse-order flag passed to GroupBy.
def group_by(*args, &block)
  opts = args.extract_options!
  grouping = fields(args)
  sorting = fields(opts[:sort_by])
  ctor_args = [tail_pipe, grouping, sorting, opts[:reverse]].compact

  grouped = Java::CascadingPipe::GroupBy.new(*ctor_args)
  apply_aggregations(grouped, [scope], &block)
end

#hash_join(*args, &block) ⇒ Object

Builds a HashJoin pipe. This should be used carefully, as the right side of the join is accumulated entirely in memory. Requires a list of assembly names to join and :on to specify the join_fields.



185
186
187
188
189
190
# File 'lib/cascading/assembly.rb', line 185

# Builds a HashJoin of the named assemblies by delegating to prepare_join
# with :hash => true (the caller's options hash is updated in place).
def hash_join(*args, &block)
  join_opts = args.extract_options!
  join_opts.store(:hash, true)
  prepare_join(*args, join_opts, &block)
end

#inner_join(*args, &block) ⇒ Object



202
203
204
205
206
207
# File 'lib/cascading/assembly.rb', line 202

# Inner join of the named assemblies: #join with :joiner => :inner
# (the caller's options hash is updated in place).
def inner_join(*args, &block)
  join_opts = args.extract_options!
  join_opts.store(:joiner, :inner)
  join(*args, join_opts, &block)
end

#insert(args) ⇒ Object

Builds a pipe that inserts values into the current tuple.

The method takes a hash as parameter. This hash contains as keys the names of the fields to insert and as values, the values they must contain. For example:

insert "who" => "Grégoire", "when" => Time.now.strftime("%Y-%m-%d")

will insert two new fields: a field who containing the string “Grégoire”, and a field when containing the formatted current date. (Wrapping the hash in braces without parentheses, as in insert {...}, would make Ruby parse it as a block.) The method outputs all fields.



537
538
539
540
541
542
543
544
545
546
547
548
# File 'lib/cascading/assembly.rb', line 537

# Inserts one new field per key of +args+ (field name => value), in sorted
# key order. ExprStub values are validated against the current scope and
# evaluated as expressions; any other value is inserted as a constant.
# Every field is kept in the output. Returns the sorted key list.
def insert(args)
  args.keys.sort.each do |field_name|
    value = args[field_name]

    inserter =
      if value.kind_of?(ExprStub)
        value.validate_scope(scope)
        expression_function(field_name, :expression => value.expression, :parameters => value.types)
      else
        insert_function([field_name], :values => [value])
      end

    each all_fields, :function => inserter, :output => all_fields
  end
end

#join(*args, &block) ⇒ Object Also known as: co_group

Builds a join (CoGroup) pipe. Requires a list of assembly names to join and :on to specify the group_fields.



194
195
196
197
198
199
# File 'lib/cascading/assembly.rb', line 194

# Builds a CoGroup join of the named assemblies by delegating to
# prepare_join with :hash => false (the caller's options hash is updated
# in place).
def join(*args, &block)
  join_opts = args.extract_options!
  join_opts.store(:hash, false)
  prepare_join(*args, join_opts, &block)
end

#join_fields(*args) ⇒ Object



674
675
676
677
678
679
# File 'lib/cascading/assembly.rb', line 674

# Joins the positional fields into one field using field_joiner. The full
# options hash (including :output) is passed to field_joiner; :output also
# selects the outgoing fields (default all fields).
def join_fields(*args)
  opts = args.extract_options!
  selector = opts[:output] || all_fields
  joiner = field_joiner(opts)
  each args, :function => joiner, :output => selector
end

#left_join(*args, &block) ⇒ Object



209
210
211
212
213
214
# File 'lib/cascading/assembly.rb', line 209

# Left join of the named assemblies: #join with :joiner => :left
# (the caller's options hash is updated in place).
def left_join(*args, &block)
  join_opts = args.extract_options!
  join_opts.store(:joiner, :left)
  join(*args, join_opts, &block)
end

#match_rows(*args) ⇒ Object

Builds a pipe that emits a new row for each regex group matched in a field, using a specified regular expression.

The first unnamed argument is the field to be matched against. The second unnamed argument is the field receiving the result of the match.

The named options are:

  • :pattern a string or regex. Specifies the regular expression used for matching the argument fields.

  • :output a string or array of strings. Specifies the outgoing fields (all fields will be output by default)



462
463
464
465
466
467
468
# File 'lib/cascading/assembly.rb', line 462

# Emits one row per regex match found in field args[0].
#
# The result field is :into or args[1]. Options:
#   :pattern - regex (default /[\w]+/).
#   :output  - output selector (default all fields).
def match_rows(*args)
  opts = args.extract_options!
  source = args[0]
  target = opts[:into] || args[1]
  regex = opts[:pattern] || /[\w]+/
  selector = opts[:output] || all_fields

  generator = regex_generator(target, :pattern => regex)
  each(source, :function => generator, :output => selector)
end

#null_indicator(input, params = {}) ⇒ Object

Efficient way of inserting a null indicator for any field, even one that cannot be coerced to a string. This is accomplished using Cascading’s FilterNull and SetValue operators rather than Janino. 1 is produced if the field is null and 0 otherwise.

Parameters:

  • input name of field to check for null.

The named options are:

  • :into an output field name, defaulting to ‘is_null’.

  • :output an array of field names that specifies the fields to retain in the output tuple. Defaults to all_fields.



749
750
751
752
753
# File 'lib/cascading/assembly.rb', line 749

# Adds an indicator field that is 1 when +input+ is null and 0 otherwise,
# built with set_value and Cascading's FilterNull.
#
# Options:
#   :into   - indicator field name (default 'is_null').
#   :output - output selector (default all fields).
def null_indicator(input, params = {})
  indicator = fields(params[:into] || 'is_null')
  selector = params[:output] || all_fields
  null_check = Java::CascadingOperationFilter::FilterNull.new
  set_value input, null_check, 1.to_java, 0.to_java,
            :into => indicator, :output => selector
end

#outer_join(*args, &block) ⇒ Object



223
224
225
226
227
228
# File 'lib/cascading/assembly.rb', line 223

# Outer join of the named assemblies: #join with :joiner => :outer
# (the caller's options hash is updated in place).
def outer_join(*args, &block)
  join_opts = args.extract_options!
  join_opts.store(:joiner, :outer)
  join(*args, join_opts, &block)
end

#parent_flowObject



38
39
40
41
# File 'lib/cascading/assembly.rb', line 38

# Returns the Flow this assembly belongs to by walking up the parent chain.
def parent_flow
  parent.kind_of?(Flow) ? parent : parent.parent_flow
end

#parse(*args) ⇒ Object

Builds a parse pipe. This pipe parses the fields specified as input (the unnamed arguments) using a specified regex pattern.

If provided, the unnamed arguments must be the fields to be parsed. If not provided, then all incoming fields are used.

The named options are:

  • :pattern a string or regex. Specifies the regular expression used for parsing the argument fields.

  • :output a string or array of strings. Specifies the outgoing fields (all fields will be output by default)



414
415
416
417
418
419
420
# File 'lib/cascading/assembly.rb', line 414

# Builds a parse pipe that applies a regex parser to the given fields.
#
# Positional args name the fields to parse; with none, all incoming fields
# are parsed. (The previous `args || all_fields` never fell back, because a
# splat parameter is always an Array, even when empty.)
#
# Options (the whole hash is also passed to regex_parser):
#   :pattern - String or Regexp used for parsing.
#   :output  - output selector (default all fields).
def parse(*args)
  options = args.extract_options!
  input_fields = args.empty? ? all_fields : args
  pattern = options[:pattern]
  output = options[:output] || all_fields
  each(input_fields, :function => regex_parser(pattern, options), :output => output)
end

#parse_date(*args) ⇒ Object

Builds a pipe that parses the specified field as a date using the provided format string. The unnamed argument specifies the field to parse.

The named options are:

  • :into a string. It specifies the receiving field. By default, it will be named after

the input argument.

  • :pattern a string. Specifies the date format.

  • :output a string or array of strings. Specifies the outgoing fields (all fields will be output by default)



478
479
480
481
482
483
484
485
# File 'lib/cascading/assembly.rb', line 478

# Parses the field args[0] as a date into a new field.
#
# Options:
#   :into    - result field (default "<field>_parsed").
#   :pattern - date format (default "yyyy/MM/dd").
#   :output  - output selector (default all fields).
def parse_date(*args)
  opts = args.extract_options!
  source = args[0]
  target = opts[:into] || "#{source}_parsed"
  selector = opts[:output] || all_fields
  date_format = opts[:pattern] || "yyyy/MM/dd"

  parser = date_parser(target, date_format)
  each source, :function => parser, :output => selector
end

#pass(*args) ⇒ Object

A pipe that does nothing.



358
359
360
# File 'lib/cascading/assembly.rb', line 358

# A no-op pipe that passes every field through an Identity function.
# Any arguments are ignored.
def pass(*_args)
  every_field = all_fields
  each every_field, :function => Java::CascadingOperation::Identity.new
end

#project(*args) ⇒ Object

Restricts the current assembly to the specified fields. – Example:

project "field1", "field2"


315
316
317
# File 'lib/cascading/assembly.rb', line 315

# Restricts the current assembly to the given fields using an Identity
# function over those fields.
def project(*args)
  kept = fields(args)
  each kept, :function => Java::CascadingOperation::Identity.new
end

#regex_contains(input, pattern, params = {}) ⇒ Object

Given a field and a regex, returns an indicator that is 1 if the string contains at least 1 match and 0 otherwise.

Parameters:

  • input field name or names that specifies the fields over which to perform the match.

  • pattern regex to apply to the input.

The named options are:

  • :into an output field name, defaulting to ‘regex_contains’.

  • :output an array of field names that specifies the fields to retain in the output tuple. Defaults to all_fields.



767
768
769
770
771
772
773
# File 'lib/cascading/assembly.rb', line 767

# Adds an indicator field that is 1 when +input+ contains at least one
# match of +pattern+ and 0 otherwise, built with set_value and a
# RegexFilter. +pattern+ is converted with to_s, so Ruby Regexps are
# accepted.
#
# Options:
#   :into   - indicator field name (default 'regex_contains').
#   :output - output selector (default all fields).
def regex_contains(input, pattern, params = {})
  input_fields = fields(input)
  regex_source = pattern.to_s # Supports JRuby regexes
  indicator = fields(params[:into] || 'regex_contains')
  selector = params[:output] || all_fields
  matcher = Java::CascadingOperationRegex::RegexFilter.new(regex_source)
  set_value input_fields, matcher, 1.to_java, 0.to_java,
            :into => indicator, :output => selector
end

#reject(*args) ⇒ Object

Builds a pipe that rejects the tuples based on an expression.

The first unnamed argument, if provided, is a filtering expression (using the Janino syntax).

The named options are:

  • :expression a string. Specifies a Janino expression used to filter the tuples. This option has the

same effect as providing it as the first unnamed argument. If this option is provided, then the filter is Janino expression-based.

  • :validate a boolean. Passed into Cascading#expr to enable or disable

expression validation. Defaults to true.

  • :validate_with a hash. Actual arguments used by Cascading#expr for

expression validation. Defaults to {}.



610
611
612
613
614
615
# File 'lib/cascading/assembly.rb', line 610

# Removes tuples matching a Janino expression (positional or :expression)
# by delegating to #filter with the same arguments. :pattern is refused.
#
# NOTE(review): this relies on extract_options (no bang) reading the
# trailing options hash without removing it, so the options still reach
# #filter. Confirm against the Array extension.
def reject(*args)
  trailing = args.extract_options
  if trailing && trailing[:pattern]
    raise "Regex not allowed"
  end
  filter(*args)
end

#rename(name_map) ⇒ Object

Renames fields according to the mapping provided. – Example:

rename "old_name" => "new_name"


333
334
335
336
337
338
339
340
# File 'lib/cascading/assembly.rb', line 333

# Renames fields according to +name_map+ (old name => new name), keeping
# field order. Unmapped fields keep their names.
#
# Raises RuntimeError listing any keys of +name_map+ that are not fields of
# the current scope.
def rename(name_map)
  current = scope.values_fields.to_a
  renamed = current.map { |field| name_map[field] || field }

  unknown = name_map.keys.sort - current
  raise "invalid names: #{unknown.inspect}" unless unknown.empty?

  relabel = Java::CascadingOperation::Identity.new(fields(renamed))
  each all_fields, :function => relabel
end

#replace(*args) ⇒ Object

Builds a pipe that performs a search/replace based on a regular expression.

The first unnamed argument specifies the input field.

The named options are:

  • :pattern a string or regex. Specifies the pattern to look for in the input field. This non-optional argument

can also be specified as a second unnamed argument.

  • :replacement a string. Specifies the replacement.

  • :output a string or array of strings. Specifies the outgoing fields (all fields will be output by default)



515
516
517
518
519
520
521
522
523
524
# File 'lib/cascading/assembly.rb', line 515

# Regex search/replace on field args[0], writing to a new field.
#
# Options (or positional args[1] / args[2]):
#   :pattern     - regex to look for.
#   :replacement - replacement string.
#   :into        - result field (default "<field>_replaced").
#   :output      - output selector (default all fields).
def replace(*args)
  opts = args.extract_options!
  source, positional_pattern, positional_replacement = args[0], args[1], args[2]

  regex = opts[:pattern] || positional_pattern
  substitute = opts[:replacement] || positional_replacement
  target = opts[:into] || "#{source}_replaced"
  selector = opts[:output] || all_fields

  each source, :function => regex_replace(target, regex, substitute), :output => selector
end

#right_join(*args, &block) ⇒ Object



216
217
218
219
220
221
# File 'lib/cascading/assembly.rb', line 216

# Right join of the named assemblies: #join with :joiner => :right
# (the caller's options hash is updated in place).
def right_join(*args, &block)
  join_opts = args.extract_options!
  join_opts.store(:joiner, :right)
  join(*args, join_opts, &block)
end

#scopeObject



43
44
45
# File 'lib/cascading/assembly.rb', line 43

# The outgoing scope registered under this assembly's name.
def scope
  scopes_by_name = @outgoing_scopes
  scopes_by_name[name]
end

#set_value(input, filter, keep_value, remove_value, params = {}) ⇒ Object

Inserts one of two values into the dataflow based upon the result of the supplied filter on the input fields. This is primarily useful for creating indicators from filters.

Parameters:

  • input name of field to apply the filter.

  • filter Cascading Filter to apply.

  • keep_value Java value to produce when the filter would keep the given input.

  • remove_value Java value to produce when the filter would remove the given input.

The named options are:

  • :into an output field name, defaulting to ‘filter_value’.

  • :output an array of field names that specifies the fields to retain in the output tuple. Defaults to all_fields.



731
732
733
734
735
# File 'lib/cascading/assembly.rb', line 731

# Inserts +keep_value+ or +remove_value+ into a new field, depending on
# whether +filter+ would keep or remove the tuple (Cascading SetValue).
#
# Options:
#   :into   - result field name (default 'filter_value').
#   :output - output selector (default all fields).
def set_value(input, filter, keep_value, remove_value, params = {})
  result_field = fields(params[:into] || 'filter_value')
  selector = params[:output] || all_fields
  setter = Java::CascadingOperationFunction::SetValue.new(result_field, filter, keep_value, remove_value)
  each input, :function => setter, :output => selector
end

#split(*args) ⇒ Object

Builds a pipe that splits a field into other fields, using a specified regular expression.

The first unnamed argument is the field to be split. The second unnamed argument is an array of strings indicating the fields receiving the result of the split.

The named options are:

  • :pattern a string or regex. Specifies the regular expression used for splitting the argument fields.

  • :output a string or array of strings. Specifies the outgoing fields (all fields will be output by default)



430
431
432
433
434
435
436
# File 'lib/cascading/assembly.rb', line 430

# Splits field args[0] into the fields given by :into (or args[1]).
#
# Options:
#   :pattern - split regex (default /[.,]*\s+/).
#   :output  - output selector (default all fields).
def split(*args)
  opts = args.extract_options!
  source = args[0]
  targets = opts[:into] || args[1]
  separator = opts[:pattern] || /[.,]*\s+/
  selector = opts[:output] || all_fields

  splitter = regex_splitter(targets, :pattern => separator)
  each(source, :function => splitter, :output => selector)
end

#split_rows(*args) ⇒ Object

Builds a pipe that splits a field into new rows, using a specified regular expression.

The first unnamed argument is the field to be split. The second unnamed argument is the field receiving the result of the split.

The named options are:

  • :pattern a string or regex. Specifies the regular expression used for splitting the argument fields.

  • :output a string or array of strings. Specifies the outgoing fields (all fields will be output by default)



446
447
448
449
450
451
452
# File 'lib/cascading/assembly.rb', line 446

# Splits field args[0] into one new row per piece, written to :into
# (or args[1]).
#
# Options:
#   :pattern - split regex (default /[.,]*\s+/).
#   :output  - output selector (default all fields).
def split_rows(*args)
  opts = args.extract_options!
  source = args[0]
  target = opts[:into] || args[1]
  separator = opts[:pattern] || /[.,]*\s+/
  selector = opts[:output] || all_fields

  generator = regex_split_generator(target, :pattern => separator)
  each(source, :function => generator, :output => selector)
end

#sub_assembly(sub_assembly, pipes = [tail_pipe], incoming_scopes = [scope]) ⇒ Object

Allows you to plugin c.p.SubAssemblies to a cascading.jruby Assembly under certain assumptions. Note the default is to extend the tail pipe of this Assembly using a linear SubAssembly. See SubAssembly class for details.



280
281
282
283
284
285
286
287
288
# File 'lib/cascading/assembly.rb', line 280

# Plugs a Cascading SubAssembly into this assembly.
#
# By default the SubAssembly extends this assembly's tail pipe linearly.
# Afterwards, this assembly's tail pipe and outgoing scope are taken from
# the wrapper, which is returned.
def sub_assembly(sub_assembly, pipes = [tail_pipe], incoming_scopes = [scope])
  wrapper = SubAssembly.new(self, sub_assembly)
  wrapper.finalize(pipes, incoming_scopes)

  @tail_pipe = wrapper.tail_pipe
  @outgoing_scopes[name] = wrapper.scope
  wrapper
end

#to_sObject



104
105
106
# File 'lib/cascading/assembly.rb', line 104

# Short human-readable summary: name plus head and tail pipes.
def to_s
  format('%s : head pipe : %s - tail pipe: %s', name, head_pipe, tail_pipe)
end

#ungroup(*args) ⇒ Object

Ungroups, or unpivots, a tuple (see Cascading’s UnGroup at docs.cascading.org/cascading/2.0/javadoc/cascading/operation/function/UnGroup.html).

You must provide :key and you must provide only one of :value_selectors and :num_values.

The named options are:

  • :key required array of field names to replicate on every output row in an ungrouped group.

  • :value_selectors an array of field names to ungroup. Each field will be ungrouped into an output tuple along with the key fields in the order provided.

  • :num_values an integer specifying the number of fields to ungroup into each output tuple (excluding the key fields). All input fields will be ungrouped.

  • :input an array of field names that specifies the fields to input to UnGroup. Defaults to all_fields.

  • :into an array of field names. Default set by UnGroup.

  • :output an array of field names that specifies the fields to produce as output of UnGroup. Defaults to all_fields.



700
701
702
703
704
705
706
707
708
709
710
711
712
713
# File 'lib/cascading/assembly.rb', line 700

# Ungroups (unpivots) tuples with Cascading's UnGroup.
#
# Options:
#   :key             - key fields replicated on every output row.
#   :value_selectors - Array of field lists to ungroup (exclusive with
#                      :num_values).
#   :num_values      - number of value fields per output row.
#   :input           - input fields (default all fields).
#   :into            - UnGroup result fields.
#   :output          - output selector (default all fields).
#
# Raises RuntimeError unless exactly one of :value_selectors / :num_values
# is given.
def ungroup(*args)
  opts = args.extract_options!
  input = opts[:input] || all_fields
  into = fields(opts[:into])
  output = opts[:output] || all_fields
  key = fields(opts[:key])

  has_selectors = opts.has_key?(:value_selectors)
  has_count = opts.has_key?(:num_values)
  unless has_selectors ^ has_count
    raise 'You must provide exactly one of :value_selectors or :num_values to ungroup'
  end

  selectors = nil
  if has_selectors
    selectors = opts[:value_selectors].map { |vs| fields(vs) }.to_java(Java::CascadingTuple::Fields)
  end
  count = has_count ? opts[:num_values] : nil

  ungrouper = Java::CascadingOperationFunction::UnGroup.new(*[into, key, selectors, count].compact)
  each input, :function => ungrouper, :output => output
end

#union(*args, &block) ⇒ Object Also known as: union_pipes

Unifies multiple incoming pipes sharing the same field structure using a GroupBy. Accepts :on like join and :sort_by and :reverse like group_by, as well as a block which may be used for a sequence of Every aggregations.

By default, groups only on the first field (see line 189 of GroupBy.java)



257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
# File 'lib/cascading/assembly.rb', line 257

# Unifies several incoming pipes that share a field structure with a
# GroupBy, then hands it to apply_aggregations with +block+.
#
# Options:
#   :on      - group fields (default: the first field of the first
#              incoming scope, so field names propagate).
#   :sort_by - sort fields.
#   :reverse - reverse flag. If given without :sort_by, the group fields
#              are also used as sort fields.
def union(*args, &block)
  opts = args.extract_options!
  group_fields = fields(opts[:on])
  sort_fields = fields(opts[:sort_by])
  reverse = opts[:reverse]

  incoming_pipes, _scopes = populate_incoming_scopes(args)

  # Must provide group_fields to ensure field name propagation
  group_fields ||= fields(@incoming_scopes.first.values_fields.get(0))

  # FIXME: GroupBy is missing a constructor for union in wip-255
  sort_fields = group_fields unless sort_fields || reverse.nil?

  java_pipes = incoming_pipes.to_java(Java::CascadingPipe::Pipe)
  unified = Java::CascadingPipe::GroupBy.new(*[java_pipes, group_fields, sort_fields, reverse].compact)
  apply_aggregations(unified, @incoming_scopes, &block)
end

#where(*args) ⇒ Object

Builds a pipe that includes just the tuples matching an expression.

The first unnamed argument, if provided, is a filtering expression (using the Janino syntax).

The named options are:

  • :expression a string. Specifies a Janino expression used to select the tuples. This option has the

same effect as providing it as the first unnamed argument. If this option is provided, then the filter is Janino expression-based.

  • :validate a boolean. Passed into Cascading#expr to enable or disable

expression validation. Defaults to true.

  • :validate_with a hash. Actual arguments used by Cascading#expr for

expression validation. Defaults to {}.



629
630
631
632
633
634
635
636
637
638
639
640
641
642
# File 'lib/cascading/assembly.rb', line 629

# Keeps only the tuples matching a Janino expression (positional or
# :expression) by negating the expression and delegating to #filter.
# Leading `import ...;` statements are kept in front of the negation.
#
# The previous /^...$/ pattern (without /m) stopped at the first newline,
# so a multi-line expression was truncated into an invalid "!(...)". The
# pattern now anchors on the whole string (\A...\z) and lets `.` span
# lines. Each import is matched up to its own `;`.
#
# NOTE(review): relies on extract_options (no bang) reading the trailing
# options hash without removing it from +args+, so the rewritten
# :expression reaches #filter. Confirm against the Array extension.
#
# Raises RuntimeError when :pattern is given.
def where(*args)
  options = args.extract_options
  raise "Regex not allowed" if options && options[:pattern]

  negate = lambda do |expression|
    _, imports, body = expression.match(/\A((?:\s*import\s+[^;]*;\s*)*)(.*)\z/m).to_a
    "#{imports}!(#{body})"
  end

  if options[:expression]
    options[:expression] = negate.call(options[:expression])
  elsif args[0]
    args[0] = negate.call(args[0])
  end

  filter(*args)
end