Class: GHArchive::OnlineProvider
Defined Under Namespace
Classes: Cache, DownloadArchiveException
Instance Method Summary
collapse
Methods inherited from Provider
#exclude, #include, #logger=, #parse_events, #restore_checkpoint, #update_checkpoint, #use_checkpoint
Methods included from Utils
#each_time, #get_gha_filename, #read_gha_file, #read_gha_file_content
Constructor Details
#initialize(max_retries = 3, proactive = false, proactive_pool_size = 10) ⇒ OnlineProvider
Returns a new instance of OnlineProvider.
142
143
144
145
146
147
148
149
|
# File 'lib/gh-archive/providers.rb', line 142
# Builds an online provider whose downloads go to data.gharchive.org.
#
# @param max_retries [Integer] number of download attempts per archive file
# @param proactive [Boolean] when truthy, background pre-fetching is enabled
# @param proactive_pool_size [Integer] size of the pre-fetch worker pool;
#   only consulted when +proactive+ is truthy
def initialize(max_retries = 3, proactive = false, proactive_pool_size = 10)
  super()

  # #max_retries returns the receiver, so the optional proactive setup can be
  # chained onto it.
  configured = self.max_retries(max_retries)
  configured.proactive(proactive_pool_size) if proactive

  @cache = Cache.new
end
|
Instance Method Details
#cache(current_time) ⇒ Object
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
|
# File 'lib/gh-archive/providers.rb', line 204
# Downloads the GH Archive file for +current_time+ and stores its parsed
# content in the shared cache (this is the work done by proactive tasks).
#
# While the cache reports itself full, the method polls once per second.
# Connection resets, HTTP 5xx responses and gzip decoding errors are retried,
# up to +@max_retries+ attempts in total. An HTTP 404 is logged and NOT
# retried, because the file does not exist. Any other HTTP error is re-raised.
#
# Whatever the outcome, the cache always ends up with an entry for the file.
# The entry is +nil+ when no content could be obtained, including when an
# exception escapes. This way a consumer polling for the file in #get is
# released instead of waiting forever.
#
# @param current_time [Time] the hour whose archive should be cached
# @return the return value is not meaningful
# @raise [OpenURI::HTTPError] for HTTP errors other than 5xx and 404
def cache(current_time)
  @logger.info("Full cache. Waiting for some free slot...") if @cache.full?
  sleep 1 while @cache.full?

  filename = get_gha_filename(current_time)
  begin
    @max_retries.times do
      begin
        URI.open("http://data.gharchive.org/#{filename}") do |gz|
          @cache.put(filename, read_gha_file(gz))
          return
        end
      rescue Errno::ECONNRESET => e
        @logger.warn("A server error temporary prevented the download of #{current_time}: " + e.message)
      rescue OpenURI::HTTPError => e
        code = e.io.status[0]
        if code.start_with?("5")
          @logger.warn("A server error temporary prevented the download of #{current_time}: " + e.message)
        elsif code == "404"
          @logger.error("File for #{current_time} not found. Skipping because: " + e.message)
          # The file does not exist, so further attempts cannot succeed.
          break
        else
          raise e
        end
      rescue Zlib::GzipFile::Error => e
        @logger.warn("Could not unzip, cache and analyze the zip at #{current_time}: " + e.message)
      end
    end
  ensure
    # Runs on success, exhausted retries, 404 and propagated exceptions alike.
    # A nil entry tells #get the data is unavailable, so it does not poll forever.
    @cache.put(filename, nil) unless @cache.has?(filename)
  end
