Class: Bitcoin::Network::Node
- Inherits:
-
Object
- Object
- Bitcoin::Network::Node
- Defined in:
- lib/bitcoin/network/node.rb
Constant Summary collapse
- DEFAULT_CONFIG =
{ :network => :bitcoin, :listen => ["0.0.0.0", Bitcoin.network[:default_port]], :connect => [], :command => ["127.0.0.1", 9999], :storage => "utxo::sqlite://~/.bitcoin-ruby/<network>/blocks.db", :mode => :full, :dns => true, :epoll_limit => 10000, :epoll_user => nil, :addr_file => "~/.bitcoin-ruby/<network>/peers.json", :log => { :network => :info, :storage => :info, }, :max => { :connections_out => 8, :connections_in => 32, :connections => 8, :addr => 256, :queue => 501, :inv => 501, :inv_cache => 0, :unconfirmed => 100, }, :intervals => { :queue => 1, :inv_queue => 1, :addrs => 5, :connect => 5, :relay => 0, }, :import => nil, :skip_validation => false, :check_blocks => 1000, }
Instance Attribute Summary collapse
-
#addrs ⇒ Object
readonly
peer addrs (Array of Bitcoin::Protocol::Addr).
-
#command_connections ⇒ Object
readonly
command connections (Array of CommandHandler).
-
#config ⇒ Object
readonly
configuration hash.
-
#connections ⇒ Object
readonly
connections to other peers (Array of ConnectionHandler).
-
#external_ips ⇒ Object
our external ip addresses we got told by peers.
-
#inv_cache ⇒ Object
readonly
inventory cache (blocks/tx recently downloaded).
-
#inv_queue ⇒ Object
readonly
inventory queue (blocks/tx waiting to be downloaded).
-
#last_block_time ⇒ Object
readonly
time when the last main chain block was added.
-
#log ⇒ Object
readonly
logger.
-
#notifiers ⇒ Object
readonly
clients to be notified for new block/tx events.
-
#queue ⇒ Object
readonly
storage queue (blocks/tx waiting to be stored).
-
#relay_propagation ⇒ Object
Returns the value of attribute relay_propagation.
-
#relay_tx ⇒ Object
Returns the value of attribute relay_tx.
-
#store ⇒ Object
readonly
Bitcoin::Storage backend.
Instance Method Summary collapse
-
#accept_connections? ⇒ Boolean
should the node accept new incoming connections?.
-
#connect_dns ⇒ Object
query addrs from dns seed and connect.
-
#connect_dns_nslookup(seed) {|addrs| ... } ⇒ Object
get peers from dns via nslookup.
-
#connect_dns_resolver(seed) ⇒ Object
get peer addrs from given dns seed using em/dns_resolver.
- #connect_known_peers ⇒ Object
-
#connect_peer(host, port) ⇒ Object
connect to peer at given host/port.
-
#epoll_init ⇒ Object
initiate epoll with given file descriptor and set effective user.
-
#external_ip ⇒ Object
get the external ip that was suggested in version messages from other peers most often.
-
#getblocks(locator = store.get_locator) ⇒ Object
query blocks from random peer.
-
#initialize(config = {}) ⇒ Node
constructor
A new instance of Node.
- #load_addrs ⇒ Object
-
#push_notification(channel, message) ⇒ Object
push notification message to channel.
-
#queue_inv(inv) ⇒ Object
queue inv, caching the most current ones.
- #run ⇒ Object
- #set_store ⇒ Object
- #start_timers ⇒ Object
- #stop ⇒ Object
- #stop_timers ⇒ Object
- #store_addrs ⇒ Object
-
#subscribe(channel) ⇒ Object
subscribe to notification channel.
- #uptime ⇒ Object
-
#work_addrs ⇒ Object
check if the addr store is full and request new addrs from a random peer if it isn’t.
-
#work_connect ⇒ Object
check if there are enough connections and try to establish new ones if needed.
-
#work_inv_queue ⇒ Object
check for new items in the inv queue and process them, unless the queue is already full.
-
#work_queue ⇒ Object
check for new items in the queue and process them.
- #work_relay ⇒ Object
Constructor Details
#initialize(config = {}) ⇒ Node
Returns a new instance of Node.
88 89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/bitcoin/network/node.rb', line 88 def initialize config = {} @config = DEFAULT_CONFIG.deep_merge(config) @log = Bitcoin::Logger.create(:network, @config[:log][:network]) @connections, @command_connections = [], [] @queue, @queue_thread, @inv_queue, @inv_queue_thread = [], nil, [], nil set_store load_addrs @timers = {} @inv_cache = [] @notifiers = {} @relay_propagation, @last_block_time, @external_ips = {}, Time.now, [] @unconfirmed, @relay_tx = {}, {} end |
Instance Attribute Details
#addrs ⇒ Object (readonly)
peer addrs (Array of Bitcoin::Protocol::Addr)
36 37 38 |
# File 'lib/bitcoin/network/node.rb', line 36 def addrs @addrs end |
#command_connections ⇒ Object (readonly)
command connections (Array of CommandHandler)
21 22 23 |
# File 'lib/bitcoin/network/node.rb', line 21 def command_connections @command_connections end |
#config ⇒ Object (readonly)
configuration hash
12 13 14 |
# File 'lib/bitcoin/network/node.rb', line 12 def config @config end |
#connections ⇒ Object (readonly)
connections to other peers (Array of ConnectionHandler)
18 19 20 |
# File 'lib/bitcoin/network/node.rb', line 18 def connections @connections end |
#external_ips ⇒ Object
our external ip addresses we got told by peers
42 43 44 |
# File 'lib/bitcoin/network/node.rb', line 42 def external_ips @external_ips end |
#inv_cache ⇒ Object (readonly)
inventory cache (blocks/tx recently downloaded)
30 31 32 |
# File 'lib/bitcoin/network/node.rb', line 30 def inv_cache @inv_cache end |
#inv_queue ⇒ Object (readonly)
inventory queue (blocks/tx waiting to be downloaded)
27 28 29 |
# File 'lib/bitcoin/network/node.rb', line 27 def inv_queue @inv_queue end |
#last_block_time ⇒ Object (readonly)
time when the last main chain block was added
45 46 47 |
# File 'lib/bitcoin/network/node.rb', line 45 def last_block_time @last_block_time end |
#log ⇒ Object (readonly)
logger
15 16 17 |
# File 'lib/bitcoin/network/node.rb', line 15 def log @log end |
#notifiers ⇒ Object (readonly)
clients to be notified for new block/tx events
39 40 41 |
# File 'lib/bitcoin/network/node.rb', line 39 def notifiers @notifiers end |
#queue ⇒ Object (readonly)
storage queue (blocks/tx waiting to be stored)
24 25 26 |
# File 'lib/bitcoin/network/node.rb', line 24 def queue @queue end |
#relay_propagation ⇒ Object
Returns the value of attribute relay_propagation.
48 49 50 |
# File 'lib/bitcoin/network/node.rb', line 48 def relay_propagation @relay_propagation end |
#relay_tx ⇒ Object
Returns the value of attribute relay_tx.
47 48 49 |
# File 'lib/bitcoin/network/node.rb', line 47 def relay_tx @relay_tx end |
#store ⇒ Object (readonly)
Bitcoin::Storage backend
33 34 35 |
# File 'lib/bitcoin/network/node.rb', line 33 def store @store end |
Instance Method Details
#accept_connections? ⇒ Boolean
should the node accept new incoming connections?
473 474 475 |
# File 'lib/bitcoin/network/node.rb', line 473 def accept_connections? connections.select(&:incoming?).size >= config[:max][:connections_in] end |
#connect_dns ⇒ Object
query addrs from dns seed and connect
268 269 270 271 272 273 274 275 276 277 278 279 |
# File 'lib/bitcoin/network/node.rb', line 268 def connect_dns unless Bitcoin.network[:dns_seeds].any? log.warn { "No DNS seed nodes available" } return connect_known_peers end connect_dns_resolver(Bitcoin.network[:dns_seeds].sample) do |addrs| log.debug { "DNS returned addrs: #{addrs.inspect}" } addrs.sample(@config[:max][:connections_out] / 2).uniq.each do |addr| connect_peer(addr, Bitcoin.network[:default_port]) end end end |
#connect_dns_nslookup(seed) {|addrs| ... } ⇒ Object
get peers from dns via nslookup
307 308 309 310 311 312 |
# File 'lib/bitcoin/network/node.rb', line 307 def connect_dns_nslookup(seed) log.info { "Querying addresses from DNS seed: #{seed}" } addrs = `nslookup #{seed}`.scan(/Address\: (.+)$/).flatten # exit if @config[:dns] && hosts.size == 0 yield(addrs) end |
#connect_dns_resolver(seed) ⇒ Object
get peer addrs from given dns seed using em/dns_resolver. falls back to using `nslookup` if em/dns_resolver is not installed or fails.
290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 |
# File 'lib/bitcoin/network/node.rb', line 290 def connect_dns_resolver(seed) if Bitcoin.require_dependency "em/dns_resolver", gem: "em-dns", exit: false log.info { "Querying addresses from DNS seed: #{seed}" } dns = EM::DnsResolver.resolve(seed) dns.callback {|addrs| yield(addrs) } dns.errback do |*a| log.error { "Cannot resolve DNS seed #{seed}: #{a.inspect}" } connect_dns_nslookup(Bitcoin.network[:dns_seeds].sample) {|a| yield(a) } end else log.info { "Falling back to nslookup resolver." } connect_dns_nslookup(seed) {|a| yield(a) } end end |
#connect_known_peers ⇒ Object
281 282 283 284 285 286 |
# File 'lib/bitcoin/network/node.rb', line 281 def connect_known_peers log.debug { "Attempting to connecting to known nodes" } Bitcoin.network[:known_nodes].shuffle[0..3].each do |node| connect_peer node, Bitcoin.network[:default_port] end end |
#connect_peer(host, port) ⇒ Object
connect to peer at given host/port
258 259 260 261 262 263 264 265 |
# File 'lib/bitcoin/network/node.rb', line 258 def connect_peer host, port return if @connections.map{|c| c.host }.include?(host) log.debug { "Attempting to connect to #{host}:#{port}" } EM.connect(host, port.to_i, ConnectionHandler, self, host, port.to_i, :out) rescue log.debug { "Error connecting to #{host}:#{port}" } log.debug { $!.inspect } end |
#epoll_init ⇒ Object
initiate epoll with given file descriptor and set effective user
186 187 188 189 190 191 192 193 194 |
# File 'lib/bitcoin/network/node.rb', line 186 def epoll_init log.info { "EPOLL: Available file descriptors: " + EM.set_descriptor_table_size(@config[:epoll_limit]).to_s } if @config[:epoll_user] EM.set_effective_user(@config[:epoll_user]) log.info { "EPOLL: Effective user set to: #{@config[:epoll_user]}" } end EM.epoll = true end |
#external_ip ⇒ Object
get the external ip that was suggested in version messages from other peers most often.
453 454 455 456 457 |
# File 'lib/bitcoin/network/node.rb', line 453 def external_ip @external_ips.group_by(&:dup).values.max_by(&:size).first rescue @config[:listen][0] end |
#getblocks(locator = store.get_locator) ⇒ Object
query blocks from random peer
335 336 337 338 339 340 341 342 343 344 345 |
# File 'lib/bitcoin/network/node.rb', line 335 def getblocks locator = store.get_locator peer = @connections.select(&:connected?).sample return unless peer log.info { "querying blocks from #{peer.host}:#{peer.port}" } case @config[:mode] when /lite/ peer.send_getheaders locator unless @queue.size >= @config[:max][:queue] when /full|pruned/ peer.send_getblocks locator unless @inv_queue.size >= @config[:max][:inv] end end |
#load_addrs ⇒ Object
127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# File 'lib/bitcoin/network/node.rb', line 127 def load_addrs file = @config[:addr_file].sub("~", ENV["HOME"]) .sub("<network>", Bitcoin.network_name.to_s) unless File.exist?(file) @addrs = [] FileUtils.mkdir_p(File.dirname(file)) return end @addrs = JSON.load(File.read(file)).map do |a| addr = Bitcoin::P::Addr.new addr.time, addr.service, addr.ip, addr.port = a['time'], a['service'], a['ip'], a['port'] addr end log.info { "Initialized #{@addrs.size} addrs from #{file}." } rescue @addrs = [] log.warn { "Error loading addrs from #{file}." } end |
#push_notification(channel, message) ⇒ Object
push notification message to channel
460 461 462 |
# File 'lib/bitcoin/network/node.rb', line 460 def push_notification channel, message @notifiers[channel.to_sym].push(message) if @notifiers[channel.to_sym] end |
#queue_inv(inv) ⇒ Object
queue inv, caching the most current ones
430 431 432 433 434 435 436 437 438 439 440 441 442 |
# File 'lib/bitcoin/network/node.rb', line 430 def queue_inv inv hash = inv[1].unpack("H*")[0] return if @inv_queue.include?(inv) || @queue.select {|i| i[1].hash == hash }.any? return if @store.send("has_#{inv[0]}", hash) # @inv_cache.shift(128) if @inv_cache.size > @config[:max][:inv_cache] # return if @inv_cache.include?([inv[0], inv[1]]) || # @inv_queue.size >= @config[:max][:inv] || # (!@store.in_sync? && inv[0] == :tx) # @inv_cache << [inv[0], inv[1]] @inv_queue << inv end |
#run ⇒ Object
196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 |
# File 'lib/bitcoin/network/node.rb', line 196 def run @started = Time.now EM.add_shutdown_hook do store_addrs log.info { "Bye" } end # enable kqueue (BSD, OS X) if EM.kqueue? log.info { 'Using BSD kqueue' } EM.kqueue = true end # enable epoll (Linux) if EM.epoll? log.info { 'Using Linux epoll' } epoll_init end EM.run do start_timers host, port = *@config[:command] port ||= Bitcoin.network[:default_port] if host log.debug { "Trying to bind command socket to #{host}:#{port}" } EM.start_server(host, port, CommandHandler, self) log.info { "Command socket listening on #{host}:#{port}" } end host, port = *@config[:listen] port ||= Bitcoin.network[:default_port] if host log.debug { "Trying to bind server socket to #{host}:#{port}" } EM.start_server(host, port.to_i, ConnectionHandler, self, host, port.to_i, :in) log.info { "Server socket listening on #{host}:#{port}" } end @config[:connect].each do |host, port| port ||= Bitcoin.network[:default_port] connect_peer(host, port) log.info { "Connecting to #{host}:#{port}" } end work_connect if @addrs.any? connect_dns if @config[:dns] Signal.trap("INT") do puts "Shutting down. You can force-quit by pressing Ctrl-C again, but it might corrupt your database!" Signal.trap("INT") do puts "Force Quit" exit 1 end self.stop end end end |
#set_store ⇒ Object
102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 |
# File 'lib/bitcoin/network/node.rb', line 102 def set_store backend, config = @config[:storage].split('::') @store = Bitcoin::Storage.send(backend, { db: config, mode: @config[:mode], cache_head: true, skip_validation: @config[:skip_validation], log_level: @config[:log][:storage]}, ->(locator) { peer = @connections.select(&:connected?).sample peer.send_getblocks(locator) }) @store.log.level = @config[:log][:storage] @store.check_consistency(@config[:check_blocks]) if @config[:import] @importing = true EM.defer do begin @store.import(@config[:import]); @importing = false rescue log.fatal { $!.message } puts *$@ stop end end end end |
#start_timers ⇒ Object
172 173 174 175 176 177 178 179 |
# File 'lib/bitcoin/network/node.rb', line 172 def start_timers return EM.add_timer(1) { start_timers } if @importing [:queue, :inv_queue, :addrs, :connect, :relay].each do |name| interval = @config[:intervals][name].to_f next if !interval || interval == 0.0 @timers[name] = EM.add_periodic_timer(interval, method("work_#{name}")) end end |
#stop ⇒ Object
162 163 164 165 166 |
# File 'lib/bitcoin/network/node.rb', line 162 def stop puts "Shutting down..." stop_timers EM.stop end |
#stop_timers ⇒ Object
181 182 183 |
# File 'lib/bitcoin/network/node.rb', line 181 def stop_timers @timers.each {|n, t| EM.cancel_timer t } end |
#store_addrs ⇒ Object
147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# File 'lib/bitcoin/network/node.rb', line 147 def store_addrs return if !@addrs || !@addrs.any? file = @config[:addr_file].sub("~", ENV["HOME"]) .sub("<network>", Bitcoin.network_name.to_s) FileUtils.mkdir_p(File.dirname(file)) File.open(file, 'w') do |f| addrs = @addrs.map {|a| Hash[[:time, :service, :ip, :port].zip(a.entries)] rescue nil }.compact f.write(JSON.pretty_generate(addrs)) end log.info { "Stored #{@addrs.size} addrs to #{file}." } rescue log.warn { "Error storing addrs to #{file}." } end |
#subscribe(channel) ⇒ Object
subscribe to notification channel. available channels are: block, tx, output, connection. see CommandHandler for details.
467 468 469 470 |
# File 'lib/bitcoin/network/node.rb', line 467 def subscribe channel @notifiers[channel.to_sym] ||= EM::Channel.new @notifiers[channel.to_sym].subscribe {|*data| yield(*data) } end |
#uptime ⇒ Object
168 169 170 |
# File 'lib/bitcoin/network/node.rb', line 168 def uptime (Time.now - @started).to_i end |
#work_addrs ⇒ Object
check if the addr store is full and request new addrs from a random peer if it isn’t
349 350 351 352 353 354 355 356 357 |
# File 'lib/bitcoin/network/node.rb', line 349 def work_addrs log.debug { "addr worker running" } @addrs.delete_if{|addr| !addr.alive? } if @addrs.size >= @config[:max][:addr] return if !@connections.any? || @config[:max][:connections] <= @connections.size connections = @connections.select(&:connected?) return unless connections.any? log.info { "requesting addrs" } connections.sample.send_getaddr end |
#work_connect ⇒ Object
check if there are enough connections and try to establish new ones if needed
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 |
# File 'lib/bitcoin/network/node.rb', line 316 def work_connect log.debug { "Connect worker running" } desired = @config[:max][:connections_out] - @connections.select(&:outgoing?).size return if desired <= 0 desired = 32 if desired > 32 # connect to max 32 peers at once if addrs.any? addrs.sample(desired) do |addr| Time.now.tv_sec + 10800 - addr.time end.each do |addr| connect_peer(addr.ip, addr.port) end elsif @config[:dns] connect_dns end rescue log.error { "Error during connect: #{$!.inspect}" } end |
#work_inv_queue ⇒ Object
check for new items in the inv queue and process them, unless the queue is already full
418 419 420 421 422 423 424 425 426 427 |
# File 'lib/bitcoin/network/node.rb', line 418 def work_inv_queue @log.debug { "inv queue worker running" } return if @inv_queue.size == 0 return if @queue.size >= @config[:max][:queue] while inv = @inv_queue.shift next if !@store.in_sync? && inv[0] == :tx && @notifiers.empty? next if @queue.map{|i|i[1]}.map(&:hash).include?(inv[1]) inv[2].send("send_getdata_#{inv[0]}", inv[1]) end end |
#work_queue ⇒ Object
check for new items in the queue and process them
360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 |
# File 'lib/bitcoin/network/node.rb', line 360 def work_queue @log.debug { "queue worker running" } return getblocks if @queue.size == 0 # switch off utxo cache once there aren't tons of new blocks coming in if @store.in_sync? if @store.is_a?(Bitcoin::Storage::Backends::UtxoStore) && @store.config[:utxo_cache] > 0 log.debug { "switching off utxo cache" } @store.config[:utxo_cache] = 0 end @config[:intervals].each do |name, value| if value <= 1 log.debug { "setting #{name} interval to 5 seconds" } @config[:intervals][name] = 5 end end end while obj = @queue.shift begin if obj[0].to_sym == :block if res = @store.send("new_#{obj[0]}", obj[1]) if res[1] == 0 && obj[1].hash == @store.get_head.hash @last_block_time = Time.now push_notification(:block, [obj[1], res[0]]) obj[1].tx.each {|tx| @unconfirmed.delete(tx.hash) } end getblocks if res[1] == 2 && @store.in_sync? end else drop = @unconfirmed.size - @config[:max][:unconfirmed] + 1 drop.times { @unconfirmed.shift } if drop > 0 unless @unconfirmed[obj[1].hash] @unconfirmed[obj[1].hash] = obj[1] push_notification(:tx, [obj[1], 0]) if @notifiers[:output] obj[1].out.each do |out| address = Bitcoin::Script.new(out.pk_script).get_address push_notification(:output, [obj[1].hash, address, out.value, 0]) end end end end rescue Bitcoin::Validation::ValidationError @log.warn { "ValidationError storing #{obj[0]} #{obj[1].hash}: #{$!.message}" } # File.open("./validation_error_#{obj[0]}_#{obj[1].hash}.bin", "w") {|f| # f.write(obj[1].to_payload) } # EM.stop rescue @log.warn { $!.inspect } puts *$@ end end end |
#work_relay ⇒ Object
444 445 446 447 448 449 |
# File 'lib/bitcoin/network/node.rb', line 444 def work_relay log.debug { "relay worker running" } @store.get_unconfirmed_tx.each do |tx| relay_tx(tx) end end |