Class: Zold::AsyncEntrance
- Inherits:
-
Object
- Object
- Zold::AsyncEntrance
- Defined in:
- lib/zold/node/async_entrance.rb
Overview
The entrance
Instance Method Summary collapse
-
#initialize(entrance, dir, log: Log::NULL, threads: [Concurrent.processor_count, 8].max, queue_limit: 8) ⇒ AsyncEntrance
constructor
A new instance of AsyncEntrance.
-
#push(id, body) ⇒ Object
Always returns an array with a single ID of the pushed wallet.
- #start ⇒ Object
- #to_json ⇒ Object
Constructor Details
#initialize(entrance, dir, log: Log::NULL, threads: [Concurrent.processor_count, 8].max, queue_limit: 8) ⇒ AsyncEntrance
Returns a new instance of AsyncEntrance.
41 42 43 44 45 46 47 48 49 50 |
# File 'lib/zold/node/async_entrance.rb', line 41 def initialize(entrance, dir, log: Log::NULL, threads: [Concurrent.processor_count, 8].max, queue_limit: 8) @entrance = entrance @dir = File.(dir) @log = log @threads = threads @pool = ThreadPool.new('async-entrance', log: log) @queue = Queue.new @queue_limit = queue_limit end |
Instance Method Details
#push(id, body) ⇒ Object
Always returns an array with a single ID of the pushed wallet
90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/zold/node/async_entrance.rb', line 90 def push(id, body) if @queue.size > @queue_limit raise( SoftError, "Queue is too long (#{@queue.size} wallets), can't add #{id}/#{Size.new(body.length)}, try again later" ) end start = Time.now unless exists?(id, body) loop do uuid = SecureRandom.uuid file = File.join(@dir, "#{id}-#{uuid}#{Wallet::EXT}") next if File.exist?(file) IO.write(file, body) @queue << { id: id, file: file } @log.debug("Added #{id}/#{Size.new(body.length)} to the queue at pos.#{@queue.size} \ in #{Age.new(start, limit: 0.05)}") break end end [id] end |
#start ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
# File 'lib/zold/node/async_entrance.rb', line 60 def start raise 'Block must be given to start()' unless block_given? FileUtils.mkdir_p(@dir) DirItems.new(@dir).fetch.each do |f| file = File.join(@dir, f) if /^[0-9a-f]{16}-/.match?(f) id = f.split('-')[0] @queue << { id: Id.new(id), file: file } else File.delete(file) end end @log.info("#{@queue.size} wallets pre-loaded into async_entrace from #{@dir}") unless @queue.size.zero? @entrance.start do (0..@threads).map do |i| @pool.add do Endless.new("async-e##{i}", log: @log).run do take end end end begin yield(self) ensure @pool.kill end end end |
#to_json ⇒ Object
52 53 54 55 56 57 58 |
# File 'lib/zold/node/async_entrance.rb', line 52 def to_json @entrance.to_json.merge( 'queue': @queue.size, 'threads': @pool.count, 'queue_limit': @queue_limit ) end |