Class: Taps::ClientSession

Inherits:
Object
  • Object
show all
Defined in:
lib/taps/client_session.rb

Defined Under Namespace

Classes: CorruptedData

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(database_url, remote_url, default_chunksize) ⇒ ClientSession

Returns a new instance of ClientSession.



13
14
15
16
17
# File 'lib/taps/client_session.rb', line 13

def initialize(database_url, remote_url, default_chunksize)
	@database_url = database_url
	@remote_url = remote_url
	@default_chunksize = default_chunksize
end

Instance Attribute Details

#database_urlObject (readonly)

Returns the value of attribute database_url.



11
12
13
# File 'lib/taps/client_session.rb', line 11

def database_url
  @database_url
end

#default_chunksizeObject (readonly)

Returns the value of attribute default_chunksize.



11
12
13
# File 'lib/taps/client_session.rb', line 11

def default_chunksize
  @default_chunksize
end

#remote_urlObject (readonly)

Returns the value of attribute remote_url.



11
12
13
# File 'lib/taps/client_session.rb', line 11

def remote_url
  @remote_url
end

Class Method Details

.quickstart(&block) ⇒ Object



25
26
27
28
29
# File 'lib/taps/client_session.rb', line 25

def self.quickstart(&block)
	start(Taps::Config.database_url, Taps::Config.remote_url, Taps::Config.chunksize) do |s|
		yield s
	end
end

.start(database_url, remote_url, default_chunksize) {|s| ... } ⇒ Object

Yields:

  • (s)


19
20
21
22
23
# File 'lib/taps/client_session.rb', line 19

def self.start(database_url, remote_url, default_chunksize, &block)
	s = new(database_url, remote_url, default_chunksize)
	yield s
	s.close_session
end

Instance Method Details

#close_sessionObject



52
53
54
# File 'lib/taps/client_session.rb', line 52

def close_session
	@session_resource.delete(http_headers) if @session_resource
end

#cmd_receiveObject



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
# File 'lib/taps/client_session.rb', line 172

def cmd_receive
	begin
		verify_server
		cmd_receive_schema
		cmd_receive_data
		cmd_receive_indexes
		cmd_reset_sequences
	rescue RestClient::Exception => e
		if e.respond_to?(:response)
			puts "!!! Caught Server Exception"
			puts "#{e.response.body}"
			exit(1)
		else
			raise
		end
	end
end

#cmd_receive_dataObject



190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
# File 'lib/taps/client_session.rb', line 190

def cmd_receive_data
	puts "Receiving data"

	tables_with_counts, record_count = fetch_remote_tables_info

	puts "#{tables_with_counts.size} tables, #{format_number(record_count)} records"

	tables_with_counts.each do |table_name, count|
		table = db[table_name.to_sym]
		chunksize = default_chunksize

		progress = ProgressBar.new(table_name.to_s, count)

		offset = 0
		loop do
			begin
				chunksize, rows = fetch_table_rows(table_name, chunksize, offset)
			rescue CorruptedData
				next
			end
			break if rows == { }

			table.import(rows[:header], rows[:data])

			progress.inc(rows[:data].size)
			offset += rows[:data].size
		end

		progress.finish
	end
end

#cmd_receive_indexesObject



265
266
267
268
269
270
271
272
# File 'lib/taps/client_session.rb', line 265

def cmd_receive_indexes
	puts "Receiving indexes"

	index_data = session_resource['indexes'].get(http_headers)

	output = Taps::Utils.load_indexes(database_url, index_data)
	puts output if output
end

#cmd_receive_schemaObject



257
258
259
260
261
262
263
# File 'lib/taps/client_session.rb', line 257

def cmd_receive_schema
	puts "Receiving schema"

	schema_data = session_resource['schema'].get(http_headers)
	output = Taps::Utils.load_schema(database_url, schema_data)
	puts output if output
end

#cmd_reset_sequencesObject



274
275
276
277
278
279
# File 'lib/taps/client_session.rb', line 274

def cmd_reset_sequences
	puts "Resetting sequences"

	output = Taps::Utils.schema_bin(:reset_db_sequences, database_url)
	puts output if output
end

#cmd_sendObject



72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/taps/client_session.rb', line 72

def cmd_send
	begin
		verify_server
		cmd_send_schema
		cmd_send_data
		cmd_send_indexes
		cmd_send_reset_sequences
	rescue RestClient::Exception => e
		if e.respond_to?(:response)
			puts "!!! Caught Server Exception"
			puts "#{e.response.body}"
			exit(1)
		else
			raise
		end
	end
end

#cmd_send_dataObject



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/taps/client_session.rb', line 110

def cmd_send_data
	puts "Sending data"

	tables_with_counts, record_count = fetch_tables_info

	puts "#{tables_with_counts.size} tables, #{format_number(record_count)} records"


	db.tables.each do |table_name|
		table = db[table_name]
		count = table.count
		order = Taps::Utils.order_by(db, table_name)
		chunksize = self.default_chunksize
		string_columns = Taps::Utils.incorrect_blobs(db, table_name)

		progress = ProgressBar.new(table_name.to_s, count)

		offset = 0
		loop do
			row_size = 0
			chunksize = Taps::Utils.calculate_chunksize(chunksize) do |c|
				rows = Taps::Utils.format_data(table.order(*order).limit(c, offset).all, string_columns)
				break if rows == { }

				row_size = rows[:data].size
				gzip_data = Taps::Utils.gzip(Marshal.dump(rows))

				begin
					session_resource["tables/#{table_name}"].post(gzip_data, http_headers({
						:content_type => 'application/octet-stream',
						:taps_checksum => Taps::Utils.checksum(gzip_data).to_s}))
				rescue RestClient::RequestFailed => e
					# retry the same data, it got corrupted somehow.
					if e.http_code == 412
						next
					end
					raise
				end
			end

			progress.inc(row_size)
			offset += row_size

			break if row_size == 0
		end

		progress.finish
	end
