Class: Taps::ClientSession

Inherits:
Object
  • Object
show all
Defined in:
lib/taps/client_session.rb

Defined Under Namespace

Classes: CorruptedData

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(database_url, remote_url, default_chunksize) ⇒ ClientSession

Returns a new instance of ClientSession.



14
15
16
17
18
# File 'lib/taps/client_session.rb', line 14

# Stores the three session settings on the instance; no connection is opened here.
#
# database_url      - kept in @database_url
# remote_url        - kept in @remote_url
# default_chunksize - kept in @default_chunksize
def initialize(database_url, remote_url, default_chunksize)
	@database_url, @remote_url, @default_chunksize = database_url, remote_url, default_chunksize
end

Instance Attribute Details

#database_url ⇒ Object (readonly)

Returns the value of attribute database_url.



12
13
14
# File 'lib/taps/client_session.rb', line 12

# Reader for @database_url (nil until the attribute has been assigned).
def database_url; @database_url; end

#default_chunksize ⇒ Object (readonly)

Returns the value of attribute default_chunksize.



12
13
14
# File 'lib/taps/client_session.rb', line 12

# Reader for @default_chunksize (nil until the attribute has been assigned).
def default_chunksize; @default_chunksize; end

#remote_url ⇒ Object (readonly)

Returns the value of attribute remote_url.



12
13
14
# File 'lib/taps/client_session.rb', line 12

# Reader for @remote_url (nil until the attribute has been assigned).
def remote_url; @remote_url; end

Class Method Details

.quickstart(&block) ⇒ Object



26
27
28
29
30
# File 'lib/taps/client_session.rb', line 26

# Convenience entry point: calls .start with Taps::Config.database_url,
# Taps::Config.remote_url and Taps::Config.chunksize, and passes the session
# that .start yields on to the caller's block.
#
# Returns whatever .start returns.
def self.quickstart(&block)
	settings = Taps::Config
	start(settings.database_url, settings.remote_url, settings.chunksize) { |session| yield session }
end

.start(database_url, remote_url, default_chunksize) {|s| ... } ⇒ Object

Yields:

  • (s)


20
21
22
23
24
# File 'lib/taps/client_session.rb', line 20

# Creates a session, yields it to the caller's block and closes it afterwards.
#
# The session is closed however the block ends. If the block raises, exits or
# breaks out, the session is still closed so it is not left open on the server,
# and the original error (or exit) keeps propagating.
#
# database_url, remote_url, default_chunksize - passed straight to .new
#
# Yields the new session.
# Returns the result of close_session when the block completes normally.
def self.start(database_url, remote_url, default_chunksize, &block)
	s = new(database_url, remote_url, default_chunksize)
	finished = false
	begin
		yield s
		finished = true
	ensure
		unless finished
			# Abnormal exit from the block: close best-effort, but never let a
			# failure while closing replace the error that is already in flight.
			begin
				s.close_session
			rescue StandardError
				nil
			end
		end
	end
	s.close_session
end

Instance Method Details

#close_session ⇒ Object



53
54
55
# File 'lib/taps/client_session.rb', line 53

# Sends a DELETE (with the standard taps headers) to the session resource,
# but only if a session resource has been set on this instance.
#
# Returns the result of the delete call, or nil when there is no session.
def close_session
	return unless @session_resource

	@session_resource.delete(http_headers)
end

#cmd_receive ⇒ Object



173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/taps/client_session.rb', line 173

# Runs the full "receive" sequence: verify the server, then receive the schema,
# the data and the indexes, then reset sequences.
#
# If a RestClient error that carries a server response is raised along the way,
# the response body is printed and the process exits with status 1. RestClient
# errors without a response are re-raised unchanged.
def cmd_receive
	verify_server
	cmd_receive_schema
	cmd_receive_data
	cmd_receive_indexes
	cmd_reset_sequences
rescue RestClient::Exception => e
	# The error may answer #response and still hold nil there; calling #body on
	# that would raise NoMethodError and hide the real failure.
	raise unless e.respond_to?(:response) && e.response

	puts "!!! Caught Server Exception"
	puts "#{e.response.body}"
	exit(1)
end

#cmd_receive_data ⇒ Object



191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/taps/client_session.rb', line 191

