Class: Datahen::Scraper::Executor (Abstract)

Inherits:
Object
Includes:
Plugin::ContextExposer
Defined in:
lib/datahen/scraper/executor.rb

Overview

This class is abstract.

Constant Summary

MAX_FIND_OUTPUTS_PER_PAGE = 500

  Max allowed page size when querying outputs (see #find_outputs).

FIND_OUTPUTS_RETRY_LIMIT = 0

Instance Attribute Summary

Instance Method Summary

Methods included from Plugin::ContextExposer

#create_context, #expose_to, #exposed_env, #exposed_methods, exposed_methods, #isolated_binding, #var_or_proc

Instance Attribute Details

#filename ⇒ Object

Returns the value of attribute filename.



# File 'lib/datahen/scraper/executor.rb', line 10

def filename
  @filename
end

#gid ⇒ Object

Returns the value of attribute gid.



# File 'lib/datahen/scraper/executor.rb', line 10

def gid
  @gid
end

#job_id ⇒ Object

Returns the value of attribute job_id.



# File 'lib/datahen/scraper/executor.rb', line 10

def job_id
  @job_id
end

#page ⇒ Object

Returns the value of attribute page.



# File 'lib/datahen/scraper/executor.rb', line 10

def page
  @page
end

Instance Method Details

#clean_backtrace(backtrace) ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 349

def clean_backtrace(backtrace)
  i = backtrace.index{|x| x =~ /gems\/datahen/i}
  if i.to_i < 1
    return []
  else
    return backtrace[0..(i-1)]
  end
end
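
For illustration, a hedged sketch of the trimming behavior (the backtrace strings below are hypothetical). Frames from the first gems/datahen entry onward are dropped; when no user-script frames precede it, an empty array is returned:

backtrace = [
  "parsers/details.rb:3:in `block'",
  "/usr/lib/ruby/gems/datahen-1.0.0/lib/datahen/scraper/executor.rb:391:in `eval'"
]
clean_backtrace(backtrace)
# => ["parsers/details.rb:3:in `block'"], only the user-script frame remains
clean_backtrace([])
# => [], no gems/datahen frame found, so everything is discarded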

#eval_with_context(file_path, context) ⇒ Object

Note:

Using this method allows scripts to contain `return` to exit the script early, along with some improved security.

Eval a filename with a custom binding

Parameters:

  • file_path (String)

    File path to read.

  • context (Binding)

    Context binding to evaluate with.



# File 'lib/datahen/scraper/executor.rb', line 389

def eval_with_context file_path, context
  eval(File.read(file_path), context, file_path)
end
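
A minimal usage sketch: build a binding (here with #isolated_binding from Plugin::ContextExposer; its exact arguments are an assumption) and evaluate a user script against it. The script path is hypothetical:

context = isolated_binding({})   # assumption: returns a clean Binding
eval_with_context './parsers/details.rb', context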

#exec_parser(save = false) ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 14

def exec_parser(save=false)
  raise "should be implemented in subclass"
end

#find_output(collection = 'default', query = {}, opts = {}) ⇒ Hash|nil

Note:

The `:job_id` option takes priority over `:scraper_name` when both exist. If neither is provided, or the values are nil, then the current job is used for the query; this is the default behavior.

Find one output by collection and query with pagination.

Examples:

find_output
find_output 'my_collection'
find_output 'my_collection', {}

Find from another scraper by name

find_output 'my_collection', {}, scraper_name: 'my_scraper'

Find from another scraper by job_id

find_output 'my_collection', {}, job_id: 123

Parameters:

  • collection (String) (defaults to: 'default')

('default') Collection name.

  • query (Hash) (defaults to: {})

    ({}) Filters to query.

  • opts (Hash) (defaults to: {})

    ({}) Configuration options.

Options Hash (opts):

  • :scraper_name (String|nil) — default: nil

    Scraper name to query from.

  • :job_id (Integer|nil) — default: nil

    Job’s id to query from.

Returns:

  • (Hash|nil)

`Hash` when found, `nil` when no output is found.

Raises:

  • (ArgumentError)

    collection is not String.

  • (ArgumentError)

    query is not a Hash.



# File 'lib/datahen/scraper/executor.rb', line 215

def find_output(collection='default', query={}, opts = {})
  result = find_outputs(collection, query, 1, 1, opts)
  result.respond_to?(:first) ? result.first : nil
end
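
For instance, to look up a single record (the collection name and query fields here are hypothetical):

product = find_output('products', { '_id' => 'sku-123' })
puts product['name'] unless product.nil?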

#find_outputs(collection = 'default', query = {}, page = 1, per_page = 100, opts = {}) ⇒ Array

Note:

The `:job_id` option takes priority over `:scraper_name` when both exist. If neither is provided, or the values are nil, then the current job is used for the query; this is the default behavior.

Find outputs by collection and query with pagination.

Examples:

find_outputs
find_outputs 'my_collection'
find_outputs 'my_collection', {}
find_outputs 'my_collection', {}, 1
find_outputs 'my_collection', {}, 1, 100

Find from another scraper by name

find_outputs 'my_collection', {}, 1, 100, scraper_name: 'my_scraper'

Find from another scraper by job_id

find_outputs 'my_collection', {}, 1, 100, job_id: 123

Parameters:

  • collection (String) (defaults to: 'default')

('default') Collection name.

  • query (Hash) (defaults to: {})

    ({}) Filters to query.

  • page (Integer) (defaults to: 1)

    (1) Page number.

  • per_page (Integer) (defaults to: 100)

    (100) Page size.

  • opts (Hash) (defaults to: {})

    ({}) Configuration options.

Options Hash (opts):

  • :scraper_name (String|nil) — default: nil

    Scraper name to query from.

  • :job_id (Integer|nil) — default: nil

    Job’s id to query from.

Returns:

  • (Array)

Raises:

  • (ArgumentError)

    collection is not String.

  • (ArgumentError)

    query is not a Hash.

  • (ArgumentError)

    page is not an Integer greater than 0.

  • (ArgumentError)

    per_page is not an Integer between 1 and 500.



# File 'lib/datahen/scraper/executor.rb', line 149

def find_outputs(collection='default', query={}, page=1, per_page=100, opts = {})
  # Validate parameters out from nil for easier user usage.
  raise ArgumentError.new("collection needs to be a String") unless collection.is_a?(String)
  raise ArgumentError.new("query needs to be a Hash, instead of: #{query}") unless query.is_a?(Hash)
  unless page.is_a?(Integer) && page > 0
    raise ArgumentError.new("page needs to be an Integer greater than 0")
  end
  unless per_page.is_a?(Integer) && per_page > 0 && per_page <= MAX_FIND_OUTPUTS_PER_PAGE
    raise ArgumentError.new("per_page needs to be an Integer between 1 and #{MAX_FIND_OUTPUTS_PER_PAGE}")
  end

  options = {
    query: query,
    page: page,
    per_page: per_page
  }

  # Get job_id
  query_job_id = opts[:job_id] || get_job_id(opts[:scraper_name], self.job_id)

  # find outputs
  retry_limit = opts.has_key?(:retry_limit) ? opts[:retry_limit] : self.class::FIND_OUTPUTS_RETRY_LIMIT
  client = Client::JobOutput.new(options)
  response = client.all(query_job_id, collection, {
    retry_limit: retry_limit
  })
  if response.code != 200
    raise "response_code: #{response.code}|#{response.parsed_response}"
  end

  # check stream error
  json_data = response.body != 'null' ? response.parsed_response : {}
  if json_data['error'] != ""
    raise "response_code: #{response.code}|Stream error: #{json_data['error']}"
  end
  json_data['data'].nil? ? [] : json_data['data']
end
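
A sketch of paging through an entire collection using the maximum page size of 500 (the collection name is hypothetical):

current_page = 1
loop do
  records = find_outputs('products', {}, current_page, 500)
  break if records.empty?
  records.each { |record| puts record['_id'] }
  current_page += 1
end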

#finish ⇒ Object

Finish the executor execution



# File 'lib/datahen/scraper/executor.rb', line 394

def finish
  raise Error::SafeTerminateError
end
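
Because #finish raises Error::SafeTerminateError, it works as a guard clause to stop a script early; for example (the condition is hypothetical):

finish if page.nil?   # terminates safely here; nothing below this line runs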

#finisher_update(options = {}) ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 58

def finisher_update(options={})
  client = Client::Job.new()
  job_id = options.fetch(:job_id)

  client.finisher_update(job_id, options)
end

#get_content(job_id, gid) ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 75

def get_content(job_id, gid)
  client = Client::JobPage.new()
  content_json = client.find_content(job_id, gid)

  if content_json['available']
    signed_url = content_json['signed_url']
    Client::BackblazeContent.new.get_gunzipped_content(signed_url)
  else
    nil
  end
end

#get_failed_content(job_id, gid) ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 87

def get_failed_content(job_id, gid)
  client = Client::JobPage.new()
  content_json = client.find_failed_content(job_id, gid)

  if content_json['available']
    signed_url = content_json['signed_url']
    Client::BackblazeContent.new.get_gunzipped_content(signed_url)
  else
    nil
  end
end

#get_job_id(scraper_name, default = nil) ⇒ Object

Get the current job id from a scraper by name, or return the default when scraper_name is nil.

Parameters:

  • scraper_name (String|nil)

    Scraper name.

  • default (Integer|nil) (defaults to: nil)

    (nil) Default job id when no scraper name.

Raises:

  • (Exception)

When scraper_name is not nil and the scraper doesn't exist or has no current job.



# File 'lib/datahen/scraper/executor.rb', line 106

def get_job_id scraper_name, default = nil
  return default if scraper_name.nil?
  job = Client::ScraperJob.new().find(scraper_name)
  raise JSON.pretty_generate(job) if job['id'].nil?
  job['id']
end
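
Usage sketch (the scraper name is hypothetical):

get_job_id 'my_scraper'       # current job id of the 'my_scraper' scraper
get_job_id nil, self.job_id   # nil scraper name, so the default is returned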

#init_global_page ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 65

def init_global_page()
  client = Client::GlobalPage.new()
  global_page = client.find(gid)
  unless global_page.code == 200
    raise "GID #{gid} not found. Aborting execution!"
  else
    global_page
  end
end

#init_job_page ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 32

def init_job_page()
  client = Client::JobPage.new()
  job_page = client.find(job_id, gid)
  unless job_page.code == 200
    raise "Job #{job_id} or GID #{gid} not found. Aborting execution!"
  else
    job_page
  end

end

#init_page ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 18

def init_page()
  # skip whenever a page is provided
  return self.page unless self.page.nil?

  if job_id
    puts "getting Job Page"
    init_job_page
  else
    puts "getting Global Page"
    init_global_page()
  end

end

#parsing_update(options = {}) ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 43

def parsing_update(options={})
  client = Client::JobPage.new()
  job_id = options.fetch(:job_id)
  gid = options.fetch(:gid)

  client.parsing_update(job_id, gid, options)
end

#remove_old_dups!(list, key_defaults) ⇒ Integer

Remove dups by prioritizing the latest dup.

Parameters:

  • list (Array)

    List of hashes to dedup.

  • key_defaults (Hash)

    Key and default value pair hash to use on uniq validation.

Returns:

  • (Integer)

    Removed duplicated items count.



# File 'lib/datahen/scraper/executor.rb', line 227

def remove_old_dups!(list, key_defaults)
  raw_count = list.count
  keys = key_defaults.keys
  force_uniq = 0
  list.reverse!.uniq! do |item|
    # Extract stringify keys as hash
    key_hash = Hash[item.map{|k,v|keys.include?(k.to_s) ? [k.to_s,v] : nil}.select{|i|!i.nil?}]

    # Apply defaults for uniq validation
    key_defaults.each{|k,v| key_hash[k] = v if key_hash[k].nil?}

    # Don't dedup nil key defaults
    skip_dedup = !keys.find{|k| key_hash[k].nil?}.nil?
    skip_dedup ? (force_uniq += 1) : key_hash
  end
  list.reverse!
  dup_count = raw_count - list.count
  dup_count
end
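
A small illustration of the "keep the latest" behavior, using the same key defaults as #remove_old_output_dups! (the records are hypothetical):

outputs = [
  { '_id' => 'a', '_collection' => 'default', 'price' => 1 },
  { '_id' => 'a', '_collection' => 'default', 'price' => 2 }
]
remove_old_dups!(outputs, '_id' => nil, '_collection' => 'default')
# => 1; outputs now holds only the later record (price 2)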

#remove_old_output_dups!(list) ⇒ Integer

Remove dups by prioritizing the latest dup.

Parameters:

  • list (Array)

    List of outputs to dedup.

Returns:

  • (Integer)

    Removed duplicated items count.



# File 'lib/datahen/scraper/executor.rb', line 267

def remove_old_output_dups!(list)
  key_defaults = {
    '_id' => nil,
    '_collection' => 'default'
  }
  remove_old_dups! list, key_defaults
end

#remove_old_page_dups!(list) ⇒ Integer

Note:

It will not dedup for now, as it is hard to build the gid. TODO: build the gid so we can dedup.

Remove page dups by prioritizing the latest dup.

Parameters:

  • list (Array)

    List of pages to dedup.

Returns:

  • (Integer)

    Removed duplicated items count.



# File 'lib/datahen/scraper/executor.rb', line 255

def remove_old_page_dups!(list)
  key_defaults = {
    'gid' => nil
  }
  remove_old_dups! list, key_defaults
end

#save_outputs(outputs = []) ⇒ Object

Note:

IMPORTANT: outputs array’s elements will be removed.

Saves outputs from an array and clear it.

Parameters:

  • outputs (Array) (defaults to: [])

    ([]) Output array to save. Warning: all elements will be removed from the array.



# File 'lib/datahen/scraper/executor.rb', line 378

def save_outputs(outputs=[])
  save_pages_and_outputs([], outputs, save_type)
end
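
For example (the collection and fields are hypothetical); note that the array is drained as a side effect:

outputs = [{ '_collection' => 'products', 'name' => 'Widget' }]
save_outputs outputs
outputs   # => [] after saving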

#save_pages(pages = []) ⇒ Object

Note:

IMPORTANT: pages array’s elements will be removed.

Saves pages from an array and clear it.

Parameters:

  • pages (Array) (defaults to: [])

    ([]) Page array to save. Warning: all elements will be removed from the array.



# File 'lib/datahen/scraper/executor.rb', line 368

def save_pages(pages=[])
  save_pages_and_outputs(pages, [], save_type)
end

#save_pages_and_outputs(pages = [], outputs = [], status) ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 275

def save_pages_and_outputs(pages = [], outputs = [], status)
  total_pages = pages.count
  total_outputs = outputs.count
  records_per_slice = 100
  until pages.empty? && outputs.empty?
    pages_slice = pages.shift(records_per_slice)
    pages_dup_count = remove_old_page_dups! pages_slice
    outputs_slice = outputs.shift(records_per_slice)
    outputs_dup_count = remove_old_output_dups! outputs_slice

    log_msgs = []
    unless pages_slice.empty?
      page_dups_ignored = pages_dup_count > 0 ? " (#{pages_dup_count} dups ignored)" : ''
      log_msgs << "#{pages_slice.count} out of #{total_pages} Pages#{page_dups_ignored}"

      unless save
        puts '----------------------------------------'
        puts "Trying to validate #{log_msgs.last}#{page_dups_ignored}"
        puts JSON.pretty_generate pages_slice
      end
    end

    unless outputs_slice.empty?
      output_dups_ignored = outputs_dup_count > 0 ? " (#{outputs_dup_count} dups ignored)" : ''
      log_msgs << "#{outputs_slice.count} out of #{total_outputs} Outputs#{output_dups_ignored}"

      unless save
        puts '----------------------------------------'
        puts "Trying to validate #{log_msgs.last}#{output_dups_ignored}"
        puts JSON.pretty_generate outputs_slice
      end
    end

    # behave differently if it is a real save
    save_status = status
    if save
      log_msg = "Saving #{log_msgs.join(' and ')}."
      puts "#{log_msg}"
    else
      save_status = "#{status}_try"
    end

    # saving to server

    response = update_to_server(
      job_id: job_id,
      gid: gid,
      pages: pages_slice,
      outputs: outputs_slice,
      status: save_status)

    if response.code == 200
      if save
        log_msg = "Saved."
        puts "#{log_msg}"
      else
        puts "Validation successful"
      end
    else
      if save
        puts "Error: Unable to save Pages and/or Outputs to server: #{response.body}"
        raise "Unable to save Pages and/or Outputs to server: #{response.body}"
      else
        puts "Error: Invalid Pages and/or Outputs: #{response.body}"
        raise "Invalid Pages and/or Outputs: #{response.body}"
      end
    end
  end
end

#save_type ⇒ Object

Raises:

  • (NotImplementedError)


# File 'lib/datahen/scraper/executor.rb', line 358

def save_type
  raise NotImplementedError.new('Need to implement "save_type" method.')
end

#seeding_update(options = {}) ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 51

def seeding_update(options={})
  client = Client::Job.new()
  job_id = options.fetch(:job_id)

  client.seeding_update(job_id, options)
end

#update_to_server(opts = {}) ⇒ Object



# File 'lib/datahen/scraper/executor.rb', line 345

def update_to_server(opts = {})
  raise "Implemented in Subclass"
end
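
Taken together, #exec_parser, #save_type, and #update_to_server define the contract a concrete executor must fulfill. A hedged sketch of a minimal subclass follows; the method bodies are illustrative assumptions, not the gem's actual subclasses:

class MyExecutor < Datahen::Scraper::Executor
  def initialize(options = {})
    @filename = options[:filename]
    @job_id = options[:job_id]
    @gid = options[:gid]
  end

  # assumption: a label describing what this executor saves
  def save_type
    :parsing
  end

  def exec_parser(save = false)
    init_page
    # assumption: evaluate @filename with #eval_with_context, collect pages
    # and outputs, then persist them via #save_pages_and_outputs
  end

  def update_to_server(opts = {})
    # assumption: delegate to the parsing update endpoint shown above
    Client::JobPage.new.parsing_update(opts[:job_id], opts[:gid], opts)
  end
end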