end

#cmd_send_indexesObject



90
91
92
93
94
95
# File 'lib/taps/client_session.rb', line 90

def cmd_send_indexes
	puts "Sending indexes"

	index_data = Taps::Utils.schema_bin(:indexes, database_url)
	session_resource['indexes'].post(index_data, http_headers)
end

#cmd_send_reset_sequencesObject



104
105
106
107
108
# File 'lib/taps/client_session.rb', line 104

def cmd_send_reset_sequences
	puts "Resetting sequences"

	session_resource["reset_sequences"].post('', http_headers)
end

#cmd_send_schemaObject



97
98
99
100
101
102
# File 'lib/taps/client_session.rb', line 97

def cmd_send_schema
	puts "Sending schema"

	schema_data = Taps::Utils.schema_bin(:dump, database_url)
	session_resource['schema'].post(schema_data, http_headers)
end

#dbObject



31
32
33
# File 'lib/taps/client_session.rb', line 31

def db
	@db ||= Sequel.connect(database_url)
end

#fetch_remote_tables_infoObject



241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
# File 'lib/taps/client_session.rb', line 241

def fetch_remote_tables_info
	retries = 0
	max_retries = 1
	begin
		tables_with_counts = Marshal.load(session_resource['tables'].get(http_headers))
		record_count = tables_with_counts.values.inject(0) { |a,c| a += c }
	rescue RestClient::Exception
		retries += 1
		retry if retries <= max_retries
		puts "Unable to fetch tables information from #{remote_url}. Please check the server log."
		exit(1)
	end

	[ tables_with_counts, record_count ]
end

#fetch_table_rows(table_name, chunksize, offset) ⇒ Object

Raises:



224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
# File 'lib/taps/client_session.rb', line 224

def fetch_table_rows(table_name, chunksize, offset)
	response = nil
	chunksize = Taps::Utils.calculate_chunksize(chunksize) do |c|
		response = session_resource["tables/#{table_name}/#{c}?offset=#{offset}"].get(http_headers)
	end
	raise CorruptedData unless Taps::Utils.valid_data?(response.to_s, response.headers[:taps_checksum])

	begin
		rows = Marshal.load(Taps::Utils.gunzip(response.to_s))
	rescue Object => e
		puts "Error encountered loading data, wrote the data chunk to dump.#{Process.pid}.gz"
		File.open("dump.#{Process.pid}.gz", "w") { |f| f.write(response.to_s) }
		raise
	end
	[chunksize, rows]
end

#fetch_tables_infoObject



160
161
162
163
164
165
166
167
168
169
170
# File 'lib/taps/client_session.rb', line 160

def fetch_tables_info
	record_count = 0
	tables = db.tables
	tables_with_counts = tables.inject({}) do |accum, table|
		accum[table] = db[table].count
		record_count += accum[table]
		accum
	end

	[ tables_with_counts, record_count ]
end

#format_number(num) ⇒ Object



281
282
283
# File 'lib/taps/client_session.rb', line 281

def format_number(num)
	num.to_s.gsub(/(\d)(?=(\d\d\d)+(?!\d))/, "\\1,")
end

#http_headers(extra = {}) ⇒ Object



68
69
70
# File 'lib/taps/client_session.rb', line 68

def http_headers(extra = {})
	{ :taps_version => Taps.compatible_version }.merge(extra)
end

#open_sessionObject



43
44
45
46
# File 'lib/taps/client_session.rb', line 43

def open_session
	uri = server['sessions'].post('', http_headers)
	server[uri]
end

#safe_database_urlObject



64
65
66
# File 'lib/taps/client_session.rb', line 64

def safe_database_url
	safe_url(database_url)
end

#safe_remote_urlObject



60
61
62
# File 'lib/taps/client_session.rb', line 60

def safe_remote_url
	safe_url(remote_url)
end

#safe_url(url) ⇒ Object



56
57
58
# File 'lib/taps/client_session.rb', line 56

def safe_url(url)
	url.sub(/\/\/(.+?)?:(.*?)@/, '//\1:[hidden]@')
end

#serverObject



35
36
37
# File 'lib/taps/client_session.rb', line 35

def server
	@server ||= RestClient::Resource.new(remote_url)
end

#session_resourceObject



39
40
41
# File 'lib/taps/client_session.rb', line 39

def session_resource
	@session_resource ||= open_session
end

#set_session(uri) ⇒ Object



48
49
50
# File 'lib/taps/client_session.rb', line 48

def set_session(uri)
	@session_resource = server[uri]
end

#verify_serverObject



285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
# File 'lib/taps/client_session.rb', line 285

def verify_server
	begin
		server['/'].get(http_headers)
	rescue RestClient::RequestFailed => e
		if e.http_code == 417
			puts "#{safe_remote_url} is running a different minor version of taps."
			puts "#{e.response.body}"
			exit(1)
		else
			raise
		end
	rescue RestClient::Unauthorized
		puts "Bad credentials given for #{safe_remote_url}"
		exit(1)
	rescue Errno::ECONNREFUSED
		puts "Can't connect to #{safe_remote_url}. Please check that it's running"
		exit(1)
	end
end