Class: GoodData::Datawarehouse

Inherits:
Object
  • Object
show all
Defined in:
lib/gooddata_datawarehouse/version.rb,
lib/gooddata_datawarehouse/datawarehouse.rb

Constant Summary collapse

VERSION =
"0.0.12"
PARALEL_COPY_THREAD_COUNT =
10

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(username, password, instance_id, opts = {}) ⇒ Datawarehouse

Returns a new instance of Datawarehouse.



18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/gooddata_datawarehouse/datawarehouse.rb', line 18

def initialize(username, password, instance_id, opts={})
  @logger = Logger.new(STDOUT)
  @username = username
  @password = password
  @sst_token = opts[:sst]
  @jdbc_url = opts[:jdbc_url] || "jdbc:gdc:datawarehouse://secure.gooddata.com/gdc/datawarehouse/instances/#{instance_id}"

  if instance_id.nil? && opts[:jdbc_url].nil?
    fail ArgumentError, "you must either provide instance_id or jdbc_url option."
  end

  Jdbc::DSS.load_driver
  Java.com.gooddata.dss.jdbc.driver.DssDriver
end

Class Method Details

.new_instance(opts = {}) ⇒ Object



14
15
16
# File 'lib/gooddata_datawarehouse/datawarehouse.rb', line 14

def self.new_instance(opts={})
  self.new(opts[:username], opts[:password], opts[:instance_id], opts)
end

Instance Method Details

#connectObject



197
198
199
200
201
202
203
204
205
206
207
# File 'lib/gooddata_datawarehouse/datawarehouse.rb', line 197

def connect
  if @username.to_s.empty? || @password.to_s.empty?
    @connection = Sequel.connect(@jdbc_url, :driver => Java.com.gooddata.dss.jdbc.driver.DssDriver, :jdbc_properties => {'sst' => @sst_token})
  else
    @connection = Sequel.connect(@jdbc_url, :driver => Java.com.gooddata.dss.jdbc.driver.DssDriver, :username => @username, :password => @password)
  end
  yield(@connection)
ensure
  @connection.disconnect unless @connection.nil?
  Sequel.synchronize{::Sequel::DATABASES.delete(@connection)}
end

#create_table(name, columns, opts = {}) ⇒ Object



135
136
137
# File 'lib/gooddata_datawarehouse/datawarehouse.rb', line 135

def create_table(name, columns, opts={})
  execute(GoodData::SQLGenerator.create_table(name, columns, opts))
end

#create_table_from_csv_header(table_name, csv_path, opts = {}) ⇒ Object

returns a list of columns created does nothing if file empty, returns []



128
129
130
131
132
133
# File 'lib/gooddata_datawarehouse/datawarehouse.rb', line 128

def create_table_from_csv_header(table_name, csv_path, opts={})
  # take the header as a list of columns
  columns = get_csv_headers(csv_path)
  create_table(table_name, columns, opts) unless columns.empty?
  columns
end

#csv_to_new_table(table_name, csvs, opts = {}) ⇒ Object



64
65
66
67
68
# File 'lib/gooddata_datawarehouse/datawarehouse.rb', line 64

def csv_to_new_table(table_name, csvs, opts={})
  csv_list = list_files(csvs)
  cols = create_table_from_csv_header(table_name, csv_list[0], opts)
  load_data_from_csv(table_name, csv_list, opts.merge(columns: cols, append: true))
end

#drop_table(table_name, opts = {}) ⇒ Object



60
61
62
# File 'lib/gooddata_datawarehouse/datawarehouse.rb', line 60

def drop_table(table_name, opts={})
  execute(GoodData::SQLGenerator.drop_table(table_name,opts))
end

#execute(sql_strings) ⇒ Object

execute sql, return nothing



153
154
155
156
157
158
159
160
161
162
163
# File 'lib/gooddata_datawarehouse/datawarehouse.rb', line 153

def execute(sql_strings)
  if ! sql_strings.kind_of?(Array)
    sql_strings = [sql_strings]
  end
  connect do |connection|
    sql_strings.each do |sql|
      @logger.info("Executing sql: #{sql}") if @logger
      connection.run(sql)
    end
  end
end

#execute_select(sql, opts = {}) ⇒ Object

executes sql (select), for each row, passes execution to block



166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
# File 'lib/gooddata_datawarehouse/datawarehouse.rb', line 166

def execute_select(sql, opts={})
  fetch_handler = opts[:fetch_handler]
  count = opts[:count]

  connect do |connection|
    # do the query
    f = connection.fetch(sql)

    @logger.info("Executing sql: #{sql}") if @logger
    # if handler was passed call it
    if fetch_handler
      fetch_handler.call(f)
    end

    if count
      return f.first[:count]
    end

    # if block given yield to process line by line
    if block_given?
      # go through the rows returned and call the block
      return f.each do |row|
        yield(row)
      end
    end

    # return it all at once
    f.map{|h| h}
  end