# Downloads every table reported by the server and imports it into the local
# database, chunk by chunk, with one progress bar per table.
#
# For each table, rows are requested from fetch_table_rows starting at offset 0.
# Each chunk is imported and the offset advances by the number of rows received;
# an empty hash from fetch_table_rows ends the table. A CorruptedData error
# requests the same offset again (there is no retry limit).
def cmd_receive_data
	puts "Receiving data"

	remote_tables, total_records = fetch_remote_tables_info

	puts "#{remote_tables.size} tables, #{format_number(total_records)} records"

	remote_tables.each do |table_name, expected_rows|
		dataset = db[table_name.to_sym]
		chunksize = default_chunksize
		bar = ProgressBar.new(table_name.to_s, expected_rows)
		offset = 0

		loop do
			begin
				chunksize, rows = fetch_table_rows(table_name, chunksize, offset)
			rescue CorruptedData
				# Checksum mismatch: ask for the same chunk again.
				next
			end
			break if rows == {}

			dataset.import(rows[:header], rows[:data])

			received = rows[:data].size
			bar.inc(received)
			offset += received
		end

		bar.finish
	end
end

#cmd_receive_indexes ⇒ Object



266
267
268
269
270
271
272
273
# File 'lib/taps/client_session.rb', line 266

# Fetches the 'indexes' resource of the session and hands the result to
# Taps::Schema.load_indexes together with the local database URL.
# Anything truthy returned by the loader is printed.
def cmd_receive_indexes
	puts "Receiving indexes"

	remote_indexes = session_resource['indexes'].get(http_headers)

	report = Taps::Schema.load_indexes(database_url, remote_indexes)
	return unless report

	puts report
end

#cmd_receive_schema ⇒ Object



258
259
260
261
262
263
264
# File 'lib/taps/client_session.rb', line 258

# Fetches the 'schema' resource of the session and hands the result to
# Taps::Schema.load together with the local database URL.
# Anything truthy returned by the loader is printed.
def cmd_receive_schema
	puts "Receiving schema"

	remote_schema = session_resource['schema'].get(http_headers)
	report = Taps::Schema.load(database_url, remote_schema)
	return unless report

	puts report
end

#cmd_reset_sequences ⇒ Object



275
276
277
278
279
280
# File 'lib/taps/client_session.rb', line 275

# Calls Taps::Schema.reset_db_sequences for the local database URL and prints
# whatever truthy value it returns.
def cmd_reset_sequences
	puts "Resetting sequences"

	report = Taps::Schema.reset_db_sequences(database_url)
	return unless report

	puts report
end

#cmd_send ⇒ Object



73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/taps/client_session.rb', line 73

# Runs the full "send" sequence: verify the server, then send the schema, the
# data and the indexes, then ask the server to reset sequences.
#
# If a RestClient error that carries a server response is raised along the way,
# the response body is printed and the process exits with status 1. RestClient
# errors without a response are re-raised unchanged.
def cmd_send
	verify_server
	cmd_send_schema
	cmd_send_data
	cmd_send_indexes
	cmd_send_reset_sequences
rescue RestClient::Exception => e
	# The error may answer #response and still hold nil there; calling #body on
	# that would raise NoMethodError and hide the real failure.
	raise unless e.respond_to?(:response) && e.response

	puts "!!! Caught Server Exception"
	puts "#{e.response.body}"
	exit(1)
end

#cmd_send_data ⇒ Object



111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# File 'lib/taps/client_session.rb', line 111

# Uploads the rows of every local table to the server, one chunk at a time,
# with one progress bar per table.
#
# Each chunk is formatted with Taps::Utils.format_data, marshalled, gzipped and
# POSTed to "tables/<table_name>" together with a checksum header. The chunk
# size for the next round is whatever Taps::Utils.calculate_chunksize returns.
def cmd_send_data
	puts "Sending data"

	# Only used for the summary line below; the loop re-reads db.tables and
	# re-counts each table itself.
	tables_with_counts, record_count = fetch_tables_info

	puts "#{tables_with_counts.size} tables, #{format_number(record_count)} records"


	db.tables.each do |table_name|
		table = db[table_name]
		count = table.count
		order = Taps::Utils.order_by(db, table_name)
		chunksize = self.default_chunksize
		string_columns = Taps::Utils.incorrect_blobs(db, table_name)

		progress = ProgressBar.new(table_name.to_s, count)

		offset = 0
		loop do
			# Stays 0 when the block below finds no more rows; that ends the loop.
			row_size = 0
			chunksize = Taps::Utils.calculate_chunksize(chunksize) do |c|
				rows = Taps::Utils.format_data(table.order(*order).limit(c, offset).all, string_columns)
				# No rows left: leave the block early with row_size still 0.
				# NOTE(review): presumably this `break` also makes the
				# calculate_chunksize call evaluate to nil, so chunksize is nil
				# from here on — harmless only because the loop ends right after.
				break if rows == { }

				row_size = rows[:data].size
				gzip_data = Taps::Utils.gzip(Marshal.dump(rows))

				begin
					session_resource["tables/#{table_name}"].post(gzip_data, http_headers({
						:content_type => 'application/octet-stream',
						:taps_checksum => Taps::Utils.checksum(gzip_data).to_s}))
				rescue RestClient::RequestFailed => e
					# HTTP 412: the intent is to retry the same data, it got corrupted somehow.
					# NOTE(review): `next` only ends this block invocation and row_size
					# is already set. Unless calculate_chunksize yields again, the
					# offset below advances past the rejected chunk — TODO confirm.
					if e.http_code == 412
						next
					end
					raise
				end
			end

			progress.inc(row_size)
			offset += row_size

			break if row_size == 0
		end

		progress.finish
	end
