Class: Reth::SyncTask
- Inherits:
-
Object
- Object
- Reth::SyncTask
- Defined in:
- lib/reth/sync_task.rb
Overview
Synchronizes the chain starting from a given blockhash. The chain of block hashes is fetched from a single peer (the one that led to the unknown blockhash). The blocks themselves are then fetched from the best peers.
Constant Summary collapse
- MAX_BLOCKS_PER_REQUEST =
32
- INITIAL_BLOCKHASHES_PER_REQUEST =
16
- MAX_BLOCKHASHES_PER_REQUEST =
512
- BLOCKS_REQUEST_TIMEOUT =
32
- BLOCKHASHES_REQUEST_TIMEOUT =
32
Instance Attribute Summary collapse
-
#start_block_number ⇒ Object
readonly
Returns the value of attribute start_block_number.
Instance Method Summary collapse
- #fetch_blocks(blockhashes_chain) ⇒ Object
- #fetch_hashchain ⇒ Object
-
#initialize(synchronizer, proto, blockhash, chain_difficulty = 0, originator_only = false) ⇒ SyncTask
constructor
A new instance of SyncTask.
- #protocols ⇒ Object
- #receive_blockhashes(proto, blockhashes) ⇒ Object
- #receive_blocks(proto, t_blocks) ⇒ Object
- #run ⇒ Object
- #task_exit(success = false) ⇒ Object
Constructor Details
#initialize(synchronizer, proto, blockhash, chain_difficulty = 0, originator_only = false) ⇒ SyncTask
Returns a new instance of SyncTask.
20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
# File 'lib/reth/sync_task.rb', line 20

# Builds the task and immediately starts it on a background thread.
#
# @param synchronizer [Object] owner of this task; must respond to
#   +chain+ and +chainservice+
# @param proto [Object] protocol kept as the originating peer
# @param blockhash [Object] hash the sync starts from
# @param chain_difficulty [Integer] difficulty stored for later comparison
# @param originator_only [Boolean] stored flag; when set, #protocols
#   returns only the originating protocol
def initialize(synchronizer, proto, blockhash, chain_difficulty = 0, originator_only = false)
  # collaborators
  @synchronizer = synchronizer
  @chain = synchronizer.chain
  @chainservice = synchronizer.chainservice

  # where the sync request came from
  @originating_proto = proto
  @originator_only = originator_only

  # what to sync
  @blockhash = blockhash
  @chain_difficulty = chain_difficulty

  # outstanding requests, keyed by protocol
  @requests = {}

  head_number = @chain.head.number
  @start_block_number = head_number
  @end_block_number = head_number + 1 # minimum synctask

  # every instance variable is assigned before the worker thread starts
  @run = Thread.new { run }
end
Instance Attribute Details
#start_block_number ⇒ Object (readonly)
Returns the value of attribute start_block_number.
18 19 20 |
# File 'lib/reth/sync_task.rb', line 18

# Reader for the block number this task starts from.
#
# @return [Object] the current value of @start_block_number
def start_block_number
  @start_block_number
end
Instance Method Details
#fetch_blocks(blockhashes_chain) ⇒ Object
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 |
# File 'lib/reth/sync_task.rb', line 149

