Class: MySqlStreamer
Overview
Internal: The MySQL streamer is a helper that works with database_source
so that you can use the mysql --quick option, which stops the mysql client
from buffering the full result set. We also avoid building a full result
set in Ruby; instead, we yield one row at a time.
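To illustrate the row-at-a-time idea, here is a minimal sketch that reads a child process's output line by line instead of collecting it all first. The helper name `stream_lines` is hypothetical, and a child Ruby process stands in for `mysql --quick`, so nothing here is the library's own code. Unlike the streamer, this sketch decides failure by exit status rather than by the presence of stderr output.

```ruby
require 'open3'
require 'rbconfig'

# Hypothetical helper: runs a command and yields each stdout line as it
# arrives, instead of buffering the whole output in memory.
def stream_lines(*command)
  Open3.popen3(*command) do |stdin, stdout, stderr, wait_thread|
    stdin.close
    stdout.each_line { |line| yield line.chomp }
    error = stderr.read
    status = wait_thread.value
    raise "command failed: #{error.strip}" unless status.success?
  end
end

# A child Ruby process stands in for `mysql --quick` here (assumption).
child_script = '3.times { |i| puts "row #{i}" }'
stream_lines(RbConfig.ruby, '-e', child_script) { |line| puts line }
```

Because stderr is only read after stdout is exhausted, a child that writes a large amount to stderr could block; a more defensive version would drain both streams concurrently.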
Instance Method Summary
- #any? ⇒ Boolean
  We implement some bits of a hash so that database_source can use them.
- #each ⇒ Object
- #first ⇒ Object
- #initialize(query, target, connection) ⇒ MySqlStreamer (constructor)
  Internal: Creates a MySQL Streamer.
- #mandatory_option!(hash, key) ⇒ Object
Constructor Details
#initialize(query, target, connection) ⇒ MySqlStreamer
Internal: Creates a MySQL Streamer
query - the SQL query
target - the name of the ETL configuration (e.g. development or production)
connection - the ActiveRecord connection
Examples
MySqlStreamer.new("select * from bob", "development", my_connection)
# File 'lib/etl/control/source/mysql_streamer.rb', line 19

def initialize(query, target, connection)
  # Let's just be safe and also make sure there aren't new lines
  # in the SQL - it's bound to cause trouble
  @query = query.split.join(' ')
  @name = target
  @first_row = connection.select_all("#{query} LIMIT 1")
end
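The whitespace handling in the constructor can be tried in isolation. The sketch below wraps the same `query.split.join(' ')` expression in a hypothetical helper named `normalize_query`; the helper and the sample SQL are illustrative only.

```ruby
# Hypothetical helper mirroring the `query.split.join(' ')` step above.
def normalize_query(query)
  # String#split with no arguments splits on any run of whitespace
  # (spaces, tabs, newlines) and ignores leading/trailing whitespace.
  query.split.join(' ')
end

sql = <<~SQL
  select id, name
    from bob
   where id > 10
SQL

puts normalize_query(sql)
```

One consequence worth keeping in mind: runs of whitespace inside quoted SQL string literals are collapsed as well, since the expression does not parse the SQL.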
Instance Method Details
#any? ⇒ Boolean
We implement some bits of a hash so that database_source can use them.

# File 'lib/etl/control/source/mysql_streamer.rb', line 29

def any?
  @first_row.any?
end
#each ⇒ Object
# File 'lib/etl/control/source/mysql_streamer.rb', line 43

def each
  keys = nil

  config = ETL::Base.configurations[@name.to_s]
  host = mandatory_option!(config, 'host')
  username = mandatory_option!(config, 'username')
  database = mandatory_option!(config, 'database')
  password = config['password'] # this one can be omitted in some cases

  mysql_command = """mysql --quick -h #{host} -u #{username} -e \"#{@query.gsub("\n","")}\" -D #{database} --password=#{password} -B"""
  Open3.popen3(mysql_command) do |stdin, out, err, external|
    until (line = out.gets).nil? do
      line = line.gsub("\n","")
      if keys.nil?
        keys = line.split("\t")
      else
        hash = Hash[keys.zip(line.split("\t"))]
        # map out NULL to nil
        hash.each do |k, v|
          hash[k] = nil if v == 'NULL'
        end
        yield hash
      end
    end
    error = err.gets
    if (!error.nil? && error.strip.length > 0)
      throw error
    end
  end
end
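The parsing half of `#each` can be exercised without a database. The following sketch assumes tab-separated batch output with a header line first, as the listing above expects; the helper name `each_row` and the sample data are hypothetical. It uses `split("\t", -1)` to keep trailing empty fields, which differs from the plain `split("\t")` in the listing.

```ruby
require 'stringio'

# Hypothetical helper: turns tab-separated batch output (header line first)
# into one hash per row, yielding each row as soon as its line is read.
def each_row(io)
  keys = nil
  io.each_line do |line|
    # The -1 limit keeps trailing empty fields (a deviation from the listing).
    fields = line.chomp.split("\t", -1)
    if keys.nil?
      keys = fields # the first line carries the column names
    else
      row = keys.zip(fields).to_h
      # Map the literal text NULL to nil, as the streamer does.
      yield row.transform_values { |value| value == 'NULL' ? nil : value }
    end
  end
end

sample = StringIO.new("id\tname\n1\tbob\n2\tNULL\n")
each_row(sample) { |row| p row }
```

With this approach a column whose value is the actual string NULL cannot be told apart from SQL NULL, because both appear as the same text in the batch output.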
#first ⇒ Object
# File 'lib/etl/control/source/mysql_streamer.rb', line 33

def first
  @first_row.first
end
#mandatory_option!(hash, key) ⇒ Object
# File 'lib/etl/control/source/mysql_streamer.rb', line 37

def mandatory_option!(hash, key)
  value = hash[key]
  raise "Missing key #{key} in connection configuration #{@name}" if value.blank?
  value
end
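As a rough illustration of the validation above, the sketch below reimplements it as a standalone function. The name `fetch_mandatory` is hypothetical, and since `blank?` comes from ActiveSupport, it is approximated here as "nil or whitespace-only", which is an assumption and not an exact equivalent.

```ruby
# Hypothetical stand-in for mandatory_option!, written without ActiveSupport.
# `blank?` is approximated as "nil or a whitespace-only string".
def fetch_mandatory(config, key, name)
  value = config[key]
  if value.nil? || value.to_s.strip.empty?
    raise "Missing key #{key} in connection configuration #{name}"
  end
  value
end

# Sample configuration (made up for illustration).
config = { 'host' => 'localhost', 'username' => 'etl', 'database' => '' }

puts fetch_mandatory(config, 'host', 'development')

begin
  fetch_mandatory(config, 'database', 'development')
rescue RuntimeError => e
  puts e.message
end
```

Failing early on a missing host, username, or database gives a clearer error than letting the external mysql command fail with an incomplete argument list.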