Class: ContentServer::FileCopyClient
- Inherits:
-
Object
- Object
- ContentServer::FileCopyClient
- Defined in:
- lib/content_server/queue_copy.rb
Overview
class QueueCopy
Class Method Summary collapse
-
.destination_filename(folder, sha1) ⇒ Object
Creates destination filename for backup server, input is base folder and sha1.
Instance Method Summary collapse
- #abort_copy(checksum) ⇒ Object
- #done_copy(local_file_checksum, local_path) ⇒ Object
-
#handle(message) ⇒ Object
This is a function which receives the messages (file or ack) and return answer in case of ack.
- #handle_message(message) ⇒ Object
-
#initialize(host, port) ⇒ FileCopyClient
constructor
A new instance of FileCopyClient.
- #request_copy(content_data) ⇒ Object
- #reset_copy(checksum, new_offset) ⇒ Object
- #threads ⇒ Object
Constructor Details
#initialize(host, port) ⇒ FileCopyClient
Returns a new instance of FileCopyClient.
239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 |
# File 'lib/content_server/queue_copy.rb', line 239 def initialize(host, port) @local_queue = Queue.new @tcp_client = Networking::TCPClient.new(host, port, method(:handle_message)) @file_receiver = FileReceiver.new(method(:done_copy), method(:abort_copy), method(:reset_copy)) @local_thread = Thread.new do loop do pop_data = @local_queue.pop $process_vars.set('File Copy Client queue', @local_queue.size) handle(pop_data) end end @local_thread.abort_on_exception = true Log.debug3("initialize FileCopyClient host:%s port:%s", host, port) end |
Class Method Details
.destination_filename(folder, sha1) ⇒ Object
Creates destination filename for backup server, input is base folder and sha1. for example: folder:/mnt/hd1/bbbackup, sha1:d0be2dc421be4fcd0172e5afceea3970e2f3d940 dest filename: /mnt/hd1/bbbackup/d0/be/2d/d0be2dc421be4fcd0172e5afceea3970e2f3d940
322 323 324 |
# File 'lib/content_server/queue_copy.rb', line 322 def self.destination_filename(folder, sha1) File.join(folder, sha1[0,2], sha1[2,2], sha1) end |
Instance Method Details
#abort_copy(checksum) ⇒ Object
266 267 268 |
# File 'lib/content_server/queue_copy.rb', line 266 def abort_copy(checksum) ([:ABORT_COPY, checksum]) end |
#done_copy(local_file_checksum, local_path) ⇒ Object
274 275 276 277 |
# File 'lib/content_server/queue_copy.rb', line 274 def done_copy(local_file_checksum, local_path) $process_vars.inc('num_files_received') Log.debug1("Done copy file: path %s, checksum %s", local_path, local_file_checksum) end |
#handle(message) ⇒ Object
This is a function which receives the messages (file or ack) and return answer in case of ack. Note that it is being executed from the class thread only!
287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 |
# File 'lib/content_server/queue_copy.rb', line 287 def handle() , = Log.debug1("backup copy message: Type %s. message: %s", , ) if == :SEND_COPY_MESSAGE bytes_written = @tcp_client.send_obj([:COPY_MESSAGE, ]) Log.debug2("Sending copy message succeeded? bytes_written: %s.", bytes_written) elsif == :COPY_CHUNK if @file_receiver.receive_chunk(*) file_checksum, offset, file_size, content, content_checksum = @tcp_client.send_obj([:COPY_CHUNK_FROM_REMOTE, file_checksum]) else file_checksum, offset, file_size, content, content_checksum = Log.error("receive_chunk failed for chunk checksum:#{content_checksum}") end elsif == :ACK_MESSAGE checksum, = # check if checksum exists in final destination dest_path = FileReceiver.destination_filename(Params['backup_destination_folder'][0]['path'], checksum) need_to_copy = !File.exists?(dest_path) Log.debug1("Returning ack for content:'%s' timestamp:'%s' Ack:'%s'", checksum, , need_to_copy) @tcp_client.send_obj([:ACK_MESSAGE, [, need_to_copy, checksum]]) elsif == :ABORT_COPY @tcp_client.send_obj([:ABORT_COPY, ]) elsif == :RESET_RESUME_COPY @tcp_client.send_obj([:RESET_RESUME_COPY, ]) else Log.error("Unexpected message type: #{}") end end |
#handle_message(message) ⇒ Object
279 280 281 282 283 |
# File 'lib/content_server/queue_copy.rb', line 279 def () Log.debug3('QueueFileReceiver handle message') @local_queue.push() $process_vars.set('File Copy Client queue', @local_queue.size) end |
#request_copy(content_data) ⇒ Object
262 263 264 |
# File 'lib/content_server/queue_copy.rb', line 262 def request_copy(content_data) ([:SEND_COPY_MESSAGE, content_data]) end |
#reset_copy(checksum, new_offset) ⇒ Object
270 271 272 |
# File 'lib/content_server/queue_copy.rb', line 270 def reset_copy(checksum, new_offset) ([:RESET_RESUME_COPY, [checksum, new_offset]]) end |
#threads ⇒ Object
256 257 258 259 260 |
# File 'lib/content_server/queue_copy.rb', line 256 def threads ret = [@local_thread] ret << @tcp_client.tcp_thread if @tcp_client != nil return ret end |