Class: Taps::DataStream

Inherits:
Object
Defined in:
lib/taps/data_stream.rb

Direct Known Subclasses

DataStreamKeyed

Defined Under Namespace

Classes: CorruptedData

Instance Attribute Summary

Class Method Summary

Instance Method Summary

Constructor Details

#initialize(db, state) ⇒ DataStream

Returns a new instance of DataStream.



# File 'lib/taps/data_stream.rb', line 14

def initialize(db, state)
	@db = db
	@state = {
		:offset => 0,
		:avg_chunksize => 0,
		:num_chunksize => 0,
		:total_chunksize => 0,
	}.merge(state)
	@complete = false
end
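
Example — an illustrative sketch, not from the taps source; the sqlite URL and users table are assumptions, and :table_name plus :chunksize are the state keys the instance methods below expect:

  db = Sequel.connect('sqlite://taps_scratch.db')   # any Sequel handle works
  stream = Taps::DataStream.new(db, :table_name => 'users', :chunksize => 1000)
  stream.state[:offset]   # => 0, merged in from the defaults above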

Instance Attribute Details

#db ⇒ Object (readonly)

Returns the value of attribute db.



# File 'lib/taps/data_stream.rb', line 12

def db
  @db
end

#state ⇒ Object (readonly)

Returns the value of attribute state.



# File 'lib/taps/data_stream.rb', line 12

def state
  @state
end

Class Method Details

.factory(db, state) ⇒ Object



# File 'lib/taps/data_stream.rb', line 202

def self.factory(db, state)
	if defined?(Sequel::MySQL) && Sequel::MySQL.respond_to?(:convert_invalid_date_time=)
		Sequel::MySQL.convert_invalid_date_time = :nil
	end

	if state.has_key?(:klass)
		return eval(state[:klass]).new(db, state)
	end

	if Taps::Utils.single_integer_primary_key(db, state[:table_name].to_sym)
		DataStreamKeyed.new(db, state)
	else
		DataStream.new(db, state)
	end
end
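
Example — an illustrative sketch, assuming db is a Sequel handle and the hypothetical users table has a single integer primary key, so the keyed subclass is chosen:

  stream = Taps::DataStream.factory(db, :table_name => 'users', :chunksize => 1000)
  stream.class   # => Taps::DataStreamKeyed

A state hash that already carries a :klass entry (as written by #to_hash) short-circuits the check and revives that class instead.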

.parse_json(json) ⇒ Object



# File 'lib/taps/data_stream.rb', line 177

def self.parse_json(json)
	hash = JSON.parse(json).symbolize_keys
	hash[:state].symbolize_keys! if hash.has_key?(:state)
	hash
end
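
Example — an illustrative sketch; keys come back symbolized at the top level and inside :state:

  hash = Taps::DataStream.parse_json('{"checksum":"12345","state":{"offset":100}}')
  # => { :checksum => "12345", :state => { :offset => 100 } }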

Instance Method Details

#complete? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/taps/data_stream.rb', line 121

def complete?
	@complete
end

#encode_rows(rows) ⇒ Object



# File 'lib/taps/data_stream.rb', line 103

def encode_rows(rows)
	Taps::Utils.base64encode(Marshal.dump(rows))
end
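
Example — a round-trip sketch with an illustrative rows hash; the payload is a plain Marshal dump behind base64, so the inverse Utils call recovers it:

  rows = { :header => [:id, :name], :data => [[1, 'alice'], [2, 'bob']] }
  encoded = stream.encode_rows(rows)
  Marshal.load(Taps::Utils.base64decode(encoded)) == rows   # => true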

#error ⇒ Object



# File 'lib/taps/data_stream.rb', line 33

def error
	state[:error] || false
end

#error=(val) ⇒ Object



# File 'lib/taps/data_stream.rb', line 29

def error=(val)
	state[:error] = val
end

#fetch ⇒ Object



# File 'lib/taps/data_stream.rb', line 107

def fetch
	log.debug "DataStream#fetch state -> #{state.inspect}"

	t1 = Time.now
	rows = fetch_rows
	encoded_data = encode_rows(rows)
	t2 = Time.now
	elapsed_time = t2 - t1

	@complete = rows == { }

	[encoded_data, (@complete ? 0 : rows[:data].size), elapsed_time]
end
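
Example — a sketch of one server-side fetch step; the triple is the encoded payload, the number of rows in it (0 once the table is exhausted), and the elapsed time used for chunksize tuning. Note that #fetch does not advance the offset itself:

  encoded_data, row_count, elapsed_time = stream.fetch
  stream.increment(row_count) unless stream.complete?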

#fetch_chunksize ⇒ Object



# File 'lib/taps/data_stream.rb', line 88

def fetch_chunksize
	chunksize = state[:chunksize]
	return chunksize if state[:num_chunksize] < max_chunksize_training
	return chunksize if state[:avg_chunksize] == 0
	return chunksize if state[:error]
	state[:avg_chunksize] > chunksize ? state[:avg_chunksize] : chunksize
end
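
Example — a hypothetical state showing all three early returns falling through, so the trained average wins:

  state = { :chunksize => 500, :num_chunksize => 25, :avg_chunksize => 800, :error => nil }
  # num_chunksize (25) >= max_chunksize_training (20) -> first guard falls through
  # avg_chunksize (800) != 0                          -> second guard falls through
  # no error recorded                                 -> third guard falls through
  # result: the larger of avg_chunksize and chunksize => 800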

#fetch_from_resource(resource, headers) ⇒ Object



# File 'lib/taps/data_stream.rb', line 160

def fetch_from_resource(resource, headers)
	res = nil
	log.debug "DataStream#fetch_from_resource state -> #{state.inspect}"
	state[:chunksize] = Taps::Utils.calculate_chunksize(state[:chunksize]) do |c|
		state[:chunksize] = c
		res = resource.post({:state => self.to_json}, headers)
	end

	begin
		params = Taps::Multipart.parse(res)
		params[:json] = self.class.parse_json(params[:json]) if params.has_key?(:json)
		return params
	rescue JSON::ParserError
		raise DataStream::CorruptedData.new("Invalid JSON Received")
	end
end

#fetch_remote(resource, headers) ⇒ Object



# File 'lib/taps/data_stream.rb', line 125

def fetch_remote(resource, headers)
	params = fetch_from_resource(resource, headers)
	encoded_data = params[:encoded_data]
	json = params[:json]

	rows = parse_encoded_data(encoded_data, json[:checksum])
	@complete = rows == { }

	# update local state
	state.merge!(json[:state].merge(:chunksize => state[:chunksize]))

	unless @complete
		import_rows(rows)
		rows[:data].size
	else
		0
	end
end
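
Example — a sketch of the client-side pull loop; resource and headers come from the surrounding pull operation (a REST resource pointing at a taps server plus its protocol headers), and the users table is illustrative:

  stream = Taps::DataStream.factory(db, :table_name => 'users', :chunksize => 1000)
  until stream.complete?
    row_count = stream.fetch_remote(resource, headers)
    Taps.log.debug "pulled #{row_count} rows"
  end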

#fetch_remote_in_server(params) ⇒ Object

This method is used inside the server process.



# File 'lib/taps/data_stream.rb', line 145

def fetch_remote_in_server(params)
	json = self.class.parse_json(params[:json])
	encoded_data = params[:encoded_data]

	rows = parse_encoded_data(encoded_data, json[:checksum])
	@complete = rows == { }

	unless @complete
		import_rows(rows)
		rows[:data].size
	else
		0
	end
end

#fetch_rows ⇒ Object

Keep a record of the average chunksize within the first few hundred thousand records, after chunksize goes below 100 or maybe if offset is > 1000.



# File 'lib/taps/data_stream.rb', line 74

def fetch_rows
	state[:chunksize] = fetch_chunksize
	ds = table.order(*order_by).limit(state[:chunksize], state[:offset])
	log.debug "DataStream#fetch_rows SQL -> #{ds.sql}"
	rows = Taps::Utils.format_data(ds.all,
		:string_columns => string_columns)
	update_chunksize_stats
	rows
end
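
Example — the window this builds, assuming order_by resolves to :id for an illustrative users table:

  # with :chunksize => 1000 and :offset => 0, the logged SQL is roughly
  #   SELECT * FROM users ORDER BY id LIMIT 1000 OFFSET 0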

#import_rows(rows) ⇒ Object



# File 'lib/taps/data_stream.rb', line 197

def import_rows(rows)
	table.import(rows[:header], rows[:data])
	state[:offset] += rows[:data].size
end
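
Example — the shape import_rows expects, matching what Taps::Utils.format_data produces: :header names the columns and :data holds one array per row, which map directly onto Sequel's Dataset#import. Table and values here are illustrative:

  rows = { :header => [:id, :name], :data => [[1, 'alice'], [2, 'bob']] }
  stream.import_rows(rows)
  stream.state[:offset]   # advanced by 2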

#increment(row_count) ⇒ Object



# File 'lib/taps/data_stream.rb', line 68

def increment(row_count)
	state[:offset] += row_count
end

#log ⇒ Object



# File 'lib/taps/data_stream.rb', line 25

def log
	Taps.log
end

#max_chunksize_training ⇒ Object



# File 'lib/taps/data_stream.rb', line 84

def max_chunksize_training
	20
end

#order_by(name = nil) ⇒ Object



# File 'lib/taps/data_stream.rb', line 61

def order_by(name=nil)
	@order_by ||= begin
		name ||= table_name
		Taps::Utils.order_by(db, name)
	end
end

#parse_encoded_data(encoded_data, checksum) ⇒ Object



# File 'lib/taps/data_stream.rb', line 183

def parse_encoded_data(encoded_data, checksum)
	raise DataStream::CorruptedData.new("Checksum Failed") unless Taps::Utils.valid_data?(encoded_data, checksum)

	begin
		return Marshal.load(Taps::Utils.base64decode(encoded_data))
	rescue Object => e
		unless ENV['NO_DUMP_MARSHAL_ERRORS']
			puts "Error encountered loading data, wrote the data chunk to dump.#{Process.pid}.dat"
			File.open("dump.#{Process.pid}.dat", "w") { |f| f.write(encoded_data) }
		end
		raise
	end
end

#string_columns ⇒ Object



# File 'lib/taps/data_stream.rb', line 53

def string_columns
	@string_columns ||= Taps::Utils.incorrect_blobs(db, table_name)
end

#table ⇒ Object



# File 'lib/taps/data_stream.rb', line 57

def table
	@table ||= db[table_name_sql]
end

#table_name ⇒ Object



# File 'lib/taps/data_stream.rb', line 37

def table_name
	state[:table_name].to_sym
end

#table_name_sql ⇒ Object



# File 'lib/taps/data_stream.rb', line 41

def table_name_sql
	table_name.identifier
end

#to_hash ⇒ Object



# File 'lib/taps/data_stream.rb', line 45

def to_hash
	state.merge(:klass => self.class.to_s)
end

#to_json ⇒ Object



# File 'lib/taps/data_stream.rb', line 49

def to_json
	to_hash.to_json
end
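
Example — a serialization round-trip sketch; because #to_hash embeds :klass, the receiving side can revive the same class through .factory:

  json = stream.to_json
  state = Taps::DataStream.parse_json(json)
  revived = Taps::DataStream.factory(db, state)
  revived.class == stream.class   # => true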

#update_chunksize_stats ⇒ Object



# File 'lib/taps/data_stream.rb', line 96

def update_chunksize_stats
	return if state[:num_chunksize] >= max_chunksize_training
	state[:total_chunksize] += state[:chunksize]
	state[:num_chunksize] += 1
	state[:avg_chunksize] = state[:total_chunksize] / state[:num_chunksize] rescue state[:chunksize]
end
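
A worked run of the running average (note the division is integer division): starting from the constructor defaults, chunks of 1000 and then 500 rows leave total_chunksize at 1500, num_chunksize at 2 and avg_chunksize at 750; the inline rescue falls back to the current chunksize if the division ever fails.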