Class: BBFS::ContentServer::FileCopyClient

Inherits:
Object
  • Object
show all
Defined in:
lib/content_server/queue_copy.rb

Overview

class QueueCopy

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host, port, dynamic_content_data) ⇒ FileCopyClient

Returns a new instance of FileCopyClient.



121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/content_server/queue_copy.rb', line 121

# Sets up the copy client: a local message queue, the TCP client, the file
# receiver and the worker thread that drains the queue.
#
# host, port            - forwarded unchanged to Networking::TCPClient.new.
# dynamic_content_data  - stored for later use; #handle calls exists?(checksum)
#                         on it when answering ack messages.
def initialize(host, port, dynamic_content_data)
  @local_queue = Queue.new
  @dynamic_content_data = dynamic_content_data
  # handle_message is handed to the TCP client as its callback (presumably
  # invoked once per received message - confirm in Networking::TCPClient).
  @tcp_server = Networking::TCPClient.new(host, port, method(:handle_message))
  # Callbacks passed to the receiver, in the order: done, abort, reset.
  receiver_callbacks = [method(:done_copy), method(:abort_copy), method(:reset_copy)]
  @file_receiver = FileReceiver.new(*receiver_callbacks)
  # Single worker thread: every queued message is processed by #handle here.
  @local_thread = Thread.new { loop { handle(@local_queue.pop) } }
  # An exception raised inside the worker is propagated instead of silently
  # ending the thread.
  @local_thread.abort_on_exception = true
end

Class Method Details

.destination_filename(folder, sha1) ⇒ Object

Creates the destination filename for the backup server; the input is the base folder and the sha1. For example: folder: /mnt/hd1/bbbackup, sha1: d0be2dc421be4fcd0172e5afceea3970e2f3d940, dest filename: /mnt/hd1/bbbackup/d0/be/d0be2dc421be4fcd0172e5afceea3970e2f3d940



198
199
200
# File 'lib/content_server/queue_copy.rb', line 198

# Builds the destination path for a content file below +folder+.
#
# The path has two directory levels taken from the checksum - characters 0-1
# and characters 2-3 - followed by the full checksum as the file name.
#
# Example: folder "/mnt/hd1/bbbackup" and
# sha1 "d0be2dc421be4fcd0172e5afceea3970e2f3d940" give
# "/mnt/hd1/bbbackup/d0/be/d0be2dc421be4fcd0172e5afceea3970e2f3d940".
def self.destination_filename(folder, sha1)
  first_level = sha1.slice(0, 2)
  second_level = sha1.slice(2, 2)
  File.join(folder, first_level, second_level, sha1)
end

Instance Method Details

#abort_copy(checksum) ⇒ Object



146
147
148
# File 'lib/content_server/queue_copy.rb', line 146

# Queues an abort request for the copy identified by +checksum+.
#
# Nothing is sent from here: the [:ABORT_COPY, checksum] message goes through
# handle_message, and #handle later forwards it over the TCP client. This
# method is also passed to FileReceiver as its abort callback.
def abort_copy(checksum)
  abort_request = [:ABORT_COPY, checksum]
  handle_message(abort_request)
end

#done_copy(local_file_checksum, local_path) ⇒ Object



154
155
156
# File 'lib/content_server/queue_copy.rb', line 154

# Logs that a file copy has finished.
#
# Passed to FileReceiver as its "done" callback (presumably called once a
# file has been fully received - confirm in FileReceiver).
#
# local_file_checksum - checksum reported for the copied file.
# local_path          - path reported for the copied file.
def done_copy(local_file_checksum, local_path)
  summary = "Done copy file: #{local_path}, #{local_file_checksum}"
  Log.info(summary)
end

#handle(message) ⇒ Object

Receives the messages (file or ack) and returns an answer in the case of an ack. Note that it is executed from the class thread only!



165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/content_server/queue_copy.rb', line 165