end

#cmd_send_indexes ⇒ Object



91
92
93
94
95
96
# File 'lib/taps/client_session.rb', line 91

# Reads the index definitions via Taps::Schema.indexes(database_url) and POSTs
# them to the session's 'indexes' resource.
#
# Returns the result of the post call.
def cmd_send_indexes
	puts "Sending indexes"

	local_indexes = Taps::Schema.indexes(database_url)
	indexes_endpoint = session_resource['indexes']
	indexes_endpoint.post(local_indexes, http_headers)
end

#cmd_send_reset_sequences ⇒ Object



105
106
107
108
109
# File 'lib/taps/client_session.rb', line 105

# POSTs an empty body to the session's "reset_sequences" resource.
#
# Returns the result of the post call.
def cmd_send_reset_sequences
	puts "Resetting sequences"

	reset_endpoint = session_resource["reset_sequences"]
	reset_endpoint.post('', http_headers)
end

#cmd_send_schema ⇒ Object



98
99
100
101
102
103
# File 'lib/taps/client_session.rb', line 98

# Dumps the schema via Taps::Schema.dump_without_indexes(database_url) and POSTs
# it to the session's 'schema' resource.
#
# Returns the result of the post call.
def cmd_send_schema
	puts "Sending schema"

	local_schema = Taps::Schema.dump_without_indexes(database_url)
	schema_endpoint = session_resource['schema']
	schema_endpoint.post(local_schema, http_headers)
end

#db ⇒ Object



32
33
34
# File 'lib/taps/client_session.rb', line 32

# Returns the database handle, calling Sequel.connect(database_url) on first
# use and caching the result in @db.
def db
	@db = Sequel.connect(database_url) unless @db
	@db
end

#fetch_remote_tables_info ⇒ Object



242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
# File 'lib/taps/client_session.rb', line 242

# Asks the server for its table list and row counts.
#
# GETs the session's 'tables' resource, unmarshals the body into a hash and sums
# its values. A RestClient error is retried once; if the second attempt also
# fails, a message is printed and the process exits with status 1.
#
# Returns [tables_with_counts, record_count].
def fetch_remote_tables_info
	retries = 0
	max_retries = 1
	# Explicit begin block on purpose: `retry` must not re-run the counter
	# initialisation above.
	begin
		# SECURITY NOTE: Marshal.load can instantiate arbitrary objects, so this
		# fully trusts whatever the remote server sends back.
		tables_with_counts = Marshal.load(session_resource['tables'].get(http_headers))
		record_count = tables_with_counts.values.inject(0) { |sum, count| sum + count }
	rescue RestClient::Exception
		retries += 1
		retry if retries <= max_retries
		puts "Unable to fetch tables information from #{remote_url}. Please check the server log."
		exit(1)
	end

	[ tables_with_counts, record_count ]
end

#fetch_table_rows(table_name, chunksize, offset) ⇒ Object

Raises:

  • (CorruptedData)


225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
# File 'lib/taps/client_session.rb', line 225

# Downloads one chunk of rows for a table from the server.
#
# table_name - table to fetch; used in the request path
# chunksize  - starting chunk size, passed to Taps::Utils.calculate_chunksize
# offset     - row offset, sent as the `offset` query parameter
#
# Returns [chunksize, rows]: the chunk size returned by calculate_chunksize and
# the unmarshalled payload.
# Raises CorruptedData when Taps::Utils.valid_data? rejects the body against the
# taps_checksum response header.
# Re-raises any StandardError from gunzip/unmarshal after writing the raw body
# to dump.<pid>.gz in the current directory.
def fetch_table_rows(table_name, chunksize, offset)
	response = nil
	chunksize = Taps::Utils.calculate_chunksize(chunksize) do |c|
		response = session_resource["tables/#{table_name}/#{c}?offset=#{offset}"].get(http_headers)
	end
	raise CorruptedData unless Taps::Utils.valid_data?(response.to_s, response.headers[:taps_checksum])

	begin
		# SECURITY NOTE: Marshal.load can instantiate arbitrary objects, so this
		# fully trusts the payload sent by the remote server.
		rows = Marshal.load(Taps::Utils.gunzip(response.to_s))
	rescue StandardError
		puts "Error encountered loading data, wrote the data chunk to dump.#{Process.pid}.gz"
		# Binary mode: the body is gzip data and must be written byte-for-byte.
		File.open("dump.#{Process.pid}.gz", "wb") { |f| f.write(response.to_s) }
		raise
	end
	[chunksize, rows]
