Class: Reth::SyncTask

Inherits:
Object
  • Object
show all
Defined in:
lib/reth/sync_task.rb

Overview

Synchronizes the chain starting from a given blockhash. Blockchain hash is fetched from a single peer (which led to the unknown blockhash). Blocks are fetched from the best peers.

Constant Summary collapse

MAX_BLOCKS_PER_REQUEST =
32
INITIAL_BLOCKHASHES_PER_REQUEST =
16
MAX_BLOCKHASHES_PER_REQUEST =
512
BLOCKS_REQUEST_TIMEOUT =
32
BLOCKHASHES_REQUEST_TIMEOUT =
32

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(synchronizer, proto, blockhash, chain_difficulty = 0, originator_only = false) ⇒ SyncTask

Returns a new instance of SyncTask.



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# File 'lib/reth/sync_task.rb', line 20

def initialize(synchronizer, proto, blockhash, chain_difficulty=0, originator_only=false)
  @synchronizer = synchronizer
  @chain = synchronizer.chain
  @chainservice = synchronizer.chainservice

  @originating_proto = proto
  @originator_only = originator_only

  @blockhash = blockhash
  @chain_difficulty = chain_difficulty

  @requests = {} # proto => [cond, result]
  @start_block_number = @chain.head.number
  @end_block_number = @start_block_number + 1 # minimum synctask

  @run = Thread.new { run }
end

Instance Attribute Details

#start_block_numberObject (readonly)

Returns the value of attribute start_block_number.



18
19
20
# File 'lib/reth/sync_task.rb', line 18

def start_block_number
  @start_block_number
end

Instance Method Details

#fetch_blocks(blockhashes_chain) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# File 'lib/reth/sync_task.rb', line 149

def fetch_blocks(blockhashes_chain)
  raise ArgumentError, 'no blockhashes' if blockhashes_chain.empty?
  logger.debug 'fetching blocks', num: blockhashes_chain.size

  blockhashes_chain.reverse! # oldest to youngest
  num_blocks = blockhashes_chain.size
  num_fetched = 0

  while !blockhashes_chain.empty?
    blockhashes_batch = blockhashes_chain[0, MAX_BLOCKS_PER_REQUEST]
    t_blocks = []

    protos = self.protocols
    if protos.empty?
      logger.warn 'no protocols available'
      return task_exit(false)
    end

    proto = nil
    reply_proto = nil
    protos.each do |_proto|
      proto = _proto

      next if proto.stopped?
      raise AssertError if @requests.has_key?(proto)

      logger.debug 'requesting blocks', num: blockhashes_batch.size
      deferred = Concurrent::IVar.new
      @requests[proto] = deferred

      proto.async.send_getblocks *blockhashes_batch
      begin
        t_blocks = deferred.value(BLOCKS_REQUEST_TIMEOUT)
      rescue Defer::TimedOut
        logger.warn 'getblocks timed out, trying next proto'
        next
      ensure
        @requests.delete proto
      end

      if t_blocks.empty?
        logger.warn 'empty getblocks reply, trying next proto'
        next
      elsif !t_blocks.all? {|b| b.instance_of?(TransientBlock) }
        logger.warn 'received unexpected data', data: t_blocks
        t_blocks = []
        next
      end

      unless t_blocks.map {|b| b.header.full_hash } == blockhashes_batch[0, t_blocks.size]
        logger.warn 'received wrong blocks, should ban peer'
        t_blocks = []
        next
      end

      reply_proto = proto
      break
    end

    # add received t_blocks
    num_fetched += t_blocks.size
    logger.debug "received blocks", num: t_blocks.size, num_fetched: num_fetched, total: num_blocks, missing: (num_blocks - num_fetched)

    if t_blocks.empty?
      logger.warn 'failed to fetch blocks', missing: blockhashes_chain.size
      return task_exit(false)
    end

    t = Time.now
    logger.debug 'adding blocks', qsize: @chainservice.block_queue.size
    t_blocks.each do |blk|
      b = blockhashes_chain.shift
      raise AssertError unless blk.header.full_hash == b
      raise AssertError if blockhashes_chain.include?(blk.header.full_hash)

      @chainservice.add_block blk, reply_proto # this blocks if the queue is full
    end
    logger.debug 'adding blocks done', took: (Time.now - t)
  end

  # done
  last_block = t_blocks.last
  raise AssertError, 'still missing blocks' unless blockhashes_chain.empty?
  raise AssertError, 'still missing blocks' unless last_block.header.full_hash == @blockhash
  logger.debug 'syncing finished'

  # at this time blocks are not in the chain yet, but in the add_block queue
  if @chain_difficulty >= @chain.head.chain_difficulty
    @chainservice.broadcast_newblock last_block, @chain_difficulty, proto
  end

  task_exit(true)
