Module: Tapyrus::Network::MessageHandler

Included in:
Connection
Defined in:
lib/tapyrus/network/message_handler.rb

Overview

P2P message handler used by peer connection class.

Instance Method Summary collapse

Instance Method Details

#defer_handle_command(command, payload) ⇒ Object

handle command with EM#defer



47
48
49
50
51
52
53
54
55
56
57
# File 'lib/tapyrus/network/message_handler.rb', line 47

def defer_handle_command(command, payload)
  operation = proc { handle_command(command, payload) }
  callback = proc { |result| }
  errback =
    proc do |e|
      logger.error("error occurred. #{e.message}")
      logger.error(e.backtrace)
      peer.handle_error(e)
    end
  EM.defer(operation, callback, errback)
end

#handle(message) ⇒ Object

handle p2p message.



8
9
10
11
12
13
14
15
16
17
# File 'lib/tapyrus/network/message_handler.rb', line 8

def handle(message)
  peer.last_recv = Time.now.to_i
  peer.bytes_recv += message.bytesize
  begin
    parse(message)
  rescue Tapyrus::Message::Error => e
    logger.error("invalid header magic. #{e.message}")
    close
  end
end

#handle_command(command, payload) ⇒ Object



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/tapyrus/network/message_handler.rb', line 59

def handle_command(command, payload)
  logger.info("[#{addr}] process command #{command}.")
  case command
  when Tapyrus::Message::Version::COMMAND
    on_version(Tapyrus::Message::Version.parse_from_payload(payload))
  when Tapyrus::Message::VerAck::COMMAND
    on_ver_ack
  when Tapyrus::Message::GetAddr::COMMAND
    on_get_addr
  when Tapyrus::Message::Addr::COMMAND
    on_addr(Tapyrus::Message::Addr.parse_from_payload(payload))
  when Tapyrus::Message::SendHeaders::COMMAND
    on_send_headers
  when Tapyrus::Message::FeeFilter::COMMAND
    on_fee_filter(Tapyrus::Message::FeeFilter.parse_from_payload(payload))
  when Tapyrus::Message::Ping::COMMAND
    on_ping(Tapyrus::Message::Ping.parse_from_payload(payload))
  when Tapyrus::Message::Pong::COMMAND
    on_pong(Tapyrus::Message::Pong.parse_from_payload(payload))
  when Tapyrus::Message::GetHeaders::COMMAND
    on_get_headers(Tapyrus::Message::GetHeaders.parse_from_payload(payload))
  when Tapyrus::Message::Headers::COMMAND
    on_headers(Tapyrus::Message::Headers.parse_from_payload(payload))
  when Tapyrus::Message::Block::COMMAND
    on_block(Tapyrus::Message::Block.parse_from_payload(payload))
  when Tapyrus::Message::Tx::COMMAND
    on_tx(Tapyrus::Message::Tx.parse_from_payload(payload))
  when Tapyrus::Message::NotFound::COMMAND
    on_not_found(Tapyrus::Message::NotFound.parse_from_payload(payload))
  when Tapyrus::Message::MemPool::COMMAND
    on_mem_pool
  when Tapyrus::Message::Reject::COMMAND
    on_reject(Tapyrus::Message::Reject.parse_from_payload(payload))
  when Tapyrus::Message::SendCmpct::COMMAND
    on_send_cmpct(Tapyrus::Message::SendCmpct.parse_from_payload(payload))
  when Tapyrus::Message::Inv::COMMAND
    on_inv(Tapyrus::Message::Inv.parse_from_payload(payload))
  when Tapyrus::Message::MerkleBlock::COMMAND
    on_merkle_block(Tapyrus::Message::MerkleBlock.parse_from_payload(payload))
  when Tapyrus::Message::CmpctBlock::COMMAND
    on_cmpct_block(Tapyrus::Message::CmpctBlock.parse_from_payload(payload))
  when Tapyrus::Message::GetData::COMMAND
    on_get_data(Tapyrus::Message::GetData.parse_from_payload(payload))
  else
    logger.warn("unsupported command received. command: #{command}, payload: #{payload.bth}")
    close("with command #{command}")
  end