end

#fetch_tables_info ⇒ Object



161
162
163
164
165
166
167
168
169
170
171
# File 'lib/taps/client_session.rb', line 161

# Counts the rows of every table in the local database.
#
# Returns [tables_with_counts, record_count]: a hash keyed by the entries of
# db.tables with each table's row count, and the sum of those counts.
def fetch_tables_info
	counts_by_table = {}
	total_rows = 0

	db.tables.each do |name|
		rows_in_table = db[name].count
		counts_by_table[name] = rows_in_table
		total_rows += rows_in_table
	end

	[ counts_by_table, total_rows ]
end

#format_number(num) ⇒ Object



282
283
284
# File 'lib/taps/client_session.rb', line 282

# Renders num via to_s and inserts a comma after every digit that is followed
# by one or more complete groups of three digits (1234567 -> "1,234,567").
def format_number(num)
	digits = num.to_s
	digits.gsub(/(\d)(?=(\d\d\d)+(?!\d))/, "\\1,")
end

#http_headers(extra = {}) ⇒ Object



69
70
71
# File 'lib/taps/client_session.rb', line 69

# Builds the header hash for a request: :taps_version set to
# Taps.compatible_version, merged with `extra`. Keys given in `extra` win on
# clashes; `extra` itself is not modified.
def http_headers(extra = {})
	base_headers = { :taps_version => Taps.compatible_version }
	base_headers.merge(extra)
end

#open_session ⇒ Object



44
45
46
47
# File 'lib/taps/client_session.rb', line 44

# POSTs an empty body to the server's 'sessions' resource and uses the value
# returned by that call as the path of the new session.
#
# Returns the server sub-resource for that path.
def open_session
	sessions_endpoint = server['sessions']
	session_path = sessions_endpoint.post('', http_headers)
	server[session_path]
end

#safe_database_url ⇒ Object



65
66
67
# File 'lib/taps/client_session.rb', line 65

# Returns database_url passed through safe_url.
def safe_database_url
	url = database_url
	safe_url(url)
end

#safe_remote_url ⇒ Object



61
62
63
# File 'lib/taps/client_session.rb', line 61

# Returns remote_url passed through safe_url.
def safe_remote_url
	url = remote_url
	safe_url(url)
end

#safe_url(url) ⇒ Object



57
58
59
# File 'lib/taps/client_session.rb', line 57

# Returns a copy of url in which the first "//<user>:<password>@" section has
# its password part replaced by "[hidden]". URLs without such a section are
# returned unchanged.
def safe_url(url)
	credentials = /\/\/(.+?)?:(.*?)@/
	url.sub(credentials, '//\1:[hidden]@')
end

#server ⇒ Object



36
37
38
# File 'lib/taps/client_session.rb', line 36

# Returns the RestClient::Resource for remote_url, creating it on first use and
# caching it in @server.
def server
	@server = RestClient::Resource.new(remote_url) unless @server
	@server
end

#session_resource ⇒ Object



40
41
42
# File 'lib/taps/client_session.rb', line 40

# Returns the current session resource, calling open_session on first use and
# caching the result in @session_resource.
def session_resource
	@session_resource = open_session unless @session_resource
	@session_resource
end

#set_session(uri) ⇒ Object



49
50
51
# File 'lib/taps/client_session.rb', line 49

# Points this client at the session located at `uri` on the server by storing
# server[uri] in @session_resource.
#
# Returns the stored resource.
def set_session(uri)
	resource = server[uri]
	@session_resource = resource
end

#verify_server ⇒ Object



286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# File 'lib/taps/client_session.rb', line 286

# Checks that the remote server is reachable and accepts this client by sending
# a GET to '/' with the standard taps headers.
#
# Prints a message and exits with status 1 when:
#   * the server rejects the credentials (RestClient::Unauthorized),
#   * the server answers 417 (different minor version of taps), or
#   * the connection is refused.
# Any other RestClient::RequestFailed is re-raised.
#
# Returns the result of the GET on success.
def verify_server
	server['/'].get(http_headers)
rescue RestClient::Unauthorized
	# Must come before RequestFailed: where Unauthorized is a subclass of
	# RequestFailed, the broader clause would catch it first and re-raise.
	puts "Bad credentials given for #{safe_remote_url}"
	exit(1)
rescue RestClient::RequestFailed => e
	raise unless e.http_code == 417

	puts "#{safe_remote_url} is running a different minor version of taps."
	puts "#{e.response.body}"
	exit(1)
rescue Errno::ECONNREFUSED
	puts "Can't connect to #{safe_remote_url}. Please check that it's running"
	exit(1)
end