# Processes one queued message and, where needed, answers over the TCP client.
# It is called from the worker thread created in #initialize.
#
# message - a two-element array [message_type, message_content]:
#   :SEND_COPY_MESSAGE  - content is sent on as [:COPY_MESSAGE, content].
#   :COPY_CHUNK         - content is splatted into @file_receiver.receive_chunk;
#                         when that returns a truthy value the next chunk is
#                         requested with [:COPY_CHUNK_FROM_REMOTE, file_checksum].
#   :ACK_MESSAGE        - content is [checksum, timestamp]; the reply is
#                         [:ACK_MESSAGE, [timestamp, ack, checksum]].
#   :ABORT_COPY         - content is forwarded as [:ABORT_COPY, content].
#   :RESET_RESUME_COPY  - content is forwarded as [:RESET_RESUME_COPY, content].
#   anything else       - logged as an error, nothing is sent.
#
# Returns the value of the last call made in the selected branch.
def handle(message)
  message_type, message_content = message
  case message_type
  when :SEND_COPY_MESSAGE
    Log.debug1("Requesting file (content data) to copy.")
    Log.debug3("File requested: #{message_content}")
    bytes_written = @tcp_server.send_obj([:COPY_MESSAGE, message_content])
    Log.debug1("Sending copy message succeeded? bytes_written: #{bytes_written}.")
  when :COPY_CHUNK
    Log.debug1('Chunk received.')
    if @file_receiver.receive_chunk(*message_content)
      # Only the file checksum (first element of the chunk) is needed to ask
      # for the next chunk.
      file_checksum, = message_content
      @tcp_server.send_obj([:COPY_CHUNK_FROM_REMOTE, file_checksum])
    end
  when :ACK_MESSAGE
    checksum, timestamp = message_content
    Log.debug1("Returning ack for: #{checksum}, timestamp: #{timestamp}")
    # Evaluate the existence check exactly once so the logged value and the
    # value sent to the peer can never disagree. The ack is the negation of
    # exists?: it is true when the checksum is not yet known locally.
    ack = !@dynamic_content_data.exists?(checksum)
    Log.debug1("Ack: #{ack}")
    @tcp_server.send_obj([:ACK_MESSAGE, [timestamp, ack, checksum]])
  when :ABORT_COPY
    @tcp_server.send_obj([:ABORT_COPY, message_content])
  when :RESET_RESUME_COPY
    @tcp_server.send_obj([:RESET_RESUME_COPY, message_content])
  else
    Log.error("Unexpected message type: #{message_type}")
  end
end

#handle_message(message) ⇒ Object



158
159
160
161
# File 'lib/content_server/queue_copy.rb', line 158

# Enqueues +message+ on the local queue for later processing.
#
# The message itself is not interpreted here; the worker thread started in
# #initialize pops it and passes it to #handle. This method is also given to
# the TCP client as its callback.
def handle_message(message)
  Log.debug2('QueueFileReceiver handle message')
  @local_queue << message
end

#request_copy(content_data) ⇒ Object



142
143
144
# File 'lib/content_server/queue_copy.rb', line 142

# Queues a request to copy +content_data+.
#
# The [:SEND_COPY_MESSAGE, content_data] message is pushed through
# handle_message; #handle later sends it on as a :COPY_MESSAGE.
def request_copy(content_data)
  copy_request = [:SEND_COPY_MESSAGE, content_data]
  handle_message(copy_request)
end

#reset_copy(checksum, new_offset) ⇒ Object



150
151
152
# File 'lib/content_server/queue_copy.rb', line 150

# Queues a request to reset/resume the copy of +checksum+ at +new_offset+.
#
# The [:RESET_RESUME_COPY, [checksum, new_offset]] message is pushed through
# handle_message; #handle later forwards it over the TCP client. This method
# is also passed to FileReceiver as its reset callback.
def reset_copy(checksum, new_offset)
  resume_point = [checksum, new_offset]
  handle_message([:RESET_RESUME_COPY, resume_point])
end

#threads ⇒ Object



136
137
138
139
140
# File 'lib/content_server/queue_copy.rb', line 136

# Returns the threads owned by this client: the local worker thread, followed
# by the TCP client's tcp_thread when a TCP client is set.
def threads
  owned = [@local_thread]
  owned << @tcp_server.tcp_thread unless @tcp_server.nil?
  owned
end