Class: Taps::DataStream

Inherits: Object
Defined in: lib/taps/data_stream.rb

Direct Known Subclasses

DataStreamKeyed

Constant Summary

DEFAULT_CHUNKSIZE = 1000


Constructor Details

#initialize(db, state) ⇒ DataStream

Returns a new instance of DataStream.



# File 'lib/taps/data_stream.rb', line 14

def initialize(db, state)
  @db = db
  @state = {
    offset: 0,
    avg_chunksize: 0,
    num_chunksize: 0,
    total_chunksize: 0
  }.merge(state)
  @state[:chunksize] ||= DEFAULT_CHUNKSIZE
  @complete = false
end
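
A minimal construction sketch; the connection URL and table name are hypothetical, and any Sequel database with an existing table would do:

require 'taps/data_stream'

db = Sequel.connect('sqlite://taps_demo.db')           # hypothetical database
stream = Taps::DataStream.new(db, table_name: :users)  # hypothetical table
stream.state[:offset]     # => 0
stream.state[:chunksize]  # => 1000 (DEFAULT_CHUNKSIZE, since none was given)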

Instance Attribute Details

#db ⇒ Object (readonly)

Returns the value of attribute db.



# File 'lib/taps/data_stream.rb', line 12

def db
  @db
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



# File 'lib/taps/data_stream.rb', line 12

def state
  @state
end

Class Method Details

.factory(db, state) ⇒ Object



# File 'lib/taps/data_stream.rb', line 230

def self.factory(db, state)
  if defined?(Sequel::MySQL) && Sequel::MySQL.respond_to?(:convert_invalid_date_time=)
    Sequel::MySQL.convert_invalid_date_time = :nil
  end

  return eval(state[:klass]).new(db, state) if state.key?(:klass)

  if Taps::Utils.single_integer_primary_key(db, state[:table_name].to_sym)
    DataStreamKeyed.new(db, state)
  else
    DataStream.new(db, state)
  end
end
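
A hedged usage sketch: for a table whose primary key is a single integer column, factory hands back the keyed subclass, otherwise a plain DataStream. When the state already carries a :klass entry (as produced by #to_hash), that class is reinstantiated directly, so both ends of a transfer agree on the subclass. The users table below is hypothetical:

stream = Taps::DataStream.factory(db, table_name: :users)
stream.class  # => Taps::DataStreamKeyed for a single integer primary key,
              #    Taps::DataStream otherwise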

.parse_json(json) ⇒ Object



# File 'lib/taps/data_stream.rb', line 184

def self.parse_json(json)
  hash = ::OkJson.decode(json).symbolize_keys
  hash[:state].symbolize_keys! if hash.key?(:state)
  hash
end

Instance Method Details

#complete? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/taps/data_stream.rb', line 128

def complete?
  @complete
end

#encode_rows(rows) ⇒ Object



# File 'lib/taps/data_stream.rb', line 110

def encode_rows(rows)
  Taps::Utils.base64encode(Marshal.dump(rows))
end
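
The encoding is Marshal followed by base64; #parse_encoded_data (below) is the inverse, guarded by a checksum. A round-trip sketch, assuming Taps::Utils.checksum is the CRC the transfer protocol uses and stream is as in the constructor sketch above:

rows = { header: [:id, :name], data: [[1, 'ann'], [2, 'bob']] }
encoded = stream.encode_rows(rows)
stream.parse_encoded_data(encoded, Taps::Utils.checksum(encoded))
# => the original rows hash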

#error ⇒ Object



# File 'lib/taps/data_stream.rb', line 34

def error
  state[:error] || false
end

#error=(val) ⇒ Object



# File 'lib/taps/data_stream.rb', line 30

def error=(val)
  state[:error] = val
end

#fetch ⇒ Object



# File 'lib/taps/data_stream.rb', line 114

def fetch
  log.debug "DataStream#fetch state -> #{state.inspect}"

  t1 = Time.now
  rows = fetch_rows
  encoded_data = encode_rows(rows)
  t2 = Time.now
  elapsed_time = t2 - t1

  @complete = rows == {}

  [encoded_data, (@complete ? 0 : rows[:data].size), elapsed_time]
end
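
Note that #fetch does not advance the offset itself, so a caller pairs it with #increment. A plausible local drain loop (a sketch, not the server's exact code):

until stream.complete?
  encoded_data, row_count, elapsed = stream.fetch
  stream.increment(row_count)  # move past the rows just fetched
  # ship encoded_data somewhere; row_count is 0 on the final, empty chunk
end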

#fetch_chunksize ⇒ Object



# File 'lib/taps/data_stream.rb', line 91

def fetch_chunksize
  chunksize = state[:chunksize]
  return chunksize if state[:num_chunksize] < max_chunksize_training
  return chunksize if state[:avg_chunksize] == 0
  return chunksize if state[:error]
  state[:avg_chunksize] > chunksize ? state[:avg_chunksize] : chunksize
end
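
An illustration of the selection rule: while still training (fewer than max_chunksize_training samples), after an error, or before any average exists, the configured chunksize is kept; afterwards the larger of the trained average and the configured value wins:

stream.state.merge!(num_chunksize: 20, avg_chunksize: 800,
                    chunksize: 500, error: false)
stream.fetch_chunksize  # => 800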

#fetch_from_resource(resource, headers) ⇒ Object



# File 'lib/taps/data_stream.rb', line 167

def fetch_from_resource(resource, headers)
  res = nil
  log.debug "DataStream#fetch_from_resource state -> #{state.inspect}"
  state[:chunksize] = Taps::Utils.calculate_chunksize(state[:chunksize]) do |c|
    state[:chunksize] = c.to_i
    res = resource.post({ state: ::OkJson.encode(to_hash) }, headers)
  end

  begin
    params = Taps::Multipart.parse(res)
    params[:json] = self.class.parse_json(params[:json]) if params.key?(:json)
    return params
  rescue ::OkJson::ParserError
    raise Taps::CorruptedData, 'Invalid OkJson Received'
  end
end

#fetch_remote(resource, headers) ⇒ Object



# File 'lib/taps/data_stream.rb', line 132

def fetch_remote(resource, headers)
  params = fetch_from_resource(resource, headers)
  encoded_data = params[:encoded_data]
  json = params[:json]

  rows = parse_encoded_data(encoded_data, json[:checksum])
  @complete = rows == {}

  # update local state
  state.merge!(json[:state].merge(chunksize: state[:chunksize]))

  if @complete
    0
  else
    import_rows(rows)
    rows[:data].size
  end
end
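
A hypothetical client-side pull loop; here resource would be a RestClient resource pointed at the remote taps server and headers the usual auth/version headers, both set up elsewhere:

loop do
  stream.fetch_remote(resource, headers)  # imports one chunk locally
  break if stream.complete?
end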

#fetch_remote_in_server(params) ⇒ Object

This method is used inside the server process.



# File 'lib/taps/data_stream.rb', line 152

def fetch_remote_in_server(params)
  json = self.class.parse_json(params[:json])
  encoded_data = params[:encoded_data]

  rows = parse_encoded_data(encoded_data, json[:checksum])
  @complete = rows == {}

  if @complete
    0
  else
    import_rows(rows)
    rows[:data].size
  end
end

#fetch_rows ⇒ Object

Keeps a record of the average chunksize within the first few hundred thousand records, after the chunksize goes below 100 or, possibly, once the offset exceeds 1000.



# File 'lib/taps/data_stream.rb', line 75

def fetch_rows
  state[:chunksize] = fetch_chunksize
  ds = table.order(*order_by).limit(state[:chunksize], state[:offset])
  log.debug "DataStream#fetch_rows SQL -> #{ds.sql}"
  rows = Taps::Utils.format_data(ds.all,
                                 string_columns: string_columns,
                                 schema: db.schema(table_name),
                                 table: table_name)
  update_chunksize_stats
  rows
end

#import_rows(rows) ⇒ Object



# File 'lib/taps/data_stream.rb', line 204

def import_rows(rows)
  table.import(rows[:header], rows[:data])
  state[:offset] += rows[:data].size
rescue Exception => ex
  case ex.message
  when /integer out of range/ then
    raise Taps::InvalidData, <<-ERROR, []
\nDetected integer data that exceeds the maximum allowable size for an integer type.
This generally occurs when importing from SQLite due to the fact that SQLite does
not enforce maximum values on integer types.
ERROR
  else raise ex
  end
end
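
The expected input is the header/data shape that #fetch_rows produces; a sketch with illustrative values:

rows = { header: [:id, :email],
         data:   [[1, 'a@example.com'], [2, 'b@example.com']] }
stream.import_rows(rows)
stream.state[:offset]  # advanced by rows[:data].size, i.e. by 2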

#increment(row_count) ⇒ Object



# File 'lib/taps/data_stream.rb', line 69

def increment(row_count)
  state[:offset] += row_count
end

#log ⇒ Object



# File 'lib/taps/data_stream.rb', line 26

def log
  Taps.log
end

#max_chunksize_training ⇒ Object



# File 'lib/taps/data_stream.rb', line 87

def max_chunksize_training
  20
end

#order_by(name = nil) ⇒ Object



# File 'lib/taps/data_stream.rb', line 62

def order_by(name = nil)
  @order_by ||= begin
    name ||= table_name
    Taps::Utils.order_by(db, name)
  end
end

#parse_encoded_data(encoded_data, checksum) ⇒ Object



# File 'lib/taps/data_stream.rb', line 190

def parse_encoded_data(encoded_data, checksum)
  raise Taps::CorruptedData, 'Checksum Failed' unless Taps::Utils.valid_data?(encoded_data, checksum)

  begin
    return Marshal.load(Taps::Utils.base64decode(encoded_data))
  rescue Object
    unless ENV['NO_DUMP_MARSHAL_ERRORS']
      puts "Error encountered loading data, wrote the data chunk to dump.#{Process.pid}.dat"
      File.open("dump.#{Process.pid}.dat", 'w') { |f| f.write(encoded_data) }
    end
    raise
  end
end

#string_columns ⇒ Object



# File 'lib/taps/data_stream.rb', line 54

def string_columns
  @string_columns ||= Taps::Utils.incorrect_blobs(db, table_name)
end

#table ⇒ Object



# File 'lib/taps/data_stream.rb', line 58

def table
  @table ||= db[table_name_sql]
end

#table_name ⇒ Object



# File 'lib/taps/data_stream.rb', line 38

def table_name
  state[:table_name].to_sym
end

#table_name_sql ⇒ Object



# File 'lib/taps/data_stream.rb', line 42

def table_name_sql
  table_name.identifier
end

#to_hash ⇒ Object



# File 'lib/taps/data_stream.rb', line 46

def to_hash
  state.merge(klass: self.class.to_s)
end

#to_json ⇒ Object



# File 'lib/taps/data_stream.rb', line 50

def to_json
  ::OkJson.encode(to_hash)
end
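
Because #to_hash embeds the class name under :klass, a serialized stream can be rebuilt as the same subclass on the other side. A round-trip sketch:

json  = stream.to_json
state = Taps::DataStream.parse_json(json)
Taps::DataStream.factory(db, state).class == stream.class  # => true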

#update_chunksize_stats ⇒ Object



# File 'lib/taps/data_stream.rb', line 99

def update_chunksize_stats
  return if state[:num_chunksize] >= max_chunksize_training
  state[:total_chunksize] += state[:chunksize]
  state[:num_chunksize] += 1
  state[:avg_chunksize] = begin
                            state[:total_chunksize] / state[:num_chunksize]
                          rescue
                            state[:chunksize]
                          end
end
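
Since num_chunksize is incremented just before the division, the bare rescue (falling back to the current chunksize) is purely defensive; the running average itself is plain integer division. For example, after trained chunks of 1000, 800, and 900 rows:

(1000 + 800 + 900) / 3  # => 900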

#verify_remote_stream(resource, headers) ⇒ Object



# File 'lib/taps/data_stream.rb', line 223

def verify_remote_stream(resource, headers)
  json_raw = resource.post({ state: ::OkJson.encode(self) }, headers).to_s
  json = self.class.parse_json(json_raw)

  self.class.new(db, json[:state])
end

#verify_stream ⇒ Object



# File 'lib/taps/data_stream.rb', line 219

def verify_stream
  state[:offset] = table.count
end