end

#export_table(table_name, csv_path) ⇒ Object



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# File 'lib/gooddata_datawarehouse/datawarehouse.rb', line 33

def export_table(table_name, csv_path)
  CSV.open(csv_path, 'wb', :force_quotes => true) do |csv|
    # get the names of cols
    cols = get_columns(table_name).map {|c| c[:column_name]}

    # write header
    csv << cols

    # get the keys for columns, stupid sequel
    col_keys = nil
    rows = execute_select(GoodData::SQLGenerator.select_all(table_name, limit: 1))

    col_keys = rows[0].keys

    execute_select(GoodData::SQLGenerator.select_all(table_name)) do |row|
      # go through the table write to csv
      csv << row.values_at(*col_keys)
    end
  end
  @logger.info "Table #{table_name} exported to #{csv_path.respond_to?(:path)? csv_path.path : csv_path}"
  csv_path
end

#get_columns(table_name) ⇒ Object



148
149
150
# File 'lib/gooddata_datawarehouse/datawarehouse.rb', line 148

def get_columns(table_name)
  res = execute_select(GoodData::SQLGenerator.get_columns(table_name))
end

#init_file(given_filename, key, csv_path, single_file) ⇒ Object



115
116
117
118
119
120
121
122
123
124
# File 'lib/gooddata_datawarehouse/datawarehouse.rb', line 115

def init_file(given_filename, key, csv_path, single_file)
  # only use file postfix if there are multiple files
  postfix = single_file ? '' : "-#{File.basename(csv_path)}"

  # take what we have and put the source csv name at the end
  given_filename = given_filename.path if given_filename.is_a?(File)
  f = "#{given_filename || Tempfile.new(key).path}#{postfix}"
  f = File.new(f, 'w') unless f.is_a?(File)
  f
end

#load_data_from_csv(table_name, csvs, opts = {}) ⇒ Object



74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/gooddata_datawarehouse/datawarehouse.rb', line 74

def load_data_from_csv(table_name, csvs, opts={})
  thread_count = opts[:paralel_copy_thread_count] || PARALEL_COPY_THREAD_COUNT
  # get the list of files to load and columns in the csv
  csv_list = list_files(csvs)
  columns = opts[:columns] || get_csv_headers(csv_list[0])

  # truncate_table unless data should be appended
  unless opts[:append]
    truncate_table(table_name)
  end

  # load each csv from the list
  single_file = (csv_list.size == 1)
  csv_list.each do |csv_path|
    begin
      if opts[:ignore_parse_errors] && opts[:exceptions_file].nil? && opts[:rejections_file].nil?
        exc = nil
        rej = nil
        opts_file = opts
      else
        opts_file = opts.clone
        # priradit do opts i do exc -
        # temporary files to get the excepted records (if not given)
        exc = opts_file[:exceptions_file] = init_file(opts_file[:exceptions_file], 'exceptions', csv_path, single_file)
        rej = opts_file[:rejections_file] = init_file(opts_file[:rejections_file], 'rejections', csv_path, single_file)
      end

      # execute the load
      execute(GoodData::SQLGenerator.load_data(table_name, csv_path, columns, opts_file))

      # if there was something rejected and it shouldn't be ignored, raise an error
      if ((exc && File.size?(exc)) || (rej && File.size?(rej))) && (! opts[:ignore_parse_errors])
        fail ArgumentError, "Some lines in the CSV didn't go through. Exceptions: #{IO.read(exc)}\nRejected records: #{IO.read(rej)}"
      end
    ensure
      exc.close if exc
      rej.close if rej
    end
  end
end

#rename_table(old_name, new_name) ⇒ Object



56
57
58
# File 'lib/gooddata_datawarehouse/datawarehouse.rb', line 56

def rename_table(old_name, new_name)
  execute(GoodData::SQLGenerator.rename_table(old_name, new_name))
end

#table_exists?(name) ⇒ Boolean

Returns:

  • (Boolean)


139
140
141
142
# File 'lib/gooddata_datawarehouse/datawarehouse.rb', line 139

def table_exists?(name)
  count = execute_select(GoodData::SQLGenerator.get_table_count(name), :count => true)
  count > 0
end

#table_row_count(table_name) ⇒ Object



144
145
146
# File 'lib/gooddata_datawarehouse/datawarehouse.rb', line 144

def table_row_count(table_name)
  execute_select(GoodData::SQLGenerator.get_row_count(table_name), :count => true)
end

#truncate_table(table_name) ⇒ Object



70
71
72
# File 'lib/gooddata_datawarehouse/datawarehouse.rb', line 70

def truncate_table(table_name)
  execute(GoodData::SQLGenerator.truncate_table(table_name))
end