Class: Theman::Agency

Inherits:
Object
  • Object
show all
Defined in:
lib/theman/agency.rb,
lib/theman/agency/table.rb,
lib/theman/agency/columns.rb

Defined Under Namespace

Classes: Columns, Error, Table

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(conn, stream, options = {}, &block) ⇒ Agency

Create a new agent object. If a block is given, the agent is yielded to it and create! is called after the block returns.

Parameters

  • conn - A database connection from the PGconn class or ActiveRecord::Base.connection.raw_connection which is the same class.

  • stream - path to the data file.

  • options - Additional options are :temporary, :on_commit and :headers

Example

# Load sample.csv into a new table, then count the rows that were copied
conn  = PGconn.open(:dbname => 'test')
agent = Theman::Agency.new(conn, 'sample.csv')
agent.create!
res = conn.exec("SELECT count(*) FROM #{agent.table_name}")
res.getvalue(0,0)


21
22
23
24
25
26
27
28
29
30
31
32
33
34
# File 'lib/theman/agency.rb', line 21

# Builds an agent for loading +stream+ into a freshly named table.
#
# conn    - database connection used for every statement the agent runs
# stream  - path to the data file
# options - :temporary, :on_commit and :headers
#
# When a block is given, the agent is yielded for configuration and
# create! is invoked once the block returns.
def initialize(conn, stream, options = {}, &block)
  @stream     = stream
  @connection = conn
  @options    = options

  # Random table name, zero-padded to ten digits, e.g. "agent0012345678".
  @table_name = format("agent%010d", rand(100_000_000))
  @columns    = Columns.new(conn)
  # Becomes true once columns have been derived from the header row.
  @stream_columns_set = false

  return unless block_given?

  yield self
  create!
end

Instance Attribute Details

#columnsObject (readonly)

Returns the value of attribute columns.



3
4
5
# File 'lib/theman/agency.rb', line 3

# Read-only access to the columns collection built for this agent.
def columns; @columns; end

#connectionObject (readonly)

Returns the value of attribute connection.



3
4
5
# File 'lib/theman/agency.rb', line 3

# Read-only access to the database connection given to the constructor.
def connection; @connection; end

#table_nameObject (readonly)

Returns the value of attribute table_name.



3
4
5
# File 'lib/theman/agency.rb', line 3

# Read-only access to the generated table name (nil after drop!).
def table_name; @table_name; end

Instance Method Details

#add_primary_key!Object

Adds a serial column called id and makes it the primary key. If your data already has a column called id, the new column is called agents_pkey instead.



140
141
142
143
# File 'lib/theman/agency.rb', line 140

# Adds a serial PRIMARY KEY column to the agent table.
# The column is named "id", or "agents_pkey" when @columns already
# reports an :id column.
# NOTE(review): depends on Columns#include? recognising :id — confirm
# how header-derived (string) column names are stored.
def add_primary_key!
  pk_column = if @columns.include?(:id)
                "agents_pkey"
              else
                "id"
              end
  connection.exec "ALTER TABLE #{table_name} ADD COLUMN #{pk_column} serial PRIMARY KEY;"
end

#analyze!Object

Analyzes the table so the query planner can build efficient plans; worthwhile for tables larger than ~1000 tuples.



146
147
148
# File 'lib/theman/agency.rb', line 146

# Runs ANALYZE on the agent table so the planner has current statistics.
def analyze!
  statement = "ANALYZE #{table_name};"
  connection.exec(statement)
end

#chop(line = 1) ⇒ Object

Number of lines to remove from the end of the stream before it is copied (default 1), e.g. to drop a trailing footer.



82
83
84
# File 'lib/theman/agency.rb', line 82

# Sets how many trailing lines of the stream are dropped (see chop_to_sed).
# Returns the value stored.
def chop(line = 1); @chop = line; end

#chop_to_sedObject

:nodoc:



112
113
114
# File 'lib/theman/agency.rb', line 112

# sed arguments for the classic one-liner that deletes the last @chop
# lines of its input.
def chop_to_sed #:nodoc:
  script = "1,#{@chop}!{P;N;D;};N;ba"
  "-n -e :a -e '#{script}'"
end

#create!Object

PostgreSQL COPY command using STDIN

  • reads chunks of 8192 bytes to save memory

The data file is read through a shell pipeline of subprocesses (cat plus any sed filters), so each stage can run on its own CPU core.