rescue
  logger.error $!
  logger.error $!.backtrace[0,10].join("\n")
  task_exit(false)
end

#fetch_hashchainObject

Raises:

  • (AssertError)


63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# File 'lib/reth/sync_task.rb', line 63

def fetch_hashchain
  logger.debug 'fetching hashchain'

  blockhashes_chain = [@blockhash] # youngest to oldest
  blockhash = @blockhash = blockhashes_chain.last
  raise AssertError if @chain.include?(blockhash)

  # get block hashes until we found a known one
  max_blockhashes_per_request = INITIAL_BLOCKHASHES_PER_REQUEST
  chain_head_number = @chain.head.number
  while !@chain.include?(blockhash)
    blockhashes_batch = []

    # proto with highest difficulty should be the proto we got the
    # newblock from
    protos = self.protocols
    if protos.nil? || protos.empty?
      logger.warn 'no protocols available'
      return task_exit(false)
    end

    protos.each do |proto|
      logger.debug "syncing with", proto: proto
      next if proto.stopped?

      raise AssertError if @requests.has_key?(proto)
      deferred = Concurrent::IVar.new
      @requests[proto] = deferred

      proto.async.send_getblockhashes blockhash, max_blockhashes_per_request
      begin
        blockhashes_batch = deferred.value(BLOCKHASHES_REQUEST_TIMEOUT)
      rescue Defer::TimedOut
        logger.warn 'syncing hashchain timed out'
        next
      ensure
        @requests.delete proto
      end

      if blockhashes_batch.empty?
        logger.warn 'empty getblockhashes result'
        next
      end

      unless blockhashes_batch.all? {|bh| bh.instance_of?(String) }
        logger.warn "get wrong data type", expected: 'String', received: blockhashes_batch.map(&:class).uniq
        next
      end

      break
    end

    if blockhashes_batch.empty?
      logger.warn 'syncing failed with all peers', num_protos: protos.size
      return task_exit(false)
    end

    if @chain.include?(blockhashes_batch.last)
      blockhashes_batch.each do |bh| # youngest to oldest
        blockhash = bh

        if @chain.include?(blockhash)
          logger.debug "found known blockhash", blockhash: Utils.encode_hex(blockhash), is_genesis: (blockhash == @chain.genesis.full_hash)
          break
        else
          blockhashes_chain.push(blockhash)
        end
      end
    else # no overlap
      blockhashes_chain.concat blockhashes_batch
      blockhash = blockhashes_batch.last
    end

    logger.debug "downloaded #{blockhashes_chain.size} block hashes, ending with #{Utils.encode_hex(blockhashes_chain.last)}"
    @end_block_number = chain_head_number + blockhashes_chain.size
    max_blockhashes_per_request = MAX_BLOCKHASHES_PER_REQUEST
  end

  @start_block_number = @chain.get(blockhash).number
  @end_block_number = @start_block_number + blockhashes_chain.size

  logger.debug 'computed missing numbers', start_number: @start_block_number, end_number: @end_block_number

  fetch_blocks blockhashes_chain
end

#protocolsObject



58
59
60
61
# File 'lib/reth/sync_task.rb', line 58

def protocols
  return [@originating_proto] if @originator_only
  @synchronizer.protocols
end

#receive_blockhashes(proto, blockhashes) ⇒ Object



256
257
258
259
260
261
262
263
# File 'lib/reth/sync_task.rb', line 256

def receive_blockhashes(proto, blockhashes)
  logger.debug 'blockhashes received', proto: proto, num: blockhashes.size
  unless @requests.has_key?(proto)
    logger.debug 'unexpected blockhashes'
    return
  end
  @requests[proto].set blockhashes
end

#receive_blocks(proto, t_blocks) ⇒ Object



247
248
249
250
251
252
253
254
# File 'lib/reth/sync_task.rb', line 247

def receive_blocks(proto, t_blocks)
  logger.debug 'blocks received', proto: proto, num: t_blocks.size
  unless @requests.has_key?(proto)
    logger.debug 'unexpected blocks'
    return
  end
  @requests[proto].set t_blocks
end

#runObject



38
39
40
41
42
43
44
45
46
# File 'lib/reth/sync_task.rb', line 38

def run
  logger.info 'spawning new synctask'

  fetch_hashchain
rescue
  logger.error $!
  logger.error $!.backtrace[0,20].join("\n")
  task_exit false
end

#task_exit(success = false) ⇒ Object



48
49
50
51
52
53
54
55
56
# File 'lib/reth/sync_task.rb', line 48

def task_exit(success=false)
  if success
    logger.debug 'successfully synced'
  else
    logger.warn 'syncing failed'
  end

  @synchronizer.synctask_exited(success)
end