Module: Tapyrus::Network::MessageHandler
- Included in:
- Connection
- Defined in:
- lib/tapyrus/network/message_handler.rb
Overview
P2P message handler used by peer connection class.
Instance Method Summary collapse
-
#defer_handle_command(command, payload) ⇒ Object
handle command with EM#defer.
-
#handle(message) ⇒ Object
handle p2p message.
- #handle_command(command, payload) ⇒ Object
- #handshake_done ⇒ Object
- #on_addr(addr) ⇒ Object
- #on_block(block) ⇒ Object
- #on_cmpct_block(cmpct_block) ⇒ Object
- #on_fee_filter(fee_filter) ⇒ Object
- #on_get_addr ⇒ Object
- #on_get_data(get_data) ⇒ Object
- #on_get_headers(headers) ⇒ Object
- #on_headers(headers) ⇒ Object
- #on_inv(inv) ⇒ Object
- #on_mem_pool ⇒ Object
- #on_merkle_block(merkle_block) ⇒ Object
- #on_not_found(not_found) ⇒ Object
- #on_ping(ping) ⇒ Object
- #on_pong(pong) ⇒ Object
- #on_reject(reject) ⇒ Object
- #on_send_cmpct(cmpct) ⇒ Object
- #on_send_headers ⇒ Object
- #on_tx(tx) ⇒ Object
- #on_ver_ack ⇒ Object
- #on_version(version) ⇒ Object
- #parse(message) ⇒ Object
- #parse_header ⇒ Object
- #send_message(msg) ⇒ Object
Instance Method Details
#defer_handle_command(command, payload) ⇒ Object
handle command with EM#defer
47 48 49 50 51 52 53 54 55 56 57 |
# File 'lib/tapyrus/network/message_handler.rb', line 47 def defer_handle_command(command, payload) operation = proc { handle_command(command, payload) } callback = proc { |result| } errback = proc do |e| logger.error("error occurred. #{e.}") logger.error(e.backtrace) peer.handle_error(e) end EM.defer(operation, callback, errback) end |
#handle(message) ⇒ Object
handle p2p message.
8 9 10 11 12 13 14 15 16 17 |
# File 'lib/tapyrus/network/message_handler.rb', line 8 def handle() peer.last_recv = Time.now.to_i peer.bytes_recv += .bytesize begin parse() rescue Tapyrus::Message::Error => e logger.error("invalid header magic. #{e.}") close end end |
#handle_command(command, payload) ⇒ Object
#handshake_done ⇒ Object
116 117 118 119 120 121 |
# File 'lib/tapyrus/network/message_handler.rb', line 116 def handshake_done return unless @incomming_handshake && @outgoing_handshake logger.info "handshake finished." @connected = true post_handshake end |
#on_addr(addr) ⇒ Object
142 143 144 145 |
# File 'lib/tapyrus/network/message_handler.rb', line 142 def on_addr(addr) logger.info("receive addr message. #{addr.build_json}") # TODO end |
#on_block(block) ⇒ Object
182 183 184 185 |
# File 'lib/tapyrus/network/message_handler.rb', line 182 def on_block(block) logger.info("receive block message.") # TODO end |
#on_cmpct_block(cmpct_block) ⇒ Object
235 236 237 |
# File 'lib/tapyrus/network/message_handler.rb', line 235 def on_cmpct_block(cmpct_block) logger.info("receive cmpct_block message. #{cmpct_block.build_json}") end |
#on_fee_filter(fee_filter) ⇒ Object
152 153 154 155 |
# File 'lib/tapyrus/network/message_handler.rb', line 152 def on_fee_filter(fee_filter) logger.info("receive feefilter message. #{fee_filter.build_json}") @fee_rate = fee_filter.fee_rate end |
#on_get_addr ⇒ Object
137 138 139 140 |
# File 'lib/tapyrus/network/message_handler.rb', line 137 def on_get_addr logger.info("receive getaddr message.") peer.send_addrs end |
#on_get_data(get_data) ⇒ Object
239 240 241 |
# File 'lib/tapyrus/network/message_handler.rb', line 239 def on_get_data(get_data) logger.info("receive get data message. #{get_data.build_json}") end |
#on_get_headers(headers) ⇒ Object
172 173 174 175 |
# File 'lib/tapyrus/network/message_handler.rb', line 172 def on_get_headers(headers) logger.info("receive getheaders message.") # TODO end |
#on_headers(headers) ⇒ Object
177 178 179 180 |
# File 'lib/tapyrus/network/message_handler.rb', line 177 def on_headers(headers) logger.info("receive headers message.") peer.handle_headers(headers) end |
#on_inv(inv) ⇒ Object
212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 |
# File 'lib/tapyrus/network/message_handler.rb', line 212 def on_inv(inv) logger.info("receive inv message.") blocks = [] txs = [] inv.inventories.each do |i| case i.identifier when Tapyrus::Message::Inventory::MSG_TX txs << i.hash when Tapyrus::Message::Inventory::MSG_BLOCK blocks << i.hash else logger.warn("[#{addr}] peer sent unknown inv type: #{i.identifier}") end end logger.info("receive block= #{blocks.size}, txs: #{txs.size}") peer.handle_block_inv(blocks) unless blocks.empty? end |
#on_mem_pool ⇒ Object
197 198 199 200 |
# File 'lib/tapyrus/network/message_handler.rb', line 197 def on_mem_pool logger.info("receive mempool message.") # TODO return mempool tx end |
#on_merkle_block(merkle_block) ⇒ Object
230 231 232 233 |
# File 'lib/tapyrus/network/message_handler.rb', line 230 def on_merkle_block(merkle_block) logger.info("receive merkle block message. #{merkle_block.build_json}") peer.handle_merkle_block(merkle_block) end |
#on_not_found(not_found) ⇒ Object
192 193 194 195 |
# File 'lib/tapyrus/network/message_handler.rb', line 192 def on_not_found(not_found) logger.info("receive notfound message. #{not_found.build_json}") # TODO end |
#on_ping(ping) ⇒ Object
157 158 159 160 |
# File 'lib/tapyrus/network/message_handler.rb', line 157 def on_ping(ping) logger.info("receive ping message. #{ping.build_json}") (ping.to_response) end |
#on_pong(pong) ⇒ Object
162 163 164 165 166 167 168 169 170 |
# File 'lib/tapyrus/network/message_handler.rb', line 162 def on_pong(pong) logger.info("receive pong message. #{pong.build_json}") if pong.nonce == peer.last_ping_nonce peer.last_ping_nonce = nil peer.last_pong = Time.now.to_i else logger.debug "The remote peer sent the wrong nonce (#{pong.nonce})." end end |
#on_reject(reject) ⇒ Object
202 203 204 205 |
# File 'lib/tapyrus/network/message_handler.rb', line 202 def on_reject(reject) logger.warn("receive reject message. #{reject.build_json}") # TODO end |
#on_send_cmpct(cmpct) ⇒ Object
207 208 209 210 |
# File 'lib/tapyrus/network/message_handler.rb', line 207 def on_send_cmpct(cmpct) logger.info("receive sendcmpct message. #{cmpct.build_json}") # TODO if mode is high and version is 1, relay block with cmpctblock message end |
#on_send_headers ⇒ Object
147 148 149 150 |
# File 'lib/tapyrus/network/message_handler.rb', line 147 def on_send_headers logger.info("receive sendheaders message.") @sendheaders = true end |
#on_tx(tx) ⇒ Object
187 188 189 190 |
# File 'lib/tapyrus/network/message_handler.rb', line 187 def on_tx(tx) logger.info("receive tx message. #{tx.build_json}") peer.handle_tx(tx) end |
#on_ver_ack ⇒ Object
131 132 133 134 135 |
# File 'lib/tapyrus/network/message_handler.rb', line 131 def on_ver_ack logger.info("receive verack message.") @outgoing_handshake = true handshake_done end |
#on_version(version) ⇒ Object
123 124 125 126 127 128 129 |
# File 'lib/tapyrus/network/message_handler.rb', line 123 def on_version(version) logger.info("receive version message. #{version.build_json}") @version = version (Tapyrus::Message::VerAck.new) @incomming_handshake = true handshake_done end |
#parse(message) ⇒ Object
19 20 21 22 23 24 25 26 27 |
# File 'lib/tapyrus/network/message_handler.rb', line 19 def parse() @message += command, payload, rest = parse_header return unless command defer_handle_command(command, payload) @message = "" parse(rest) if rest && rest.bytesize > 0 end |
#parse_header ⇒ Object
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
# File 'lib/tapyrus/network/message_handler.rb', line 29 def parse_header head_magic = Tapyrus.chain_params.magic_head return if @message.nil? || @message.size < MESSAGE_HEADER_SIZE magic, command, length, checksum = @message.unpack("a4A12Va4") raise Tapyrus::Message::Error, "invalid header magic. #{magic.bth}" unless magic.bth == head_magic payload = @message[MESSAGE_HEADER_SIZE...(MESSAGE_HEADER_SIZE + length)] return if payload.size < length unless Tapyrus.double_sha256(payload)[0...4] == checksum raise Tapyrus::Message::Error, "header checksum mismatch. #{checksum.bth}" end rest = @message[(MESSAGE_HEADER_SIZE + length)..-1] [command, payload, rest] end |
#send_message(msg) ⇒ Object
108 109 110 111 112 113 114 |
# File 'lib/tapyrus/network/message_handler.rb', line 108 def (msg) logger.info "send message #{msg.class::COMMAND}" pkt = msg.to_pkt peer.last_send = Time.now.to_i peer.bytes_sent = pkt.bytesize send_data(pkt) end |