130
131
132
133
134
135
136
# File 'lib/theman/agency.rb', line 130

# Creates the agent table, then streams the data file into it via pipe_it.
# Columns are first derived from the header row, unless that already
# happened or the caller passed :headers => false.
# Returns whatever pipe_it returns.
def create!
  needs_columns = !@stream_columns_set && @options[:headers] != false
  create_stream_columns if needs_columns
  ddl = Table.new(table_name, @columns.to_sql, @options[:temporary], @options[:on_commit])
  connection.exec ddl.to_sql
  pipe_it
end

#create_stream_columnsObject

:nodoc:



43
44
45
46
47
48
# File 'lib/theman/agency.rb', line 43

# Adds one string column per field of the stream's header row and records
# that this has been done.
#
# headers returns the first line as read by IO#gets, which still carries
# its line terminator. The line is chomped first so the last column name
# does not end in "\n" (or "\r\n" for CRLF files).
def create_stream_columns #:nodoc:
  @stream_columns_set = true
  headers.chomp.split(delimiter_regexp).each do |column|
    @columns.string column
  end
end

#datestyle(arg) ⇒ Object

datestyle of date columns



67
68
69
# File 'lib/theman/agency.rb', line 67

# Sets the DATESTYLE sent before COPY (see psql_command). Returns the value.
def datestyle(arg); @datestyle = arg; end

#delimiter(arg) ⇒ Object

Delimiter used in the stream; comma is the default.



87
88
89
# File 'lib/theman/agency.rb', line 87

# Sets the field delimiter of the stream (comma when never set).
# Returns the value.
def delimiter(arg); @delimiter = arg; end

#delimiter_regexpObject

:nodoc:



122
123
124
# File 'lib/theman/agency.rb', line 122

# Regexp used to split the header row into column names (memoized).
#
# The delimiter is escaped with Regexp.escape. Blindly prepending a
# backslash turns ordinary letters into regexp escapes: "d" would become
# \d (any digit) and "s" would become \s (whitespace). Comma is used when
# no delimiter was configured.
def delimiter_regexp #:nodoc:
  @delimiter_regexp ||= Regexp.new(@delimiter.nil? ? "," : Regexp.escape(@delimiter.to_s))
end

#drop!Object

explicitly drop table



151
152
153
154
# File 'lib/theman/agency.rb', line 151

# Drops the agent table and forgets its name. Returns nil.
def drop!
  statement = "DROP TABLE #{table_name};"
  connection.exec statement
  @table_name = nil
end

#headersObject

:nodoc:



50
51
52
# File 'lib/theman/agency.rb', line 50

# First line of the stream, read with IO#gets (terminator included;
# nil for an empty file). The file is closed when the block returns.
def headers #:nodoc:
  File.open(@stream, "r", &:gets)
end

#nulls(*args) ⇒ Object

Regular expressions matching values in the stream to replace with NULL (sed deletes each match, leaving an empty field).



72
73
74
# File 'lib/theman/agency.rb', line 72

# Stores the regexps whose matches are stripped from the stream
# (see nulls_to_sed). Returns them as an Array.
def nulls(*args); @nulls = args; end

#nulls_to_sedObject

:nodoc:



116
117
118
119
120
# File 'lib/theman/agency.rb', line 116

# One sed "-e 's/<pattern>//g'" argument per regexp in @nulls; each
# argument deletes every match of its pattern.
def nulls_to_sed #:nodoc:
  @nulls.map { |pattern| "-e 's/#{pattern.source}//g'" }
end

#pipe_it(l = "") ⇒ Object

:nodoc:



164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
# File 'lib/theman/agency.rb', line 164

# Streams the (optionally sed-filtered) data file into the agent table.
#
# Sends the statements from psql_command, which end with COPY ... FROM STDIN.
# It then feeds the stdout of system_command to the server with
# put_copy_data, 8192 bytes at a time.
#
# l - scratch String buffer reused by IO#read for every chunk.
#
# Raises Error when the COPY result status is not 1 (PGRES_COMMAND_OK in
# libpq's ExecStatusType).
def pipe_it(l = "") #:nodoc:
  connection.exec psql_command.join("; ")
  # IO#read with a length returns nil at EOF (it does not raise EOFError),
  # so the loop ends on its own. The block form of IO.popen then closes the
  # pipe and waits for the child, releasing the fd and reaping the process.
  IO.popen(system_command) do |io|
    while io.read(8192, l)
      connection.put_copy_data l
    end
  end
  connection.put_copy_end
  res = connection.get_result
  status_code = res.result_status
  if status_code != 1
    raise Error.new(status_code, res.res_status(status_code), res.result_error_message)
  end
