Class: Theman::Agency
- Inherits: Object
  - Object
    - Theman::Agency
- Defined in: lib/theman/agency.rb, lib/theman/agency/table.rb, lib/theman/agency/columns.rb
Defined Under Namespace
Classes: Columns, Error, Table
Instance Attribute Summary
-
#columns ⇒ Object
readonly
Returns the value of attribute columns.
-
#connection ⇒ Object
readonly
Returns the value of attribute connection.
-
#table_name ⇒ Object
readonly
Returns the value of attribute table_name.
Instance Method Summary
-
#add_primary_key! ⇒ Object
Adds a serial column called id and sets it as the primary key; if your data already has a column called id, the new column will be called agents_pkey instead.
-
#analyze! ⇒ Object
Analyzes the table for efficient query construction on tables larger than ~1000 tuples.
-
#chop(line = 1) ⇒ Object
Number of lines to drop from the end of the stream, so the copy finishes that many lines early (defaults to 1).
-
#chop_to_sed ⇒ Object
:nodoc:.
-
#create! ⇒ Object
Creates the table and loads the stream into it with the Postgres COPY command using STDIN; the stream is read in chunks of 8192 bytes to save memory, and the system commands for the IO subprocesses are piped together to take advantage of multiple cores.
-
#create_stream_columns ⇒ Object
:nodoc:.
-
#datestyle(arg) ⇒ Object
DateStyle used to parse date columns (issued as SET DATESTYLE before the COPY).
-
#delimiter(arg) ⇒ Object
Delimiter used in the stream; comma is the default.
-
#delimiter_regexp ⇒ Object
:nodoc:.
-
#drop! ⇒ Object
Explicitly drops the table and resets table_name to nil.
-
#headers ⇒ Object
:nodoc:.
-
#initialize(conn, stream, options = {}, &block) ⇒ Agency
constructor
Creates a new agent object; if a block is passed, the agent is yielded to it and create! is called afterwards.
-
#nulls(*args) ⇒ Object
Values in the stream to replace with NULL, given as regular expressions; every match is removed with sed.
-
#nulls_to_sed ⇒ Object
:nodoc:.
-
#pipe_it(l = "") ⇒ Object
:nodoc:.
-
#psql_command(psql = []) ⇒ Object
:nodoc:.
-
#psql_copy(psql = []) ⇒ Object
:nodoc:.
-
#sed_command(sed = []) ⇒ Object
:nodoc:.
-
#seds(*args) ⇒ Object
Custom sed expressions to parse the stream with.
-
#stream(arg) ⇒ Object
the location of the data to be sent to Postgres via STDIN (requires a header row).
-
#system_command ⇒ Object
:nodoc:.
-
#table {|@columns| ... } ⇒ Object
Creates default columns from the stream's header row and replaces selected columns with custom data types given in the block.
-
#transaction(&block) ⇒ Object
Wraps the given block in a transaction (BEGIN ... COMMIT), for use with :on_commit => :drop.
Constructor Details
#initialize(conn, stream, options = {}, &block) ⇒ Agency
Creates a new agent object; if a block is passed, the agent is yielded to it and create! is called afterwards.
Parameters
- conn - A database connection from the PGconn class, or ActiveRecord::Base.connection.raw_connection, which is the same class.
- stream - Path to the data file.
- options - Additional options are :temporary, :on_commit and :headers.
Example
# Load sample.csv into a new table and count its rows
conn = PGconn.open(:dbname => 'test')
agent = Theman::Agency.new(conn, 'sample.csv')
agent.create!
res = conn.exec("SELECT count(*) FROM #{agent.table_name}")
res.getvalue(0,0)
21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
# File 'lib/theman/agency.rb', line 21 def initialize(conn, stream, options = {}, &block) @stream = stream @connection = conn @options = options @table_name = sprintf "agent%010d", rand(100000000) @columns = Columns.new(conn) @stream_columns_set = false if block_given? yield self create! end end |
Instance Attribute Details
#columns ⇒ Object (readonly)
Returns the value of attribute columns.
3 4 5 |
# File 'lib/theman/agency.rb', line 3 def columns @columns end |
#connection ⇒ Object (readonly)
Returns the value of attribute connection.
3 4 5 |
# File 'lib/theman/agency.rb', line 3 def connection @connection end |
#table_name ⇒ Object (readonly)
Returns the value of attribute table_name.
3 4 5 |
# File 'lib/theman/agency.rb', line 3 def table_name @table_name end |
Instance Method Details
#add_primary_key! ⇒ Object
Adds a serial column called id and sets it as the primary key; if your data already has a column called id, the new column will be called agents_pkey instead.
140 141 142 143 |
# File 'lib/theman/agency.rb', line 140 def add_primary_key! name = @columns.include?(:id) ? "agents_pkey" : "id" connection.exec "ALTER TABLE #{table_name} ADD COLUMN #{name} serial PRIMARY KEY;" end |
#analyze! ⇒ Object
Analyzes the table for efficient query construction on tables larger than ~1000 tuples.
146 147 148 |
# File 'lib/theman/agency.rb', line 146 def analyze! connection.exec "ANALYZE #{table_name};" end |
#chop(line = 1) ⇒ Object
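Number of lines to drop from the end of the stream, so the copy finishes that many lines early (defaults to 1). Implemented by chop_to_sed as a sed expression that deletes the last N lines.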
82 83 84 |
# File 'lib/theman/agency.rb', line 82 def chop(line = 1) @chop = line end |
#chop_to_sed ⇒ Object
:nodoc:
112 113 114 |
# File 'lib/theman/agency.rb', line 112 def chop_to_sed #:nodoc: "-n -e :a -e '1,#{@chop}!{P;N;D;};N;ba'" end |
#create! ⇒ Object
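Creates the table, then loads the stream into it with the Postgres COPY command using STDIN.
The stream is read in chunks of 8192 bytes to save memory.
The system commands for the IO subprocesses (cat and any sed filters) are piped together to take advantage of multiple cores.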
130 131 132 133 134 135 136 |
# File 'lib/theman/agency.rb', line 130 def create! unless @stream_columns_set || @options[:headers] == false create_stream_columns end connection.exec Table.new(table_name, @columns.to_sql, @options[:temporary], @options[:on_commit]).to_sql pipe_it end |
#create_stream_columns ⇒ Object
:nodoc:
43 44 45 46 47 48 |
# File 'lib/theman/agency.rb', line 43 def create_stream_columns #:nodoc: @stream_columns_set = true headers.split(delimiter_regexp).each do |column| @columns.string column end end |
#datestyle(arg) ⇒ Object
DateStyle used to parse date columns; when set, SET DATESTYLE TO the given value is issued before the COPY command.
67 68 69 |
# File 'lib/theman/agency.rb', line 67 def datestyle(arg) @datestyle = arg end |
#delimiter(arg) ⇒ Object
Delimiter used in the stream; comma is the default.
87 88 89 |
# File 'lib/theman/agency.rb', line 87 def delimiter(arg) @delimiter = arg end |
#delimiter_regexp ⇒ Object
:nodoc:
122 123 124 |
# File 'lib/theman/agency.rb', line 122 def delimiter_regexp #:nodoc: @delimiter_regexp ||= Regexp.new(@delimiter.nil? ? "," : "\\#{@delimiter}") end |
#drop! ⇒ Object
Explicitly drops the table and resets table_name to nil.
151 152 153 154 |
# File 'lib/theman/agency.rb', line 151 def drop! connection.exec "DROP TABLE #{table_name};" @table_name = nil end |
#headers ⇒ Object
:nodoc:
50 51 52 |
# File 'lib/theman/agency.rb', line 50 def headers #:nodoc: File.open(@stream, "r"){ |infile| infile.gets } end |
#nulls(*args) ⇒ Object
Values in the stream to replace with NULL, given as regular expressions; each one becomes a sed substitution that removes every match.
72 73 74 |
# File 'lib/theman/agency.rb', line 72 def nulls(*args) @nulls = args end |
#nulls_to_sed ⇒ Object
:nodoc:
116 117 118 119 120 |
# File 'lib/theman/agency.rb', line 116 def nulls_to_sed #:nodoc: @nulls.map do |regex| "-e 's/#{regex.source}//g'" end end |
#pipe_it(l = "") ⇒ Object
:nodoc:
164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 |
# File 'lib/theman/agency.rb', line 164 def pipe_it(l = "") #:nodoc: connection.exec psql_command.join("; ") f = IO.popen(system_command) begin while f.read(8192, l) connection.put_copy_data l end rescue EOFError f.close end connection.put_copy_end res = connection.get_result status_code = res.result_status if status_code != 1 raise Error.new status_code, res.res_status(status_code), res.error_message end end |
#psql_command(psql = []) ⇒ Object
:nodoc:
99 100 101 102 103 |
# File 'lib/theman/agency.rb', line 99 def psql_command(psql = []) #:nodoc: psql << "SET DATESTYLE TO #{@datestyle}" unless @datestyle.nil? psql << psql_copy.join(" ") psql end |
#psql_copy(psql = []) ⇒ Object
:nodoc:
91 92 93 94 95 96 97 |
# File 'lib/theman/agency.rb', line 91 def psql_copy(psql = []) #:nodoc: psql << "COPY #{table_name} FROM STDIN WITH" psql << "DELIMITER '#{@delimiter}'" unless @delimiter.nil? psql << "CSV" psql << "HEADER" unless @options[:headers] == false psql end |
#sed_command(sed = []) ⇒ Object
:nodoc:
105 106 107 108 109 110 |
# File 'lib/theman/agency.rb', line 105 def sed_command(sed = []) #:nodoc: sed << nulls_to_sed unless @nulls.nil? sed << @seds unless @seds.nil? sed << chop_to_sed unless @chop.nil? sed end |
#seds(*args) ⇒ Object
Custom sed expressions to parse the stream with; each one is applied as a separate piped sed command.
77 78 79 |
# File 'lib/theman/agency.rb', line 77 def seds(*args) @seds = args end |
#stream(arg) ⇒ Object
the location of the data to be sent to Postgres via STDIN (requires a header row)
62 63 64 |
# File 'lib/theman/agency.rb', line 62 def stream(arg) @stream = arg end |
#system_command ⇒ Object
:nodoc:
156 157 158 159 160 161 162 |
# File 'lib/theman/agency.rb', line 156 def system_command #:nodoc: unless sed_command.empty? "cat #{@stream} | sed #{sed_command.join(" | sed ")}" else "cat #{@stream}" end end |
#table {|@columns| ... } ⇒ Object
Creates default columns from the stream's header row (unless :headers => false) and yields them to the block, so selected columns can be replaced with custom data types.
56 57 58 59 |
# File 'lib/theman/agency.rb', line 56 def table(&block) create_stream_columns unless @options[:headers] == false yield @columns end |
#transaction(&block) ⇒ Object
Wraps the given block in a transaction (BEGIN ... COMMIT), for use with :on_commit => :drop. Note that no ROLLBACK is issued if the block raises.
37 38 39 40 41 |
# File 'lib/theman/agency.rb', line 37 def transaction(&block) connection.exec "BEGIN;" yield connection.exec "COMMIT;" end |