Module: Sql2Avro
- Defined in: lib/sql2avro.rb
Constant Summary
- AVRO_TOOLS_PATH = File.expand_path('../../vendor/avro-tools-1.7.4.jar', __FILE__)
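This sketch assumes the constant uses the common File.expand_path idiom, which resolves a relative path against the directory of the current source file. It shows where the bundled avro-tools jar ends up for a hypothetical install location, /app/lib/sql2avro.rb standing in for __FILE__.

```ruby
# Hypothetical location of lib/sql2avro.rb, standing in for __FILE__.
source_file = '/app/lib/sql2avro.rb'

# The second argument is treated as a directory, so the first '..' strips
# "sql2avro.rb" and the second strips "lib".
jar_path = File.expand_path('../../vendor/avro-tools-1.7.4.jar', source_file)

puts jar_path # prints "/app/vendor/avro-tools-1.7.4.jar"
```

Anchoring on __FILE__ rather than the current working directory means the jar is found no matter where the calling process was started.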
Class Method Summary
- .avroize(database_config, table, min_id, max_rows_per_batch = nil, directory = '/tmp') ⇒ Object
Pulls data from the given database table starting from the given id.
Class Method Details
.avroize(database_config, table, min_id, max_rows_per_batch = nil, directory = '/tmp') ⇒ Object
Pulls data from the given database table starting from the given id.
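This method creates an Avro file as a side effect and returns a hash with these keys:
max_id: upper ID bound of this batch (the smaller of the table's maximum id and min_id + max_rows_per_batch)
path: file path of the resulting Avro file
error: documented as an error message, if any, otherwise omitted; the implementation below never sets this key and raises a RuntimeError on failure instead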
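The path key ends in a file name that encodes the table, the UTC time of the call, and the id range of the batch. The helper below is hypothetical; it only reproduces the naming logic from the method source so the format is easy to see, using a fixed time instead of Time.now.

```ruby
# Hypothetical helper reproducing avroize's output file name:
# "<table>.<date>T<time>Z.<min_id>.<max_id_this_batch>.avro"
def avro_batch_filename(table, now, min_id, max_id_this_batch)
  # For a UTC Time, #to_s yields e.g. "2013-05-01 12:34:56 UTC".
  # getutc returns a copy, so the caller's Time object is not modified.
  date, time, _zone = now.getutc.to_s.split
  "#{table}.#{date}T#{time}Z.#{min_id}.#{max_id_this_batch}.avro"
end

name = avro_batch_filename('users', Time.utc(2013, 5, 1, 12, 34, 56), 0, 1000)

# Shape of the hash avroize returns for such a batch (values illustrative).
result = { max_id: 1000, path: File.join('/tmp', name) }

puts result[:path] # prints "/tmp/users.2013-05-01T12:34:56Z.0.1000.avro"
```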
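database_config is a hash with this form (like ActiveRecord's). Keys must be strings, because the method looks up database_config['adapter'], and the adapter must be "mysql" or "mysql2":
{
  'adapter'  => "mysql",
  'host'     => "localhost",
  'username' => "myuser",
  'password' => "mypass",
  'database' => "somedatabase"
}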
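A symbol-keyed hash such as { adapter: "mysql" } fails the adapter check, because Hash#has_key? distinguishes :adapter from 'adapter'. The sketch below uses a hypothetical check_database_config! that mirrors the two raise statements at the top of avroize. The method only looks up 'adapter' itself; how MySql.new reads the other keys is not shown on this page, so using string keys throughout is an assumption consistent with that check.

```ruby
# Hypothetical helper mirroring the two checks at the top of avroize.
SUPPORTED_ADAPTERS = ['mysql', 'mysql2'].freeze

def check_database_config!(database_config)
  raise "Database interface not specified." unless database_config.key?('adapter')
  adapter = database_config['adapter']
  raise "Database interface not supported: #{adapter}" unless SUPPORTED_ADAPTERS.include?(adapter)
  database_config
end

# Symbol keys, as often written in Ruby code, fail the 'adapter' lookup...
symbol_config = { adapter: 'mysql', host: 'localhost', username: 'myuser',
                  password: 'mypass', database: 'somedatabase' }

symbol_error = begin
  check_database_config!(symbol_config)
  nil
rescue RuntimeError => e
  e.message
end

# ...so convert them to strings first (Hash#transform_keys needs Ruby 2.5+).
string_config = check_database_config!(symbol_config.transform_keys(&:to_s))
```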
table is the table to pull from.
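min_id specifies the value of the id column from which to start.
max_rows_per_batch, if given, caps the batch: ids are pulled only up to min_id + max_rows_per_batch or the table's maximum id, whichever is smaller. If nil, the batch runs up to the table's maximum id.
directory is where the temporary JSON file and the resulting Avro file are written (default '/tmp').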
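Large tables can be exported in several calls by feeding each returned max_id back in as the next min_id. The sketch below is a hypothetical driver that only computes the id range each call would use, reusing the same upper-bound expression as the method source. Whether the row with id == min_id is pulled again on the next call depends on interface.data, which is not shown here, so check that before relying on this loop to avoid duplicates.

```ruby
# Same upper-bound rule avroize uses for one batch.
def batch_upper_bound(max_id, min_id, max_rows_per_batch = nil)
  return max_id if max_rows_per_batch.nil?
  [max_id, min_id + max_rows_per_batch].min
end

# Hypothetical driver: lists the [min_id, max_id] pair each avroize call
# would cover if every returned max_id is passed back in as the next min_id.
def batch_ranges(min_id, max_id, max_rows_per_batch)
  raise ArgumentError, "max_rows_per_batch must be positive" unless max_rows_per_batch > 0
  ranges = []
  while min_id < max_id
    upper = batch_upper_bound(max_id, min_id, max_rows_per_batch)
    ranges << [min_id, upper]
    min_id = upper
  end
  ranges
end

p batch_ranges(0, 250, 100) # prints [[0, 100], [100, 200], [200, 250]]
```

The positivity check matters: with max_rows_per_batch set to 0 the upper bound would never advance and the loop would not terminate.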
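# File 'lib/sql2avro.rb', line 30

def Sql2Avro.avroize(database_config, table, min_id, max_rows_per_batch=nil, directory='/tmp')
  # Only MySQL is supported; the config must use the string key 'adapter'.
  raise "Database interface not specified." if !database_config.has_key? 'adapter'
  raise "Database interface not supported: #{database_config['adapter']}" unless ['mysql', 'mysql2'].include? database_config['adapter']

  interface = MySql.new(database_config)

  # Avro schema for the table, serialized to a JSON string.
  schema = Yajl::Encoder.encode(interface.schema(table))

  # Cap this batch at min_id + max_rows_per_batch when a cap is given.
  max_id = interface.max_id(table)
  max_id_this_batch = if max_rows_per_batch.nil?
                        max_id
                      else
                        [max_id, min_id + max_rows_per_batch].min
                      end

  # Time#to_s on a UTC time looks like "2013-05-01 12:34:56 UTC".
  date, time, zone = Time.now.utc.to_s.split
  filename = "#{table}.#{date}T#{time}Z.#{min_id}.#{max_id_this_batch}.avro"

  retval = {
    max_id: max_id_this_batch,
    path: File.join(directory, filename)
  }

  # Clear Encoding.default_internal while rows are fetched and written, so
  # strings are not implicitly transcoded to it. It is not restored if an
  # exception is raised before the reset below.
  prev_default_internal = Encoding.default_internal
  Encoding.default_internal = nil

  # Dump the rows as newline-delimited JSON, one record per line.
  json_file = File.join(directory, "#{filename}.json")
  File.open(json_file, 'w') do |f|
    interface.data(table, min_id, max_id_this_batch) do |datum|
      Yajl::Encoder.encode(datum, f)
      f.write "\n"
    end
  end

  Encoding.default_internal = prev_default_internal

  # Convert the JSON file to a Snappy-compressed Avro file with avro-tools.
  # The schema is wrapped in single quotes for the shell, so a ' inside it
  # would break the command.
  cmd = "java -jar #{AVRO_TOOLS_PATH} fromjson --codec snappy --schema '#{schema}' #{json_file} > #{File.join(directory, filename)}"
  `#{cmd}`
  if !$?.success?
    raise "Error converting JSON to Avro.\n\nCommand: #{cmd}\nStatus: #{$?}"
  end

  # Remove the intermediate JSON file.
  `rm #{json_file}`
  if !$?.success?
    raise "Error deleting temporary JSON file #{json_file}"
  end

  retval
end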
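The conversion step interpolates the schema and file paths into a shell command unescaped, so a schema containing a single quote (for example, from a column comment) or a path containing spaces would break the command. A minimal sketch of a safer variant, assuming the same avro-tools arguments, escapes each argument with Ruby's standard Shellwords library; avro_tools_command and the sample values are hypothetical.

```ruby
require 'shellwords'

# Hypothetical builder for the avro-tools command used by avroize, with every
# argument shell-escaped so quotes or spaces cannot break the command.
def avro_tools_command(jar, schema, json_file, avro_file)
  args = ['java', '-jar', jar, 'fromjson', '--codec', 'snappy',
          '--schema', schema, json_file]
  "#{args.shelljoin} > #{avro_file.shellescape}"
end

# Sample schema with an apostrophe, which the original quoting cannot handle.
schema = '{"type":"record","name":"users","doc":"owner\'s table","fields":[]}'
cmd = avro_tools_command('/app/vendor/avro-tools-1.7.4.jar', schema,
                         '/tmp/users.json', '/tmp/users.avro')
```

Skipping the shell entirely is another option: passing an argv array to Open3.capture2 with binmode: true and writing the captured stdout to the .avro file avoids quoting altogether.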