45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
# File 'lib/content_server/queue_copy.rb', line 45
# Starts the copy-queue worker and returns the threads involved.
#
# The worker thread blocks on @copy_input_queue, pops
# [message_type, message_content] pairs and dispatches each one to a
# per-type handler (see #dispatch_copy_event).
#
# @return [Array] @backup_tcp.tcp_thread (only when @backup_tcp is not nil)
#   followed by the worker Thread created here.
def run
  threads = []
  threads << @backup_tcp.tcp_thread unless @backup_tcp.nil?
  threads << Thread.new do
    loop do
      Log.info 'Waiting on copy files events.'
      message_type, message_content = @copy_input_queue.pop
      dispatch_copy_event(message_type, message_content)
    end
    # Sentinel: the loop above has no break, so this is only reached if the
    # loop is terminated from inside (Kernel#loop stops on StopIteration).
    Log.error("Should not reach here, loop should continue.")
  end
  threads
end

# Routes one queue event to its handler; unknown types are logged as errors.
#
# @param message_type [Symbol] event tag popped from the queue
# @param message_content [Object] payload whose shape depends on the tag
def dispatch_copy_event(message_type, message_content)
  case message_type
  when :COPY_MESSAGE
    handle_copy_message(message_content)
  when :ACK_MESSAGE
    handle_ack_message(message_content)
  when :COPY_CHUNK_FROM_REMOTE
    # Payload is the file checksum.
    @file_streamer.copy_another_chuck(message_content)
  when :COPY_CHUNK
    handle_copy_chunk(message_content)
  when :ABORT_COPY
    handle_abort_copy(message_content)
  when :RESET_RESUME_COPY
    handle_reset_resume_copy(message_content)
  else
    Log.error("Copy event not supported: #{message_type}")
  end
end

# Registers every instance of the payload in @copy_prepare as
# [full_path, false] and sends an :ACK_MESSAGE for it over @backup_tcp.
# Instances whose @copy_prepare entry is already flagged true are skipped.
#
# @param message_content [#instances] object whose #instances yields
#   (key, instance) pairs; each instance responds to #checksum and #full_path
def handle_copy_message(message_content)
  Log.info "Copy file event: #{message_content}"
  message_content.instances.each do |_key, instance|
    checksum = instance.checksum
    next if @copy_prepare.key?(checksum) && @copy_prepare[checksum][1]

    @copy_prepare[checksum] = [instance.full_path, false]
    Log.info("Sending ack for: #{checksum}")
    @backup_tcp.send_obj([:ACK_MESSAGE, [checksum, Time.now.to_i]])
  end
end

# Handles an ack reply: starts streaming the file when the ack is positive,
# younger than Params['ack_timeout'] seconds, and the file is still pending
# (present in @copy_prepare with its flag false).
#
# @param message_content [Array] [timestamp, ack, checksum]
def handle_ack_message(message_content)
  timestamp, ack, checksum = message_content
  # Sample the clock once so the logged values match the timeout decision.
  now = Time.now.to_i
  Log.info("Ack (#{ack}) received for: #{checksum}, timestamp: #{timestamp} " \
           "now: #{now}")

  unless ack
    # A negative ack is not a timeout; report it as what it is.
    Log.debug1("Ack was negative for: #{checksum}")
    return
  end

  span = now - timestamp
  timeout = Params['ack_timeout']
  unless span < timeout
    Log.debug1("Ack timed out span: #{span} > " \
               "timeout: #{timeout}")
    return
  end

  if !@copy_prepare.key?(checksum) || @copy_prepare[checksum][1]
    Log.warning("File was aborted, copied, or started copy just now: #{checksum}")
  else
    path = @copy_prepare[checksum][0]
    Log.info "Streaming file: #{checksum} #{path}."
    @file_streamer.start_streaming(checksum, path)
    # Flag the entry so a repeated ack does not start a second stream.
    @copy_prepare[checksum][1] = true
  end
end

# Forwards a chunk over @backup_tcp. A chunk whose content and content
# checksum are both nil removes the file's entry from @copy_prepare.
#
# @param message_content [Array]
#   [file_checksum, offset, file_size, content, content_checksum]
def handle_copy_chunk(message_content)
  file_checksum, offset, file_size, content, content_checksum = message_content
  Log.info("Send chunk for file #{file_checksum}, offset: #{offset} " \
           "filesize: #{file_size}.")
  @backup_tcp.send_obj([:COPY_CHUNK, message_content])
  @copy_prepare.delete(file_checksum) if content.nil? && content_checksum.nil?
end

# Drops the pending entry (if any) and tells the streamer to abort.
#
# @param checksum [Object] checksum of the file whose copy is aborted
def handle_abort_copy(checksum)
  Log.info("Aborting file copy: #{checksum}")
  if @copy_prepare.key?(checksum)
    Log.info("Aborting: #{@copy_prepare[checksum][0]}")
    @copy_prepare.delete(checksum)
  end
  @file_streamer.abort_streaming(checksum)
end

# Asks the streamer to reset/resume a file copy at the given offset.
#
# @param message_content [Array] [file_checksum, new_offset]
def handle_reset_resume_copy(message_content)
  file_checksum, new_offset = message_content
  Log.info("Resetting/Resuming file (#{file_checksum}) copy to #{new_offset}")
  @file_streamer.reset_streaming(file_checksum, new_offset)
end

private :dispatch_copy_event, :handle_copy_message, :handle_ack_message,
        :handle_copy_chunk, :handle_abort_copy, :handle_reset_resume_copy
|