end

#handshake_doneObject



116
117
118
119
120
121
# File 'lib/tapyrus/network/message_handler.rb', line 116

def handshake_done
  return unless @incomming_handshake && @outgoing_handshake
  logger.info "handshake finished."
  @connected = true
  post_handshake
end

#on_addr(addr) ⇒ Object



142
143
144
145
# File 'lib/tapyrus/network/message_handler.rb', line 142

def on_addr(addr)
  logger.info("receive addr message. #{addr.build_json}")
  # TODO
end

#on_block(block) ⇒ Object



182
183
184
185
# File 'lib/tapyrus/network/message_handler.rb', line 182

def on_block(block)
  logger.info("receive block message.")
  # TODO
end

#on_cmpct_block(cmpct_block) ⇒ Object



235
236
237
# File 'lib/tapyrus/network/message_handler.rb', line 235

def on_cmpct_block(cmpct_block)
  logger.info("receive cmpct_block message. #{cmpct_block.build_json}")
end

#on_fee_filter(fee_filter) ⇒ Object



152
153
154
155
# File 'lib/tapyrus/network/message_handler.rb', line 152

def on_fee_filter(fee_filter)
  logger.info("receive feefilter message. #{fee_filter.build_json}")
  @fee_rate = fee_filter.fee_rate
end

#on_get_addrObject



137
138
139
140
# File 'lib/tapyrus/network/message_handler.rb', line 137

def on_get_addr
  logger.info("receive getaddr message.")
  peer.send_addrs
end

#on_get_data(get_data) ⇒ Object



239
240
241
# File 'lib/tapyrus/network/message_handler.rb', line 239

def on_get_data(get_data)
  logger.info("receive get data message. #{get_data.build_json}")
end

#on_get_headers(headers) ⇒ Object



172
173
174
175
# File 'lib/tapyrus/network/message_handler.rb', line 172

def on_get_headers(headers)
  logger.info("receive getheaders message.")
  # TODO
end

#on_headers(headers) ⇒ Object



177
178
179
180
# File 'lib/tapyrus/network/message_handler.rb', line 177

def on_headers(headers)
  logger.info("receive headers message.")
  peer.handle_headers(headers)
end

#on_inv(inv) ⇒ Object



212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
# File 'lib/tapyrus/network/message_handler.rb', line 212

def on_inv(inv)
  logger.info("receive inv message.")
  blocks = []
  txs = []
  inv.inventories.each do |i|
    case i.identifier
    when Tapyrus::Message::Inventory::MSG_TX
      txs << i.hash
    when Tapyrus::Message::Inventory::MSG_BLOCK
      blocks << i.hash
    else
      logger.warn("[#{addr}] peer sent unknown inv type: #{i.identifier}")
    end
  end
  logger.info("receive block= #{blocks.size}, txs: #{txs.size}")
  peer.handle_block_inv(blocks) unless blocks.empty?
end

#on_mem_poolObject



197
198
199
200
# File 'lib/tapyrus/network/message_handler.rb', line 197

def on_mem_pool
  logger.info("receive mempool message.")
  # TODO return mempool tx
end

#on_merkle_block(merkle_block) ⇒ Object



230
231
232
233
# File 'lib/tapyrus/network/message_handler.rb', line 230

def on_merkle_block(merkle_block)
  logger.info("receive merkle block message. #{merkle_block.build_json}")
  peer.handle_merkle_block(merkle_block)
end

#on_not_found(not_found) ⇒ Object



192
193
194
195
# File 'lib/tapyrus/network/message_handler.rb', line 192

def on_not_found(not_found)
  logger.info("receive notfound message. #{not_found.build_json}")
  # TODO
end

