Module: Sequel::Impala::DatasetMethods

Included in:
Dataset, JDBC::Hive2::Dataset, Rbhive::Dataset
Defined in:
lib/sequel/adapters/shared/impala.rb

Constant Summary collapse

BACKTICK =
'`'.freeze
APOS =
"'".freeze
STRING_ESCAPE_RE =
/([\\'])/
STRING_ESCAPE_REPLACE =
'\\\\\1'.freeze
BOOL_TRUE =
'true'.freeze
BOOL_FALSE =
'false'.freeze
CONSTANT_LITERAL_MAP =
{:CURRENT_TIMESTAMP=>'now()'.freeze}.freeze
PAREN_OPEN =
Dataset::PAREN_OPEN
PAREN_CLOSE =
Dataset::PAREN_CLOSE
SPACE =
Dataset::SPACE
NOT =
'NOT '.freeze
REGEXP =
' REGEXP '.freeze
EXCEPT_SOURCE_COLUMN =
:__source__
EXCEPT_STRATEGIES =
[:not_exists, :not_in, :left_join, :group_by].freeze
SELECT_VALUES =
'VALUES '.freeze

Instance Method Summary collapse

Instance Method Details

#complex_expression_sql_append(sql, op, args) ⇒ Object

Handle string concatenation using the concat string function. Don't use the ESCAPE syntax when using LIKE/NOT LIKE, as Impala doesn't support escaping LIKE metacharacters. Support regexps on Impala using the REGEXP operator. For case-insensitive regexps, convert both values to uppercase first.



432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
# File 'lib/sequel/adapters/shared/impala.rb', line 432

# Appends the SQL for a complex expression to +sql+, special-casing the
# operators that need Impala-specific output:
#
# * :'||' is rendered as a call to the concat function.
# * :LIKE / :'NOT LIKE' are rendered as "(lhs OP rhs)" with no ESCAPE clause.
# * :~, :'!~', :'~*', :'!~*' are rendered with the REGEXP operator; the
#   negated forms are prefixed with NOT, and the case-insensitive forms wrap
#   every argument in upper().
#
# Every other operator is delegated to super.
def complex_expression_sql_append(sql, op, args)
  case op
  when :'||'
    concat_call = Sequel.function(:concat, *args)
    literal_append(sql, concat_call)
  when :LIKE, :'NOT LIKE'
    keyword = op.to_s
    sql << PAREN_OPEN
    literal_append(sql, args.at(0))
    sql << SPACE << keyword << SPACE
    literal_append(sql, args.at(1))
    sql << PAREN_CLOSE
  when :~, :'!~', :'~*', :'!~*'
    ignore_case = (op == :'~*' || op == :'!~*')
    negated = (op == :'!~' || op == :'!~*')
    operands = ignore_case ? args.map{|a| Sequel.function(:upper, a)} : args

    sql << NOT if negated
    sql << PAREN_OPEN
    literal_append(sql, operands.at(0))
    sql << REGEXP
    literal_append(sql, operands.at(1))
    sql << PAREN_CLOSE
  else
    super
  end
end

#constant_sql_append(sql, constant) ⇒ Object

Use now() for current timestamp, as Impala doesn't support CURRENT_TIMESTAMP.



459
460
461
# File 'lib/sequel/adapters/shared/impala.rb', line 459

# Appends the SQL for +constant+ to +sql+. Constants listed in
# CONSTANT_LITERAL_MAP are replaced by their mapped text; any other constant
# is written using its to_s form.
def constant_sql_append(sql, constant)
  fallback = constant.to_s
  sql << CONSTANT_LITERAL_MAP.fetch(constant, fallback)
end

#date_add_sql_append(sql, da) ⇒ Object

Use the addition operator combined with interval types to handle date arithmetic when using the date_arithmetic extension.



466
467
468
469
470
471
472
473
474
475
476
477
478
479
# File 'lib/sequel/adapters/shared/impala.rb', line 466

# Appends SQL for a date_arithmetic DateAdd expression to +sql+.
#
# The expression is cast to Time. For each interval unit yielded by
# each_valid_interval_unit, an "INTERVAL <value> <unit>" literal is built,
# and the cast expression plus all such literals are combined with the :+
# operator. When no interval unit is yielded, only the cast expression is
# appended.
def date_add_sql_append(sql, da)
  duration = da.interval
  base = da.expr
  units = Sequel::SQL::DateAdd::DatasetMethods::DEF_DURATION_UNITS

  terms = []
  each_valid_interval_unit(duration, units) do |amount, unit_name|
    terms << Sequel.lit("INTERVAL #{amount} #{unit_name}")
  end

  timestamp = Sequel.cast(base, Time)
  return literal_append(sql, timestamp) if terms.empty?

  complex_expression_sql_append(sql, :+, [timestamp, *terms])
end

#delete ⇒ Object

DELETE is emulated on Impala and doesn't return the number of modified rows.



483
484
485
486
# File 'lib/sequel/adapters/shared/impala.rb', line 483

# Runs the inherited delete and then discards its result: DELETE is emulated
# on Impala, so there is no count of removed rows to report.
#
# Always returns nil.
def delete
  super
  nil
end

#delete_sql ⇒ Object

Emulate DELETE using INSERT OVERWRITE selecting all columns from the table, with a reversed condition used for WHERE.



490
491
492
493
494
495
496
497
498
499
500
501
502
503
# File 'lib/sequel/adapters/shared/impala.rb', line 490

# Builds the SQL that emulates DELETE: an INSERT OVERWRITE of the dataset's
# own source tables that re-selects only the rows which must survive.
#
# With a WHERE condition, a row survives unless the condition is true for it.
# The filter is written as a CASE expression rather than NOT (...), because
# NOT (NULL) is NULL: rows for which the condition evaluates to NULL would
# otherwise be filtered out, even though a real DELETE leaves them in place.
#
# Without a WHERE condition every row is deleted (WHERE false).
#
# Returns a new, mutable String on every call.
def delete_sql
  # dup so the buffer can be appended to even when string literals are frozen.
  sql = "INSERT OVERWRITE ".dup
  source_list_append(sql, opts[:from])
  sql << " SELECT * FROM "
  source_list_append(sql, opts[:from])
  if where = opts[:where]
    sql << " WHERE CASE WHEN ("
    literal_append(sql, where)
    sql << ") THEN false ELSE true END"
  else
    sql << " WHERE false"
  end
  sql
end

#empty? ⇒ Boolean

Don't remove an order, because that breaks things when offsets are used, as Impala requires an order when using an offset.

Returns:

  • (Boolean)


527
528
529
# File 'lib/sequel/adapters/shared/impala.rb', line 527

# Returns true when selecting the constant 1 (aliased as :one) from this
# dataset via get yields nil, and false otherwise. The dataset is queried
# as-is; no ordering is stripped first.
def empty?
  probe = Sequel::SQL::AliasedExpression.new(1, :one)
  get(probe).nil?
end

#except(other, opts = OPTS) ⇒ Object

Emulate EXCEPT using a chosen strategy and checking for values in only the first table.

Raises:

  • (InvalidOperation)


532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
# File 'lib/sequel/adapters/shared/impala.rb', line 532

# Emulates EXCEPT, returning a dataset of the rows of this dataset that are
# not found in +other+. The emulation is chosen from the [strategy, *keys]
# array stored under @opts[:except_strategy]; when nothing is stored (or the
# strategy is not one of the three handled explicitly) the GROUP BY branch
# at the end is used.
#
# other :: the dataset whose rows are to be excluded.
# opts  :: options hash. :all and :from_self=>false are rejected; the hash is
#          also passed to the final from_self call.
#
# Raises InvalidOperation when opts[:all] is truthy or opts[:from_self] is
# false, and Sequel::Error when the :not_in strategy has anything other than
# exactly one key.
def except(other, opts=OPTS)
  raise(InvalidOperation, "EXCEPT ALL not supported") if opts[:all]
  raise(InvalidOperation, "The :from_self=>false option to except is not supported") if opts[:from_self] == false

  # A missing :except_strategy leaves strategy nil and keys empty.
  strategy, *keys = @opts[:except_strategy]
  # Wrap this dataset as t1 so the key columns can be qualified below.
  ds = from_self(:alias=>:t1)

  ds = case strategy
  when :not_exists
    # Exclude t1 rows for which a subquery over other (aliased t2), matched
    # on every key, returns a row.
    ds.exclude(other.
        from_self(:alias=>:t2).
        where(keys.map{|key| [Sequel.qualify(:t1, key), Sequel.qualify(:t2, key)]}).
        select(nil).
        exists)
  when :not_in
    raise Sequel::Error, ":not_in EXCEPT strategy only supports a single key" unless keys.length == 1
    key = keys.first
    # Exclude t1 rows whose key appears in other's key column.
    ds.exclude(Sequel.qualify(:t1, key)=>other.from_self(:alias=>:t2).select(key))
  when :left_join
    # Left join other as t2 on the keys, keep rows where any t2 key is NULL,
    # and select only t1's columns.
    ds.left_join(other.from_self(:alias=>:t2).as(:t2), keys.map{|key| [key, key]}).
      where(Sequel.or(keys.map{|key| [Sequel.qualify(:t2, key), nil]})).
      select_all(:t1)
  else
    # GROUP BY emulation: group each side by its columns and tag it with a
    # source marker (1 for this dataset, 2 for other), UNION ALL the two, and
    # regroup by this dataset's columns. The HAVING clause then keeps groups
    # made of a single row whose marker is 1.
    # NOTE(review): count{}.* is presumably rendered as count(*) - confirm.
    cols = columns
    rhs = other.from_self.select_group(*other.columns).select_append(Sequel.expr(2).as(EXCEPT_SOURCE_COLUMN))
    ds.select_group(*cols).
      select_append(Sequel.expr(1).as(EXCEPT_SOURCE_COLUMN)).
      union(rhs, all: true).
      select_group(*cols).
      having{{count{}.* => 1, min(EXCEPT_SOURCE_COLUMN) => 1}}
  end

  ds.from_self(opts)
end

#except_strategy(strategy, *keys) ⇒ Object

The strategy to use for EXCEPT emulation. By default, uses a GROUP BY emulation, as that doesn't require you to provide a key column, but you can use this to choose a NOT EXISTS, NOT IN, or LEFT JOIN emulation, providing the unique key column.

Raises:

  • (Sequel::Error)


570
571
572
573
# File 'lib/sequel/adapters/shared/impala.rb', line 570

# Returns a clone of this dataset whose :except_strategy option is set to
# [strategy, *keys].
#
# strategy :: must be one of EXCEPT_STRATEGIES.
# keys     :: key column(s) stored alongside the strategy.
#
# Raises Sequel::Error when +strategy+ is not in EXCEPT_STRATEGIES.
def except_strategy(strategy, *keys)
  unless EXCEPT_STRATEGIES.include?(strategy)
    raise Sequel::Error, "invalid EXCEPT strategy: #{strategy.inspect}"
  end

  selection = [strategy, *keys]
  clone(:except_strategy=>selection)
end

#from ⇒ Object

Implicitly qualify tables if using the :search_path database option.



506
507
508
509
510
# File 'lib/sequel/adapters/shared/impala.rb', line 506

# Calls the inherited from and returns a dataset whose source tables have
# each been passed through db.implicit_qualify.
#
# The qualified sources are installed with clone rather than by rewriting
# opts[:from] in place, so a frozen opts hash or frozen source array is never
# mutated. When the inherited call leaves :from unset (no sources given),
# its result is returned unchanged.
def from(*)
  ds = super
  sources = ds.opts[:from]
  return ds unless sources

  ds.clone(:from=>sources.map{|t| db.implicit_qualify(t)})
end

#insert_overwrite ⇒ Object

Use INSERT OVERWRITE instead of INSERT INTO when inserting into this dataset:

DB[:table].insert_overwrite.insert(DB[:other])
# INSERT OVERWRITE table SELECT * FROM other


579
580
581
# File 'lib/sequel/adapters/shared/impala.rb', line 579

# Returns a clone of this dataset with the :insert_overwrite option set to
# true. Per the accompanying documentation, inserts made through that clone
# use INSERT OVERWRITE instead of INSERT INTO; the code that consumes the
# option is not part of this method.
def insert_overwrite
  clone(:insert_overwrite=>true)
end

#insert_supports_empty_values? ⇒ Boolean

Impala does not support INSERT DEFAULT VALUES.

Returns:

  • (Boolean)


584
585
586
# File 'lib/sequel/adapters/shared/impala.rb', line 584

# Always false: Impala does not support INSERT ... DEFAULT VALUES.
def insert_supports_empty_values?
  false
end

#intersect(other, opts = OPTS) ⇒ Object

Emulate INTERSECT by joining this dataset to the other dataset on all of this dataset's columns and selecting only this dataset's columns from the result.

Raises:

  • (InvalidOperation)


589
590
591
592
593
594
595
596
597
# File 'lib/sequel/adapters/shared/impala.rb', line 589

# Emulates INTERSECT by wrapping this dataset as :l, joining it to +other+ on
# all of this dataset's columns, selecting only :l's columns, and wrapping
# the result with from_self(opts).
#
# other :: the dataset to intersect with.
# opts  :: options hash; :all and :from_self=>false are rejected, and the
#          hash is passed to the final from_self call.
#
# Raises InvalidOperation when opts[:all] is truthy or opts[:from_self] is
# false.
#
# NOTE(review): a join presumably neither de-duplicates rows nor matches NULL
# columns the way a native INTERSECT would - confirm this is intended.
def intersect(other, opts=OPTS)
  if opts[:all]
    raise(InvalidOperation, "INTERSECT ALL not supported")
  elsif opts[:from_self] == false
    raise(InvalidOperation, "The :from_self=>false option to intersect is not supported")
  end

  join_columns = columns
  left = from_self(alias: :l)
  matched = left.join(other, join_columns).select_all(:l)
  matched.from_self(opts)
end

#join_table(type, table, expr = nil, options = OPTS, &block) ⇒ Object

Implicitly qualify tables if using the :search_path database option.



513
514
515
# File 'lib/sequel/adapters/shared/impala.rb', line 513

# Passes +table+ through db.implicit_qualify and then delegates to the
# inherited join_table with every other argument (and the block) unchanged.
def join_table(type, table, expr=nil, options=OPTS, &block)
  qualified = db.implicit_qualify(table)
  super(type, qualified, expr, options, &block)
end

#supports_cte?(type = :select) ⇒ Boolean

Impala supports non-recursive common table expressions.

Returns:

  • (Boolean)


600
601
602
# File 'lib/sequel/adapters/shared/impala.rb', line 600

# Always true: Impala supports non-recursive common table expressions.
# The +type+ argument is accepted but not consulted.
def supports_cte?(type=:select)
  true
end

#supports_cte_in_subqueries? ⇒ Boolean

Returns:

  • (Boolean)


604
605
606
# File 'lib/sequel/adapters/shared/impala.rb', line 604

# Always true.
# NOTE(review): going by the name, this reports that common table
# expressions may be used inside subqueries on Impala - confirm.
def supports_cte_in_subqueries?
  true
end

#supports_derived_column_lists? ⇒ Boolean

Impala doesn't support derived column lists when aliasing tables.

Returns:

  • (Boolean)


610
611
612
# File 'lib/sequel/adapters/shared/impala.rb', line 610

# Always false: Impala doesn't support derived column lists when aliasing
# tables.
def supports_derived_column_lists?
  false
end

#supports_intersect_except_all? ⇒ Boolean

Impala doesn't support EXCEPT or INTERSECT, but support is emulated for them. However, EXCEPT ALL and INTERSECT ALL are not emulated.

Returns:

  • (Boolean)


616
617
618
# File 'lib/sequel/adapters/shared/impala.rb', line 616

# Always false: EXCEPT and INTERSECT are emulated for Impala, but their ALL
# variants are not.
def supports_intersect_except_all?
  false
end

#supports_is_true? ⇒ Boolean

Impala only supports IS NULL, not IS TRUE or IS FALSE.

Returns:

  • (Boolean)


621
622
623
# File 'lib/sequel/adapters/shared/impala.rb', line 621

# Always false: Impala only supports IS NULL, not IS TRUE or IS FALSE.
def supports_is_true?
  false
end

#supports_multiple_column_in? ⇒ Boolean

Impala doesn't support IN when used with multiple columns.

Returns:

  • (Boolean)


626
627
628
# File 'lib/sequel/adapters/shared/impala.rb', line 626

# Always false: Impala doesn't support IN when used with multiple columns.
def supports_multiple_column_in?
  false
end

#supports_regexp? ⇒ Boolean

Impala supports regexps using the REGEXP operator.

Returns:

  • (Boolean)


631
632
633
# File 'lib/sequel/adapters/shared/impala.rb', line 631

# Always true: Impala supports regular expressions via the REGEXP operator.
def supports_regexp?
  true
end

#supports_window_functions? ⇒ Boolean

Impala supports window functions.

Returns:

  • (Boolean)


636
637
638
# File 'lib/sequel/adapters/shared/impala.rb', line 636

# Always true: Impala supports window functions.
def supports_window_functions?
  true
end

#to_parquet(table, options = OPTS) ⇒ Object

Create a parquet file from this dataset. table should be the table name to create. To specify a path for the parquet file, use the :location option.

Examples:

DB[:t].to_parquet(:p)
# CREATE TABLE `p` STORED AS parquet AS
# SELECT * FROM `t`

DB[:t].to_parquet(:p, :location=>'/a/b')
# CREATE TABLE `p` STORED AS parquet LOCATION '/a/b' AS
# SELECT * FROM `t`


653
654
655
# File 'lib/sequel/adapters/shared/impala.rb', line 653

# Creates +table+ from this dataset by calling db.create_table with
# +options+ merged with :as=>self and :stored_as=>:parquet. Those two keys
# override any values for them supplied in +options+.
#
# Returns whatever db.create_table returns.
def to_parquet(table, options=OPTS)
  create_options = options.merge(:as=>self, :stored_as=>:parquet)
  db.create_table(table, create_options)
end

#truncate_sql ⇒ Object

Emulate TRUNCATE by using INSERT OVERWRITE selecting all columns from the table, with WHERE false.



519
520
521
522
523
# File 'lib/sequel/adapters/shared/impala.rb', line 519

# Builds the SQL that emulates TRUNCATE: the delete_sql of a copy of this
# dataset that has no WHERE condition.
#
# The condition is cleared through clone(:where=>nil) instead of deleting the
# key from the copy's opts hash, so neither this dataset's options nor a
# frozen opts hash on the copy is ever mutated.
def truncate_sql
  clone(:where=>nil).delete_sql
end

#update(values = OPTS) ⇒ Object

UPDATE is emulated on Impala, and returns nil instead of the number of modified rows



659
660
661
662
# File 'lib/sequel/adapters/shared/impala.rb', line 659

# Runs the inherited update with the given arguments and then discards its
# result: UPDATE is emulated on Impala, so there is no count of modified rows
# to report.
#
# Always returns nil.
def update(values=OPTS)
  super
  nil
end

#update_sql(values) ⇒ Object

Emulate UPDATE using INSERT OVERWRITE AS SELECT. For all columns used in the given values, use a CASE statement. In the CASE statement, set the value to the new value if the row matches WHERE conditions of the current dataset, otherwise use the existing value.



668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
# File 'lib/sequel/adapters/shared/impala.rb', line 668

# Builds the SQL that emulates UPDATE: an INSERT OVERWRITE of the dataset's
# own source tables that re-selects every column.
#
# values :: hash of column => new value. Only keys matching one of the
#           dataset's columns are used.
#
# A column present in +values+ is emitted as a CASE expression, aliased to
# the column name, that yields the new value when the dataset's WHERE
# condition holds and the existing column otherwise. With no WHERE condition
# the CASE condition is the literal true. Every other column is emitted as a
# plain quoted identifier.
#
# Returns a new, mutable String on every call.
def update_sql(values)
  # dup so the buffer can be appended to even when string literals are frozen.
  sql = "INSERT OVERWRITE ".dup
  source_list_append(sql, opts[:from])
  sql << " SELECT "

  # Literalize the WHERE condition once so it can be reused as the CASE
  # condition for every updated column.
  where = opts[:where]
  condition = where ? Sequel.lit(literal(where)) : true

  select_all.columns.each_with_index do |c, i|
    sql << ', ' unless i.zero?

    if values.has_key?(c)
      literal_append(sql, Sequel.case({condition=>values[c]}, c).as(c))
    else
      quote_identifier_append(sql, c)
    end
  end

  sql << " FROM "
  source_list_append(sql, opts[:from])
  sql
end