# Downloads the blocks named by +blockhashes_chain+ in batches of at most
# MAX_BLOCKS_PER_REQUEST and hands them to the chain service.
#
# For every batch each available protocol is asked in turn until one
# delivers a usable reply. A peer that times out, answers with nothing,
# answers with non-TransientBlock data or answers with blocks whose hashes
# do not match the request is skipped.
#
# Any StandardError raised inside (including the ArgumentError for an empty
# input and the AssertError sanity checks) is logged and turned into
# task_exit(false).
#
# @param blockhashes_chain [Array] block hashes ordered youngest to oldest;
#   the array is reversed in place and emptied as blocks are queued
# @return [Object] whatever #task_exit returns
def fetch_blocks(blockhashes_chain)
  raise ArgumentError, 'no blockhashes' if blockhashes_chain.empty?
  logger.debug 'fetching blocks', num: blockhashes_chain.size

  blockhashes_chain.reverse! # oldest to youngest
  num_blocks = blockhashes_chain.size
  num_fetched = 0
  t_blocks = []
  reply_proto = nil

  until blockhashes_chain.empty?
    blockhashes_batch = blockhashes_chain[0, MAX_BLOCKS_PER_REQUEST]
    t_blocks = []
    reply_proto = nil

    protos = protocols
    if protos.nil? || protos.empty?
      logger.warn 'no protocols available'
      return task_exit(false)
    end

    protos.each do |proto|
      next if proto.stopped?
      raise AssertError if @requests.key?(proto)

      logger.debug 'requesting blocks', num: blockhashes_batch.size
      deferred = Concurrent::IVar.new
      @requests[proto] = deferred
      proto.async.send_getblocks(*blockhashes_batch)

      begin
        t_blocks = deferred.value(BLOCKS_REQUEST_TIMEOUT)
      ensure
        @requests.delete(proto)
      end

      # IVar#value does not raise on timeout, it returns nil. Treat that as
      # "no reply" and move on instead of crashing on nil.empty? below.
      if t_blocks.nil?
        logger.warn 'getblocks timed out, trying next proto'
        t_blocks = []
        next
      end

      if t_blocks.empty?
        logger.warn 'empty getblocks reply, trying next proto'
        next
      end

      unless t_blocks.all? { |b| b.instance_of?(TransientBlock) }
        logger.warn 'received unexpected data', data: t_blocks
        t_blocks = []
        next
      end

      # a peer may return fewer blocks than requested, but they must be the
      # leading part of the batch, in order
      unless t_blocks.map { |b| b.header.full_hash } == blockhashes_batch[0, t_blocks.size]
        logger.warn 'received wrong blocks, should ban peer'
        t_blocks = []
        next
      end

      reply_proto = proto
      break
    end

    # add received t_blocks
    num_fetched += t_blocks.size
    logger.debug 'received blocks', num: t_blocks.size, num_fetched: num_fetched, total: num_blocks, missing: (num_blocks - num_fetched)

    if t_blocks.empty?
      logger.warn 'failed to fetch blocks', missing: blockhashes_chain.size
      return task_exit(false)
    end

    started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    logger.debug 'adding blocks', qsize: @chainservice.block_queue.size
    t_blocks.each do |blk|
      expected_hash = blockhashes_chain.shift
      raise AssertError unless blk.header.full_hash == expected_hash
      raise AssertError if blockhashes_chain.include?(blk.header.full_hash)
      @chainservice.add_block blk, reply_proto # this blocks if the queue is full
    end
    logger.debug 'adding blocks done', took: (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started)
  end

  # done
  last_block = t_blocks.last
  raise AssertError, 'still missing blocks' unless blockhashes_chain.empty?
  raise AssertError, 'still missing blocks' unless last_block.header.full_hash == @blockhash
  logger.debug 'syncing finished'

  # at this time blocks are not in the chain yet, but in the add_block queue
  if @chain_difficulty >= @chain.head.chain_difficulty
    # reply_proto is the peer that served the final batch
    @chainservice.broadcast_newblock last_block, @chain_difficulty, reply_proto
  end

  task_exit(true)
rescue StandardError => e
  logger.error e
  logger.error e.backtrace[0, 10].join("\n")
  task_exit(false)
end
#fetch_hashchain ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
# File 'lib/reth/sync_task.rb', line 63

# Collects block hashes backwards from @blockhash until a hash already
# present in @chain is reached, then passes the collected hashes to
# #fetch_blocks.
#
# The first request asks for INITIAL_BLOCKHASHES_PER_REQUEST hashes, later
# ones for MAX_BLOCKHASHES_PER_REQUEST. For each request the available
# protocols are tried in order; a peer that times out, replies with nothing
# or replies with non-String entries is skipped. If no peer produces a
# usable batch the task exits unsuccessfully.
#
# Updates @start_block_number and @end_block_number.
#
# @return [Object] the result of #fetch_blocks, or of task_exit(false)
# @raise [AssertError] if @blockhash is already in the chain, or a request
#   to the same protocol is still pending
def fetch_hashchain
  logger.debug 'fetching hashchain'

  blockhashes_chain = [@blockhash] # youngest to oldest
  blockhash = @blockhash
  raise AssertError if @chain.include?(blockhash)

  # get block hashes until we found a known one
  max_blockhashes_per_request = INITIAL_BLOCKHASHES_PER_REQUEST
  chain_head_number = @chain.head.number

  until @chain.include?(blockhash)
    blockhashes_batch = []

    # proto with highest difficulty should be the proto we got the
    # newblock from
    protos = protocols
    if protos.nil? || protos.empty?
      logger.warn 'no protocols available'
      return task_exit(false)
    end

    protos.each do |proto|
      logger.debug 'syncing with', proto: proto
      next if proto.stopped?

      raise AssertError if @requests.key?(proto)
      deferred = Concurrent::IVar.new
      @requests[proto] = deferred

      proto.async.send_getblockhashes(blockhash, max_blockhashes_per_request)
      begin
        blockhashes_batch = deferred.value(BLOCKHASHES_REQUEST_TIMEOUT)
      ensure
        @requests.delete(proto)
      end

      # IVar#value returns nil when the timeout elapses; it does not raise.
      if blockhashes_batch.nil?
        logger.warn 'syncing hashchain timed out'
        blockhashes_batch = []
        next
      end

      if blockhashes_batch.empty?
        logger.warn 'empty getblockhashes result'
        next
      end

      unless blockhashes_batch.all? { |bh| bh.instance_of?(String) }
        logger.warn 'get wrong data type', expected: 'String', received: blockhashes_batch.map(&:class).uniq
        # discard the bad reply so it cannot be used if this was the last peer
        blockhashes_batch = []
        next
      end

      break
    end

    if blockhashes_batch.empty?
      logger.warn 'syncing failed with all peers', num_protos: protos.size
      return task_exit(false)
    end

    if @chain.include?(blockhashes_batch.last)
      blockhashes_batch.each do |bh| # youngest to oldest
        blockhash = bh

        if @chain.include?(blockhash)
          logger.debug 'found known blockhash', blockhash: Utils.encode_hex(blockhash), is_genesis: (blockhash == @chain.genesis.full_hash)
          break
        else
          blockhashes_chain.push(blockhash)
        end
      end
    else # no overlap
      blockhashes_chain.concat blockhashes_batch
      blockhash = blockhashes_batch.last
    end

    logger.debug "downloaded #{blockhashes_chain.size} block hashes, ending with #{Utils.encode_hex(blockhashes_chain.last)}"
    @end_block_number = chain_head_number + blockhashes_chain.size
    max_blockhashes_per_request = MAX_BLOCKHASHES_PER_REQUEST
  end

  @start_block_number = @chain.get(blockhash).number
  @end_block_number = @start_block_number + blockhashes_chain.size

  logger.debug 'computed missing numbers', start_number: @start_block_number, end_number: @end_block_number

  fetch_blocks blockhashes_chain