#on_ping(ping) ⇒ Object



157
158
159
160
# File 'lib/tapyrus/network/message_handler.rb', line 157

def on_ping(ping)
  logger.info("receive ping message. #{ping.build_json}")
  send_message(ping.to_response)
end

#on_pong(pong) ⇒ Object



162
163
164
165
166
167
168
169
170
# File 'lib/tapyrus/network/message_handler.rb', line 162

def on_pong(pong)
  logger.info("receive pong message. #{pong.build_json}")
  if pong.nonce == peer.last_ping_nonce
    peer.last_ping_nonce = nil
    peer.last_pong = Time.now.to_i
  else
    logger.debug "The remote peer sent the wrong nonce (#{pong.nonce})."
  end
end

#on_reject(reject) ⇒ Object



202
203
204
205
# File 'lib/tapyrus/network/message_handler.rb', line 202

def on_reject(reject)
  logger.warn("receive reject message. #{reject.build_json}")
  # TODO
end

#on_send_cmpct(cmpct) ⇒ Object



207
208
209
210
# File 'lib/tapyrus/network/message_handler.rb', line 207

def on_send_cmpct(cmpct)
  logger.info("receive sendcmpct message. #{cmpct.build_json}")
  # TODO if mode is high and version is 1, relay block with cmpctblock message
end

#on_send_headersObject



147
148
149
150
# File 'lib/tapyrus/network/message_handler.rb', line 147

def on_send_headers
  logger.info("receive sendheaders message.")
  @sendheaders = true
end

#on_tx(tx) ⇒ Object



187
188
189
190
# File 'lib/tapyrus/network/message_handler.rb', line 187

def on_tx(tx)
  logger.info("receive tx message. #{tx.build_json}")
  peer.handle_tx(tx)
end

#on_ver_ackObject



131
132
133
134
135
# File 'lib/tapyrus/network/message_handler.rb', line 131

def on_ver_ack
  logger.info("receive verack message.")
  @outgoing_handshake = true
  handshake_done
end

#on_version(version) ⇒ Object



123
124
125
126
127
128
129
# File 'lib/tapyrus/network/message_handler.rb', line 123

def on_version(version)
  logger.info("receive version message. #{version.build_json}")
  @version = version
  send_message(Tapyrus::Message::VerAck.new)
  @incomming_handshake = true
  handshake_done
end

#parse(message) ⇒ Object



19
20
21
22
23
24
25
26
27
# File 'lib/tapyrus/network/message_handler.rb', line 19

def parse(message)
  @message += message
  command, payload, rest = parse_header
  return unless command

  defer_handle_command(command, payload)
  @message = ""
  parse(rest) if rest && rest.bytesize > 0
end

#parse_headerObject



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# File 'lib/tapyrus/network/message_handler.rb', line 29

def parse_header
  head_magic = Tapyrus.chain_params.magic_head
  return if @message.nil? || @message.size < MESSAGE_HEADER_SIZE

  magic, command, length, checksum = @message.unpack("a4A12Va4")
  raise Tapyrus::Message::Error, "invalid header magic. #{magic.bth}" unless magic.bth == head_magic

  payload = @message[MESSAGE_HEADER_SIZE...(MESSAGE_HEADER_SIZE + length)]
  return if payload.size < length
  unless Tapyrus.double_sha256(payload)[0...4] == checksum
    raise Tapyrus::Message::Error, "header checksum mismatch. #{checksum.bth}"
  end

  rest = @message[(MESSAGE_HEADER_SIZE + length)..-1]
  [command, payload, rest]
end

#send_message(msg) ⇒ Object



108
109
110
111
112
113
114
# File 'lib/tapyrus/network/message_handler.rb', line 108

def send_message(msg)
  logger.info "send message #{msg.class::COMMAND}"
  pkt = msg.to_pkt
  peer.last_send = Time.now.to_i
  peer.bytes_sent = pkt.bytesize
  send_data(pkt)
end