end
|
#each(from = Time.gm(2015, 1, 1), to = Time.now) ⇒ Object
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
|
# File 'lib/gh-archive/providers.rb', line 239
# Iterates over archive events between +from+ and +to+. The iteration itself
# is delegated to Provider#each via +super+.
#
# In proactive mode, the start time is first passed through
# #restore_checkpoint. One download task per archive file is then submitted
# to the thread pool; each task fills the shared cache through #cache. Before
# iterating, the method waits until the first task has finished.
#
# Two situations used to leave that wait blocked forever, and no longer do:
# - The "task finished" signal is now delivered even when a task raises.
# - The wait is skipped entirely when no task was scheduled (e.g. the
#   restored checkpoint is already past +to+).
#
# @param from [Time] first hour to process
# @param to [Time] last hour to process
def each(from = Time.gm(2015, 1, 1), to = Time.now)
  if @proactive
    real_from = restore_checkpoint(from)
    any_ready = Thread.promise
    scheduled_any = false

    @logger.info("Proactively scheduling download tasks...")
    self.each_time(real_from, to) do |current_time|
      scheduled_any = true
      @pool.process(current_time) do |task_time|
        begin
          cache(task_time)
        ensure
          # Deliver even if the download raised. Otherwise a failing first
          # task would leave the wait below unreleased.
          any_ready << true
        end
        @logger.info("Proactively cached #{task_time}. Cache size: #{@cache.size}")
      end
    end
    # With nothing scheduled, no task would ever deliver the promise.
    ~any_ready if scheduled_any
    @logger.info("Download tasks successfully scheduled!")
  end
  super
end
|
#get(current_time) ⇒ Object
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
|
# File 'lib/gh-archive/providers.rb', line 164
# Returns the parsed content of the GH Archive file for +current_time+.
#
# Without proactive mode, the file is downloaded directly. Connection resets
# and HTTP 5xx responses are retried; any other HTTP error is re-raised
# unchanged.
#
# In proactive mode, the method polls the shared cache once per second until
# an entry for the file exists, then returns it. A nil entry is reported as a
# DownloadArchiveException.
#
# @param current_time [Time] the hour whose archive is requested
# @return the value produced by #read_gha_file (or stored in the cache)
# @raise [DownloadArchiveException] when cached data is unavailable, or when
#   all +@max_retries+ attempts have been used up
# @raise [OpenURI::HTTPError] for non-5xx HTTP errors
def get(current_time)
  @max_retries.times do
    begin
      archive_name = self.get_gha_filename(current_time)

      unless @proactive
        URI.open("http://data.gharchive.org/#{archive_name}") do |gz|
          return self.read_gha_file(gz)
        end
        next
      end

      @logger.info("Waiting for cache to have #{current_time}...") unless @cache.has?(archive_name)
      sleep 1 until @cache.has?(archive_name)

      cached = @cache.get(archive_name)
      raise DownloadArchiveException, "Could not scan #{archive_name}: data unavailable." unless cached

      return cached
    rescue Errno::ECONNRESET => e
      @logger.warn("A server error temporary prevented the download of #{current_time}: " + e.message)
    rescue OpenURI::HTTPError => e
      # Only server-side (5xx) failures are worth another attempt.
      raise e unless e.io.status[0].start_with?("5")

      @logger.warn("A server error temporary prevented the download of #{current_time}: " + e.message)
    end
  end

  raise DownloadArchiveException, "Exceeded maximum number of tentative downloads for #{current_time}."
end
|
#max_retries(n) ⇒ Object
151
152
153
154
155
|
# File 'lib/gh-archive/providers.rb', line 151
# Sets the number of download attempts made for each archive file.
#
# @param n [Integer] maximum number of attempts
# @return [self] the receiver, so configuration calls can be chained
def max_retries(n)
  tap { @max_retries = n }
end
|
#proactive(pool_size = 10) ⇒ Object
157
158
159
160
161
162
|
# File 'lib/gh-archive/providers.rb', line 157
# Turns on proactive (background) downloading and creates the worker pool
# that runs the download tasks.
#
# @param pool_size [Integer] number of workers handed to GHArchive::ThreadPool
# @return [self] the receiver, so configuration calls can be chained
def proactive(pool_size = 10)
  tap do
    @proactive = true
    @pool = GHArchive::ThreadPool.new(pool_size)
  end
end
|