Class: MySqlStreamer

Inherits:
Object show all
Defined in:
lib/etl/control/source/mysql_streamer.rb

Overview

Internal: The MySQL streamer is a helper which works with the database_source

in order to allow you to use the --quick option (which stops MySQL
from building a full result set). We also don't build a full result set
in Ruby - instead we yield a row at a time.

Instance Method Summary collapse

Constructor Details

#initialize(query, target, connection) ⇒ MySqlStreamer

Internal: Creates a MySQL Streamer

query - the SQL query
target - the name of the ETL configuration (ie. development/production)
connection - the ActiveRecord connection

Examples

MySqlStreamer.new("select * from bob", "development", my_connection)


19
20
21
22
23
24
25
# File 'lib/etl/control/source/mysql_streamer.rb', line 19

# Internal: Creates a MySQL Streamer.
#
# query      - the SQL query String. A copy with every run of whitespace
#              (new lines included) collapsed to a single space is kept
#              in @query.
# target     - the name of the ETL configuration (ie. development/production),
#              kept in @name.
# connection - an object responding to select_all(sql); presumably the
#              ActiveRecord connection.
#
# The query is executed once up front, with " LIMIT 1" appended to the
# query exactly as it was passed in, and the result is kept in @first_row.
def initialize(query, target, connection)
  @name = target

  # New lines in the SQL are bound to cause trouble later on, so squeeze
  # all whitespace down to single spaces.
  words = query.split
  @query = words.join(' ')

  probe_sql = "#{query} LIMIT 1"
  @first_row = connection.select_all(probe_sql)
end

Instance Method Details

#any? ⇒ Boolean

We implement some bits of a hash so that database_source can use them

Returns:

  • (Boolean)


29
30
31
# File 'lib/etl/control/source/mysql_streamer.rb', line 29

# Internal: Reports whether the first-row lookup stored in @first_row
# contains anything, by delegating to its own any?.
#
# Returns a Boolean.
def any?
  rows = @first_row
  rows.any?
end

#each ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/etl/control/source/mysql_streamer.rb', line 43

# Internal: Streams the query results by running the mysql command line
# client in --quick batch mode and yielding one row at a time.
#
# The connection settings are read from ETL::Base.configurations under the
# name given to the constructor. 'host', 'username' and 'database' are
# mandatory; 'password' may be absent.
#
# The client is started without a shell (one argument per array element),
# so backticks, quotes and '$' in the SQL reach MySQL untouched.
#
# Yields a Hash for every data row, mapping column name => String value,
# with the literal text NULL mapped to nil. The first output line is taken
# as the column names and is not yielded.
#
# Returns an Enumerator when no block is given.
# Raises RuntimeError if the configuration is missing or if the client wrote
# anything to stderr.
def each
  return enum_for(:each) unless block_given?

  config = ETL::Base.configurations[@name.to_s]
  raise "Missing connection configuration #{@name}" if config.nil?

  host = mandatory_option!(config, 'host')
  username = mandatory_option!(config, 'username')
  database = mandatory_option!(config, 'database')
  password = config['password'] # this one can be omitted in some cases

  # An argument list (rather than one interpolated string) bypasses the
  # shell entirely, so nothing in the query or the settings is expanded.
  command = [
    'mysql', '--quick',
    '-h', host.to_s,
    '-u', username.to_s,
    '-e', @query.delete("\n"),
    '-D', database.to_s,
    "--password=#{password}",
    '-B'
  ]

  Open3.popen3(*command) do |_stdin, out, err, _wait_thread|
    keys = nil

    while (line = out.gets)
      # A limit of -1 keeps trailing empty columns; without it an empty
      # string in the last column(s) would silently turn into nil.
      fields = line.chomp("\n").split("\t", -1)

      if keys.nil?
        keys = fields
      else
        # map out NULL to nil
        values = fields.map { |value| value == 'NULL' ? nil : value }
        yield Hash[keys.zip(values)]
      end
    end

    # NOTE(review): any stderr output is treated as a failure, which would
    # include client warnings - confirm this is intended.
    error = err.gets
    raise error.strip if error && !error.strip.empty?
  end
end

#first ⇒ Object



33
34
35
# File 'lib/etl/control/source/mysql_streamer.rb', line 33

# Internal: Returns the first element of the first-row lookup stored in
# @first_row, by delegating to its own first.
def first
  rows = @first_row
  rows.first
end

#mandatory_option!(hash, key) ⇒ Object



37
38
39
40
41
# File 'lib/etl/control/source/mysql_streamer.rb', line 37

# Internal: Fetches a required setting from a connection configuration.
#
# hash - the configuration (anything responding to [])
# key  - the key to look up
#
# Returns the value stored under key.
# Raises RuntimeError naming the key and the configuration (@name) when the
# value is blank.
def mandatory_option!(hash, key)
  hash[key].tap do |setting|
    raise "Missing key #{key} in connection configuration #{@name}" if setting.blank?
  end
end