Class: Datahen::Scraper::BatchParser

Inherits:
Object
  • Object
show all
Defined in:
lib/datahen/scraper/batch_parser.rb

Constant Summary collapse

NOT_FOUND_MSG =
"No more pages to parse found"
NO_DEQUEUE_COUNT_MSG =
"\nWarning: Max page to parse dequeue count is 0, check pages to parse scale\n"
NO_WORKERS_MSG =
"\nWarning: There are no parser workers\n"

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(job_id, config_file, opts = {}) ⇒ BatchParser

Returns a new instance of BatchParser.

Parameters:

  • opts (Hash) (defaults to: {})

    a customizable set of options

Options Hash (opts):

  • :worker_count (Integer) — default: 1

    Parallel worker quantity.

  • :max_garbage (Integer) — default: 5

    Max amount of times the garbage collector can be requested before actually executing.

  • :dequeue_interval (Integer) — default: 3

    Time in seconds to wait between page dequeuing.

  • :dequeue_scale (Numeric) — default: 2

    Scaling factor used to calculate page dequeue size.

  • :dequeue_timeout (Numeric) — default: 30

    Page dequeue API request timeout in seconds.

  • :client_options (Hash) — default: {}

    Datahen client gem additional options (see Datahen::Client::Base#initialize method).



106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# File 'lib/datahen/scraper/batch_parser.rb', line 106

# Build a new batch parser bound to a job.
#
# @param job_id [Integer] job to be executed.
# @param config_file [String] config file path.
# @param opts [Hash] a customizable set of options (see class docs for keys
#   and defaults).
def initialize(job_id, config_file, opts = {})
  defaults = {
    worker_count: 1,
    max_garbage: 5,
    dequeue_interval: 3,
    dequeue_scale: 2,
    dequeue_timeout: 30,
    client_options: {}
  }
  opts = defaults.merge(opts)

  # plain state taken straight from the options
  @job_id = job_id
  @worker_count = opts[:worker_count]
  @dequeue_interval = opts[:dequeue_interval]
  @dequeue_scale = opts[:dequeue_scale]
  @max_garbage = opts[:max_garbage]

  # thread-safe page buffers and synchronization primitives
  @pages = Concurrent::Array.new
  @loaded_pages = Concurrent::Hash.new
  @garbage_mutex = Mutex.new
  @dequeue_mutex = Mutex.new
  @not_found = false

  # state assigned through writers, then load the parser configuration
  self.dequeue_timeout = opts[:dequeue_timeout]
  self.second_dequeue_count = 0
  self.garbage_count = 0
  self.config_file = config_file
  self.load_config

  @client = Datahen::Client::JobPage.new(opts[:client_options])
  nil
end

Instance Attribute Details

#client ⇒ Datahen::Client::JobPage (readonly)

Datahen job pages client used for API pages dequeuing.

Returns:



61
62
63
# File 'lib/datahen/scraper/batch_parser.rb', line 61

# @return [Datahen::Client::JobPage] Datahen job pages client used for API pages dequeuing.
def client; @client; end

#config ⇒ Hash (readonly)

Current config file loaded.

Returns:

  • (Hash)

    current loaded configuration



58
59
60
# File 'lib/datahen/scraper/batch_parser.rb', line 58

# @return [Hash] current loaded configuration.
def config; @config; end

#config_file ⇒ String

Configuration file path.

Returns:

  • (String)

    config file path



14
15
16
# File 'lib/datahen/scraper/batch_parser.rb', line 14

# @return [String] configuration file path.
def config_file; @config_file; end

#dequeue_interval ⇒ Integer (readonly)

Dequeue interval in seconds.

Returns:

  • (Integer)

    dequeue interval in seconds



46
47
48
# File 'lib/datahen/scraper/batch_parser.rb', line 46

# @return [Integer] dequeue interval in seconds.
def dequeue_interval; @dequeue_interval; end

#dequeue_mutex ⇒ Mutex (readonly)

Dequeuer mutex used to synchronize page dequeuing.

Returns:

  • (Mutex)

    dequeuer mutex



70
71
72
# File 'lib/datahen/scraper/batch_parser.rb', line 70

# @return [Mutex] mutex used to synchronize page dequeuing.
def dequeue_mutex; @dequeue_mutex; end

#dequeue_scale ⇒ Numeric (readonly)

Dequeue scale used to calculate the ideal dequeue size.

Returns:

  • (Numeric)

    dequeue scale



49
50
51
# File 'lib/datahen/scraper/batch_parser.rb', line 49

# @return [Numeric] scale used to calculate the ideal dequeue size.
def dequeue_scale; @dequeue_scale; end

#dequeue_timeout ⇒ Integer

Dequeue API request timeout in seconds.

Returns:

  • (Integer)

    dequeue API request timeout in seconds



26
27
28
# File 'lib/datahen/scraper/batch_parser.rb', line 26

# @return [Integer] dequeue API request timeout in seconds.
def dequeue_timeout; @dequeue_timeout; end

#dequeuer_still_alive ⇒ Integer (readonly)

Dequeuer last run unix timestamp.

Returns:

  • (Integer)

    dequeuer last run unix timestamp



73
74
75
# File 'lib/datahen/scraper/batch_parser.rb', line 73

# @return [Integer] dequeuer last run unix timestamp.
def dequeuer_still_alive; @dequeuer_still_alive; end

#dequeuer_thread ⇒ Thread (readonly)

Current dequeuer thread.

Returns:

  • (Thread)

    dequeuer thread



67
68
69
# File 'lib/datahen/scraper/batch_parser.rb', line 67

# @return [Thread] current dequeuer thread.
def dequeuer_thread; @dequeuer_thread; end

#garbage_count ⇒ Integer

Garbage collector request counter.

Returns:

  • (Integer)

    garbage collector counter



17
18
19
# File 'lib/datahen/scraper/batch_parser.rb', line 17

# @return [Integer] garbage collector request counter.
def garbage_count; @garbage_count; end

#garbage_mutex ⇒ Mutex (readonly)

Garbage collector mutex used to synchronize garbage collector requests.

Returns:

  • (Mutex)

    garbage collector mutex



64
65
66
# File 'lib/datahen/scraper/batch_parser.rb', line 64

# @return [Mutex] mutex used to synchronize garbage collector requests.
def garbage_mutex; @garbage_mutex; end

#job_id ⇒ Integer (readonly)

Job id to be executed.

Returns:

  • (Integer)

    job id



29
30
31
# File 'lib/datahen/scraper/batch_parser.rb', line 29

# @return [Integer] job id to be executed.
def job_id; @job_id; end

#last_message ⇒ String

Last printed message, useful to prevent duplicated log messages.

Returns:

  • (String)

    last printed message



20
21
22
# File 'lib/datahen/scraper/batch_parser.rb', line 20

# @return [String] last printed message, used to suppress duplicated log lines.
def last_message; @last_message; end

#loaded_pages ⇒ Concurrent::Hash&lt;String, Hash&gt; (readonly)

Loaded pages hash, useful to avoid duplicates on the loaded pages array.

Returns:

  • (Concurrent::Hash<String, Hash>)

    loaded pages as a concurrent hash



38
39
40
# File 'lib/datahen/scraper/batch_parser.rb', line 38

# @return [Concurrent::Hash<String, Hash>] loaded pages keyed by GID, used to avoid duplicates.
def loaded_pages; @loaded_pages; end

#max_garbage ⇒ Integer (readonly)

Max garbage collector requests before actually executing the garbage

collector.

Returns:

  • (Integer)

    max garbage request quantity before actually executing it



43
44
45
# File 'lib/datahen/scraper/batch_parser.rb', line 43

# @return [Integer] max garbage collector requests before actually executing it.
def max_garbage; @max_garbage; end

#not_found ⇒ Boolean (readonly)

Indicates whether the wait time is because there are no more pages.

Returns:

  • (Boolean)

`true` when wait time is due to no more pages, else `false`



77
78
79
# File 'lib/datahen/scraper/batch_parser.rb', line 77

# @return [Boolean] `true` when wait time is due to no more pages, else `false`.
def not_found; @not_found; end

#page_types ⇒ Array&lt;String&gt; (readonly)

Known page types extracted from the config file.

Returns:

  • (Array<String>)

    known page types



52
53
54
# File 'lib/datahen/scraper/batch_parser.rb', line 52

# @return [Array<String>] known page types extracted from the config file.
def page_types; @page_types; end

#pages ⇒ Concurrent::Array&lt;Hash&gt; (readonly)

Loaded pages array.

Returns:

  • (Concurrent::Array<Hash>)

    loaded pages as an array



35
36
37
# File 'lib/datahen/scraper/batch_parser.rb', line 35

# @return [Concurrent::Array<Hash>] loaded pages buffer.
def pages; @pages; end

#parsers ⇒ Concurrent::Hash&lt;String, String&gt; (readonly)

Known parsers extracted from the config file.

Returns:

  • (Concurrent::Hash<String, String>)

    known parsers



55
56
57
# File 'lib/datahen/scraper/batch_parser.rb', line 55

# @return [Concurrent::Hash<String, String>] page type to parser script file map.
def parsers; @parsers; end

#second_dequeue_count ⇒ Integer

Second dequeue counter used to prevent false negative warning messages.

Returns:

  • (Integer)

    second dequeue counter



23
24
25
# File 'lib/datahen/scraper/batch_parser.rb', line 23

# @return [Integer] second dequeue counter used to prevent false negative warnings.
def second_dequeue_count; @second_dequeue_count; end

#worker_count ⇒ Integer (readonly)

Parallel worker quantity.

Returns:

  • (Integer)

    parallel worker quantity



32
33
34
# File 'lib/datahen/scraper/batch_parser.rb', line 32

# @return [Integer] parallel worker quantity.
def worker_count; @worker_count; end

Class Method Details

.timestamp ⇒ Integer

Get a unix timestamp.

Returns:

  • (Integer)

    unix timestamp



87
88
89
# File 'lib/datahen/scraper/batch_parser.rb', line 87

# Current unix timestamp (timezone-independent seconds since the epoch).
#
# @return [Integer] unix timestamp
def self.timestamp
  Time.now.to_i
end

.wait(time_in_seconds) ⇒ Object

Wait a specific amount of seconds.

Parameters:

  • time_in_seconds (Integer)

    Seconds to wait.



81
82
83
# File 'lib/datahen/scraper/batch_parser.rb', line 81

# Pause execution for the given amount of seconds.
#
# @param time_in_seconds [Integer] Seconds to wait.
def self.wait(time_in_seconds)
  sleep time_in_seconds
end

Instance Method Details

#dequeue_pages ⇒ Hash

Dequeue one page from the previously loaded pages, and waits until there

are new pages whenever there are no loaded pages.

Returns:

  • (Hash)

    dequeued page



297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
# File 'lib/datahen/scraper/batch_parser.rb', line 297

# Pop one page from the previously loaded pages, blocking until a page
# becomes available whenever the buffer is empty.
#
# @return [Hash] dequeued page
def dequeue_pages
  # collect garbage
  self.recollect_garbage

  # hand over a page as soon as one is buffered
  waiting = false
  loop do
    page = self.pages.shift
    unless page.nil?
      puts "[Worker #{Parallel.worker_number}]: Finish waiting" if waiting
      loaded_pages.delete(page['gid'])
      return page
    end

    # be more verbose on worker waiting
    unless waiting
      waiting = true
      puts "[Worker #{Parallel.worker_number}]: Is waiting for a page..."
      # second_dequeue_count > 1 means the job already dequeued pages twice,
      # so an empty buffer here hints at a scaling problem rather than startup
      if self.second_dequeue_count > 1 && !self.not_found
        puts "\nWARNING: Your job might not be optimized. Consider increasing your job's \"parser_dequeue_scale\" if the `to_parse` queue is not empty or near empty \n"
      end
    end
    self.class.wait 1

    # ensure the dequeuer thread is alive and healthy
    self.ensure_dequeuer_thread
  end
end

#dequeuer_is_alive! ⇒ Object

Refresh dequeuer’s still alive timestamp



184
185
186
187
188
189
# File 'lib/datahen/scraper/batch_parser.rb', line 184

# Refresh the dequeuer's still-alive timestamp under the dequeue mutex.
def dequeuer_is_alive!
  self.dequeue_mutex.synchronize { @dequeuer_still_alive = self.class.timestamp }
  nil
end

#ensure_dequeuer_thread ⇒ Boolean

Ensures that the dequeuer thread exists and is running.

Returns:

  • (Boolean)

`true` if the thread was alive, or `false` if a new thread had to be created



262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/datahen/scraper/batch_parser.rb', line 262

# Ensures that the dequeuer thread exists and is healthy, restarting it when
# it has stopped reporting progress.
#
# @return [Boolean] `true` if a healthy thread was already alive, else
#   `false` after a replacement thread was spawned.
def ensure_dequeuer_thread
  self.dequeue_mutex.synchronize do
    # check if dequeuer thread is alive and healthy
    if !self.dequeuer_thread.nil? && self.dequeuer_thread.alive?
      # tolerate up to (timeout + interval) * 2 seconds of silence since the
      # last dequeuer_is_alive! heartbeat before declaring it unhealthy
      still_alive_timeout = (self.dequeue_timeout + self.dequeue_interval) * 2 + self.dequeuer_still_alive
      return true if self.class.timestamp < still_alive_timeout

      # kill dequeuer thread
      self.repeat_puts "Dequeuer isn't healthy, will restart it..."
      self.dequeuer_thread.kill
      @dequeuer_thread = nil
      self.recollect_garbage
      self.no_repeat_puts "Dequeuer thread was killed!"
    end

    # dequeuing on parallel (the ride never ends :D)
    @dequeuer_thread = Thread.new do
      while true
        begin
          self.load_pages
          self.class.wait self.dequeue_interval
        rescue => e
          # best-effort logging; the rescue modifier guards against errors
          # raised while formatting the message/backtrace themselves
          puts [e.message] + e.backtrace rescue 'error'
        end
      end
      # NOTE(review): unreachable — the loop above never breaks
      puts "Error: dequeuer died! D:"
    end
    self.repeat_puts "Dequeuer thread was started!"
  end
  false
end

#exec_parse(save = false, keep_outputs = false) ⇒ Object

Dequeue pages and execute the parsers associated to them on parallel.



327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
# File 'lib/datahen/scraper/batch_parser.rb', line 327

# Dequeue pages and execute the parsers associated to them on parallel.
#
# @param save [Boolean] save the parser output when `true`.
# @param keep_outputs [Boolean] keep the parser outputs when `true`.
def exec_parse save = false, keep_outputs = false
  if self.worker_count < 1
    self.no_repeat_puts NO_WORKERS_MSG
    return
  else
    # fixed typo: "Spawing" -> "Spawning"
    self.no_repeat_puts "Spawning #{self.worker_count} workers"
  end

  # start dequeuer
  self.ensure_dequeuer_thread

  # process the pages; the lambda makes Parallel pull pages on demand
  dequeue = lambda{ self.dequeue_pages }
  Parallel.each(dequeue, in_threads: worker_count) do |page|
    parser_file = self.parsers[page['page_type']]
    begin
      self.repeat_puts("Parsing page with GID #{page['gid']}")
      puts Datahen::Scraper::Parser.exec_parser_by_page(
        parser_file,
        page,
        job_id,
        save,
        nil,
        keep_outputs
      )
      self.repeat_puts("Finish parsing page with GID #{page['gid']}")
    rescue Parallel::Kill
      puts "[Worker #{Parallel.worker_number}]: Someone tried to kill Parallel!!!"
    rescue Parallel::Break
      puts "[Worker #{Parallel.worker_number}]: Someone tried to break Parallel!!!"
    rescue => e
      # best-effort logging; the rescue modifier guards against errors raised
      # while formatting the backtrace itself
      puts [e.message] + e.backtrace rescue 'error'
    end
  end

  nil
end

#load_config ⇒ Object

Loads the config file into a Hash.



151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/datahen/scraper/batch_parser.rb', line 151

# Load and parse the YAML config file, building the known page type list
# and the page type to parser script file map.
def load_config
  # build page type to script file map
  @page_types = []
  @parsers = Concurrent::Hash.new
  @config = YAML.load_file(config_file)
  parser_defs = self.config['parsers'] || []
  parser_defs.each do |parser|
    # skip parsers explicitly flagged as disabled
    next if parser['disabled']
    @page_types << parser['page_type']
    self.parsers[parser['page_type']] = parser['file']
  end
  self.recollect_garbage
  nil
end

#load_pages ⇒ Integer

Load new pages by dequeuing from the API.

Returns:

  • (Integer)

    amount of pages loaded



193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
# File 'lib/datahen/scraper/batch_parser.rb', line 193

# Load new pages by dequeuing from the API.
#
# Reserves up to `worker_count * dequeue_scale` pages, skips GIDs already
# buffered, and refreshes the dequeuer heartbeat around every slow step.
#
# @return [Integer] amount of pages loaded
def load_pages
  self.dequeuer_is_alive!

  # calculate dequeue size; skip the API call when the buffer is full enough
  max_dequeue_size = (self.worker_count * self.dequeue_scale).ceil
  current_size = self.pages.length
  dequeue_size = (self.dequeue_scale * (max_dequeue_size - current_size)).ceil
  if dequeue_size < 1
    return 0
  end
  dequeue_size = max_dequeue_size if dequeue_size > max_dequeue_size

  # reserve and get pages to parse
  response = nil
  begin
    response = client.dequeue self.job_id,
      dequeue_size,
      self.page_types,
      config['parse_fetching_failed'],
      timeout: self.dequeue_timeout
  rescue Net::ReadTimeout, Net::OpenTimeout
    self.repeat_puts "Dequeue API call timeout! Contact infra team, your job needs a profile change"
    self.dequeuer_is_alive!
    return 0
  end
  self.dequeuer_is_alive!

  # ensure a valid response or try again
  has_empty_response = (response.body.nil? || response.body.empty?)
  if has_empty_response || response.response.code.to_i != 200
    self.repeat_puts(has_empty_response ? 'null' : response.body)
    self.recollect_garbage
    return 0
  end

  # add pages, skipping GIDs already buffered (count still includes them)
  count = 0
  json = JSON.parse(response.body)
  if json['error'] != ""
    return 0
  end
  (json['data'] || []).each do |page|
    count += 1
    next if self.loaded_pages.has_key? page['gid']
    self.pages << (self.loaded_pages[page['gid']] = page)
  end
  response = nil
  self.dequeuer_is_alive!

  # recollect garbage to free some memory before parsing
  if count > 0
    @not_found = false
    self.recollect_garbage
    self.repeat_puts "Found #{count} page(s) to parse"
    self.second_dequeue_count += 1 unless self.second_dequeue_count > 1
  else
    @not_found = true
    self.no_repeat_puts NOT_FOUND_MSG
  end

  # return how many pages were loaded
  count
end

#no_repeat_puts(message) ⇒ Object

Print the message only when it is different from the last recorded

message.

Parameters:

  • message (String)

    Message to display.



176
177
178
179
180
181
# File 'lib/datahen/scraper/batch_parser.rb', line 176

# Print the message only when it differs from the last recorded message.
#
# @param msg [String] Message to display.
def no_repeat_puts(msg)
  return nil if msg == self.last_message
  puts msg
  self.last_message = msg
  nil
end

#recollect_garbage ⇒ Object

Execute garbage collector after it is requested as many times as

described by #max_garbage.


138
139
140
141
142
143
144
145
146
147
148
# File 'lib/datahen/scraper/batch_parser.rb', line 138

# Run the garbage collector only after it has been requested more than
# #max_garbage times; otherwise just bump the request counter.
def recollect_garbage
  self.garbage_mutex.synchronize do
    self.garbage_count += 1
    next unless self.garbage_count > self.max_garbage
    puts "Recollect garbage"
    GC.start
    self.garbage_count = 0
  end
  nil
end

#repeat_puts(message) ⇒ Object

Print the message regardless of it being the same as the last message.

Parameters:

  • message (String)

    Message to display.



167
168
169
170
171
# File 'lib/datahen/scraper/batch_parser.rb', line 167

# Print the message unconditionally and record it as the last message.
#
# @param msg [String] Message to display.
def repeat_puts(msg)
  puts msg
  self.last_message = msg
  nil
end