end

#psql_command(psql = []) ⇒ Object

:nodoc:



99
100
101
102
103
# File 'lib/theman/agency.rb', line 99

# Statements sent before streaming: an optional SET DATESTYLE, then the
# COPY statement from psql_copy. They are appended to +psql+, which is
# returned.
def psql_command(psql = []) #:nodoc:
  unless @datestyle.nil?
    psql << "SET DATESTYLE TO #{@datestyle}"
  end
  psql.push(psql_copy.join(" "))
end

#psql_copy(psql = []) ⇒ Object

:nodoc:



91
92
93
94
95
96
97
# File 'lib/theman/agency.rb', line 91

# Clauses of the COPY ... FROM STDIN statement, appended to +psql+ (which is
# returned): optional DELIMITER, CSV, and HEADER unless :headers => false.
def psql_copy(psql = []) #:nodoc:
  clauses = ["COPY #{table_name} FROM STDIN WITH"]
  clauses << "DELIMITER '#{@delimiter}'" unless @delimiter.nil?
  clauses << "CSV"
  clauses << "HEADER" unless @options[:headers] == false
  psql.concat(clauses)
end

#sed_command(sed = []) ⇒ Object

:nodoc:



105
106
107
108
109
110
# File 'lib/theman/agency.rb', line 105

# Argument lists for the sed stages, appended to +sed+ (which is returned):
# null stripping, custom seds, then trailing-line chopping. Each configured
# item is pushed as-is, so array entries stay nested.
def sed_command(sed = []) #:nodoc:
  sed.tap do |stages|
    stages.push(nulls_to_sed) unless @nulls.nil?
    stages.push(@seds)        unless @seds.nil?
    stages.push(chop_to_sed)  unless @chop.nil?
  end
end

#seds(*args) ⇒ Object

Custom sed arguments (for example "-e 's/foo/bar/g'") used to filter the stream; each one runs as its own sed process.



77
78
79
# File 'lib/theman/agency.rb', line 77

# Stores custom sed argument strings applied to the stream
# (see sed_command). Returns them as an Array.
def seds(*args); @seds = args; end

#stream(arg) ⇒ Object

the location of the data to be sent to Postgres via STDIN (requires a header row)



62
63
64
# File 'lib/theman/agency.rb', line 62

# Sets the path of the data file to load. Returns the value.
def stream(arg); @stream = arg; end

#system_commandObject

:nodoc:



156
157
158
159
160
161
162
# File 'lib/theman/agency.rb', line 156

# Shell pipeline whose stdout is streamed into COPY: `cat` of the data file,
# then one `sed` process per entry in sed_command (nested arrays are
# flattened by join).
#
# The path is shell-escaped, so file names containing spaces or shell
# metacharacters reach cat as a single literal argument. This matches
# headers, which opens the same path directly with File.open.
def system_command #:nodoc:
  require 'shellwords'
  seds = sed_command
  command = "cat #{Shellwords.escape(@stream)}"
  return command if seds.empty?

  "#{command} | sed #{seds.join(" | sed ")}"
end

#table {|@columns| ... } ⇒ Object

create default columns from stream and replace selected columns with custom data types from block

Yields:



56
57
58
59
# File 'lib/theman/agency.rb', line 56

# Derives default columns from the header row (unless :headers => false),
# then yields the columns collection so selected columns can be redefined.
# Returns the block's value.
def table(&block)
  create_stream_columns if @options[:headers] != false
  yield @columns
end

#transaction(&block) ⇒ Object

create a transaction block for use with :on_commit => :drop



37
38
39
40
41
# File 'lib/theman/agency.rb', line 37

# Runs the block inside BEGIN/COMMIT, for use with :on_commit => :drop.
#
# If the block raises, ROLLBACK is issued and the exception is re-raised.
# Without this, the connection would be left inside an open (or aborted)
# transaction. Exception is rescued deliberately, so that interrupts also
# roll back; it is always re-raised and never swallowed.
# Returns the result of the COMMIT statement.
def transaction(&block)
  connection.exec "BEGIN;"
  begin
    yield
  rescue Exception
    connection.exec "ROLLBACK;"
    raise
  end
  connection.exec "COMMIT;"
end