Class: Britebox::LinesVerifier
- Inherits:
-
Object
- Object
- Britebox::LinesVerifier
- Defined in:
- lib/britebox/lines_verifier.rb
Instance Attribute Summary collapse
-
#fj ⇒ Object
readonly
Returns the value of attribute fj.
Instance Method Summary collapse
-
#initialize(file_job) ⇒ LinesVerifier
constructor
A new instance of LinesVerifier.
- #process!(file_name_to, opts) ⇒ Object
Constructor Details
#initialize(file_job) ⇒ LinesVerifier
Returns a new instance of LinesVerifier.
5 6 7 |
# File 'lib/britebox/lines_verifier.rb', line 5 def initialize(file_job) @fj = file_job end |
Instance Attribute Details
#fj ⇒ Object (readonly)
Returns the value of attribute fj.
3 4 5 |
# File 'lib/britebox/lines_verifier.rb', line 3 def fj @fj end |
Instance Method Details
#process!(file_name_to, opts) ⇒ Object
9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
# File 'lib/britebox/lines_verifier.rb', line 9 def process!(file_name_to, opts) in_buffer = Queue.new out_buffer = ExportBuffer.new(file_name_to, opts[:header_row], opts[:col_separator]) threads = [] fj.threads_count.times do threads << Thread.new do loop do while fj.paused? || fj.pending? fj.timer.pause sleep(0.5) end break if fj.cancelled? fj.timer.resume item = Timeout.timeout(1) { in_buffer.pop } rescue nil break if item.nil? email = item[:line][opts[:email_index]] if email.to_s.match(FormatRecognizer::EMAIL_PATTERN) if Britebox::Config.simulate # Do not send real requests in this mode sleep(1) contact_status = ['unknown', false, false] else begin contact = fj.brite_client.contacts.create(email: email) contact.verify! contact_status = [contact.status, contact.response[:email]['disposable'], contact.response[:email]['role_address']] rescue contact_status = ['error', false, false] end end else contact_status = ['invalid', false, false] end fj.semaphore.synchronize do out_buffer << {n: item[:n], line: item[:line] + contact_status} fj.size_processed += CSV.generate_line(item[:line], col_sep: opts[:col_separator]).size end end end end idx = 0 CSV.foreach(fj.file_name, col_sep: opts[:col_separator]) do |line| if idx == 0 && FormatRecognizer::is_header_row?(line) fj.size_processed += CSV.generate_line(line, col_sep: opts[:col_separator]).size next end next if line.nil? || line.size == 0 # Throttle file reading break if fj.cancelled? sleep(0.1) while in_buffer.size > fj.buffer_size && !fj.cancelled? in_buffer << {n: idx, line: line} idx += 1 end threads.each{ |m| m.join } out_buffer.flush_backlog out_buffer.close true end |