end
#protocols ⇒ Object
58 59 60 61 |
# File 'lib/reth/sync_task.rb', line 58

# Protocols this task may send requests to.
#
# @return [Array] a one-element array holding the originating protocol when
#   the task was created with originator_only, otherwise whatever the
#   synchronizer's +protocols+ returns
def protocols
  @originator_only ? [@originating_proto] : @synchronizer.protocols
end
#receive_blockhashes(proto, blockhashes) ⇒ Object
256 257 258 259 260 261 262 263 |
# File 'lib/reth/sync_task.rb', line 256

# Passes a blockhashes reply from +proto+ to the request pending for it.
#
# The pending entry is looked up exactly once: the requesting side removes
# it when it stops waiting, so a separate existence check followed by a
# second lookup could end up calling +set+ on nil for a late reply.
#
# @param proto [Object] protocol the reply came from
# @param blockhashes [#size] the received block hashes
# @return [Object, nil] result of +set+ on the pending request, or nil when
#   nothing is pending for +proto+
def receive_blockhashes(proto, blockhashes)
  logger.debug 'blockhashes received', proto: proto, num: blockhashes.size

  deferred = @requests[proto]
  unless deferred
    logger.debug 'unexpected blockhashes'
    return
  end

  deferred.set blockhashes
end
#receive_blocks(proto, t_blocks) ⇒ Object
247 248 249 250 251 252 253 254 |
# File 'lib/reth/sync_task.rb', line 247

# Passes a blocks reply from +proto+ to the request pending for it.
#
# The pending entry is looked up exactly once: the requesting side removes
# it when it stops waiting, so a separate existence check followed by a
# second lookup could end up calling +set+ on nil for a late reply.
#
# @param proto [Object] protocol the reply came from
# @param t_blocks [#size] the received blocks
# @return [Object, nil] result of +set+ on the pending request, or nil when
#   nothing is pending for +proto+
def receive_blocks(proto, t_blocks)
  logger.debug 'blocks received', proto: proto, num: t_blocks.size

  deferred = @requests[proto]
  unless deferred
    logger.debug 'unexpected blocks'
    return
  end

  deferred.set t_blocks
end
#run ⇒ Object
38 39 40 41 42 43 44 45 46 |
# File 'lib/reth/sync_task.rb', line 38

# Entry point of the task: logs the start and kicks off #fetch_hashchain.
# Any StandardError that escapes is logged together with the first 20
# backtrace lines and reported as a failed task.
#
# @return [Object] the result of #fetch_hashchain, or of task_exit(false)
#   after an error
def run
  logger.info 'spawning new synctask'
  fetch_hashchain
rescue StandardError => e
  logger.error e
  logger.error e.backtrace.first(20).join("\n")
  task_exit(false)
end
#task_exit(success = false) ⇒ Object
48 49 50 51 52 53 54 55 56 |
# File 'lib/reth/sync_task.rb', line 48

# Logs the outcome of the task and reports it to the synchronizer.
#
# @param success [Boolean] whether the sync completed
# @return [Object] whatever the synchronizer's +synctask_exited+ returns
def task_exit(success = false)
  logger.debug 'successfully synced' if success
  logger.warn 'syncing failed' unless success

  @synchronizer.synctask_exited(success)
end