Module: Sq::Dbsync::Database::Common

Included in:
Mysql, Postgres
Defined in:
lib/sq/dbsync/database/common.rb

Constant Summary collapse

SQD =
::Sq::Dbsync

Instance Method Summary collapse

Instance Method Details

#__getobj__Object



67
68
69
# File 'lib/sq/dbsync/database/common.rb', line 67

def __getobj__
  db
end

#__setobj__(db) ⇒ Object



71
72
73
# File 'lib/sq/dbsync/database/common.rb', line 71

def __setobj__(db)
  @db = db
end

#customize_sql(sql, schema) ⇒ Object



42
43
44
# File 'lib/sq/dbsync/database/common.rb', line 42

def customize_sql(sql, schema)
  sql
end

#ensure_connectionObject

Since we go so long without using connections (during a batch load), they go stale and raise DatabaseDisconnectError when we try to use them. This method ensures that the connection is fresh even after a long time between drinks.



63
64
65
# File 'lib/sq/dbsync/database/common.rb', line 63

def ensure_connection
  db.disconnect
end

#extract_incrementally_to_file(plan, file_name, last_row_at, overlap) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/sq/dbsync/database/common.rb', line 26

def extract_incrementally_to_file(plan, file_name, last_row_at, overlap)
  table_name = plan.source_table_name.to_sym
  db_columns = db.schema(table_name).map(&:first)

  query = self[table_name].select(*plan.columns)
  if last_row_at
    query = query.filter("#{plan.timestamp} > ?", last_row_at - overlap)
  end

  sql = query.sql

  sql = customize_sql(sql, db.schema(table_name))

  extract_sql_to_file(sql, file_name)
end

#extract_to_file(table_name, columns, file_name) ⇒ Object



19
20
21
22
23
24
# File 'lib/sq/dbsync/database/common.rb', line 19

def extract_to_file(table_name, columns, file_name)
  extract_sql_to_file("SELECT %s FROM %s" % [
    columns.join(', '),
    table_name
  ], file_name)
end

#hash_schema(plan) ⇒ Object



46
47
48
49
# File 'lib/sq/dbsync/database/common.rb', line 46

def hash_schema(plan)
  ensure_connection
  Hash[schema(source? ? plan.source_table_name : plan.table_name)]
end

#initialize(opts, source_or_target) ⇒ Object



8
9
10
11
12
13
# File 'lib/sq/dbsync/database/common.rb', line 8

def initialize(opts, source_or_target)
  db = Sequel.connect(opts)
  super(db)
  @db, @source_or_target = db, source_or_target
  @charset = opts[:charset] if opts[:charset]
end

#inspectObject



15
16
17
# File 'lib/sq/dbsync/database/common.rb', line 15

def inspect
  "#<Database::#{self.class.name} #{source_or_target} #{opts[:database]}>"
end

#nameObject



55
56
57
# File 'lib/sq/dbsync/database/common.rb', line 55

def name
  self['SELECT database()'].first.fetch(:'database()')
end

#source?Boolean

Returns:

  • (Boolean)


51
52
53
# File 'lib/sq/dbsync/database/common.rb', line 51

def source?
  source_or_target == :source
end