Class: Bitflyer::Realtime::WebSocketClient

Inherits:
Object
  • Object
show all
Defined in:
lib/bitflyer/realtime/websocket.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(host:, debug: false) ⇒ WebSocketClient

Returns a new instance of WebSocketClient.



10
11
12
13
14
15
16
17
# File 'lib/bitflyer/realtime/websocket.rb', line 10

# Builds the client state and immediately opens the connection.
#
# @param host [String] base URL; #connect appends the socket.io path to it
# @param debug [Boolean] when truthy, #debug_log prints its messages
def initialize(host:, debug: false)
  # Connection settings.
  @debug = debug
  @host = host

  # Subscription bookkeeping: names to re-subscribe, and their callbacks.
  @channel_callbacks = {}
  @channel_names = []

  # No pending error/reconnect request yet.
  @error = nil

  connect
end

Instance Attribute Details

#channel_callbacks ⇒ Object

Returns the value of attribute channel_callbacks.



7
8
9
# File 'lib/bitflyer/realtime/websocket.rb', line 7

# Reader for the callback table.
#
# The table starts as an empty Hash in #initialize; #subscribe stores the
# given block under the channel name it was called with.
def channel_callbacks
  @channel_callbacks
end

#channel_name ⇒ Object

Returns the value of attribute channel_name.



7
8
9
# File 'lib/bitflyer/realtime/websocket.rb', line 7

# Reader for @channel_name.
#
# NOTE(review): none of the methods shown here assign @channel_name (the
# subscription list lives in @channel_names), so this presumably returns
# nil unless it is set elsewhere -- verify against the full file.
def channel_name
  @channel_name
end

#error ⇒ Object

Returns the value of attribute error.



7
8
9
# File 'lib/bitflyer/realtime/websocket.rb', line 7

# Reader for the pending-error flag.
#
# It is nil after #initialize and after #reconnect starts; #disconnect sets
# it to true. While it is truthy, the watcher loop started by #connect
# calls #reconnect.
def error
  @error
end

#last_ping_at ⇒ Object

Returns the value of attribute last_ping_at.



7
8
9
# File 'lib/bitflyer/realtime/websocket.rb', line 7

# Reader for the time of the last ping, stored as Time.now.to_i
# (whole seconds). Written by #setup_by_response and #send_ping.
def last_ping_at
  @last_ping_at
end

#last_pong_at ⇒ Object

Returns the value of attribute last_pong_at.



7
8
9
# File 'lib/bitflyer/realtime/websocket.rb', line 7

# Reader for the time of the last pong, stored as Time.now.to_i
# (whole seconds). Written by #setup_by_response and #receive_pong.
def last_pong_at
  @last_pong_at
end

#ping_interval ⇒ Object

Returns the value of attribute ping_interval.



7
8
9
# File 'lib/bitflyer/realtime/websocket.rb', line 7

# Reader for the ping interval taken from the "pingInterval" field in
# #setup_by_response. #send_ping divides it by 1000 before comparing it
# with elapsed seconds, so it is presumably in milliseconds.
def ping_interval
  @ping_interval
end

#ping_timeout ⇒ Object

Returns the value of attribute ping_timeout.



7
8
9
# File 'lib/bitflyer/realtime/websocket.rb', line 7

# Reader for the pong timeout taken from the "pingTimeout" field in
# #setup_by_response. #wait_pong divides it by 1000 before comparing it
# with elapsed seconds, so it is presumably in milliseconds.
def ping_timeout
  @ping_timeout
end

#websocket_client ⇒ Object

Returns the value of attribute websocket_client.



7
8
9
# File 'lib/bitflyer/realtime/websocket.rb', line 7

# Reader for the underlying connection object, i.e. whatever
# WebSocket::Client::Simple.connect returned on the latest #connect call.
def websocket_client
  @websocket_client
end

Instance Method Details

#connect ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/bitflyer/realtime/websocket.rb', line 26

# Opens a new websocket connection to "#{@host}/socket.io/?transport=websocket",
# makes sure the two background loops are running, and registers the
# message/error handlers on the new connection.
#
# #reconnect calls this method again, so the background threads are started
# only if they are not already alive. Starting them unconditionally would
# add two more never-ending threads on every reconnect.
#
# @return [Object] whatever the client's `on(:error)` registration returns
def connect
  @websocket_client = WebSocket::Client::Simple.connect "#{@host}/socket.io/?transport=websocket"
  # Handler blocks below call back into this object through a local.
  # NOTE(review): presumably the library evaluates handlers in the client's
  # own context, which is why `self` is captured here -- confirm.
  this = self

  # Keep-alive loop: once per second, ping if due and check for pong timeout.
  # It reads @websocket_client on each pass, so it follows reconnects.
  unless @ping_thread&.alive?
    @ping_thread = Thread.new do
      loop do
        sleep 1
        next unless @websocket_client&.open?

        send_ping
        wait_pong
      end
    end
  end

  # Watcher loop: once per second, reconnect if an error has been flagged.
  unless @reconnect_thread&.alive?
    @reconnect_thread = Thread.new do
      loop do
        sleep 1
        next unless @error

        reconnect
      end
    end
  end

  @websocket_client.on(:message) { |payload| this.handle_message(payload: payload) }
  @websocket_client.on(:error) { |error| this.handle_error(error: error) }
end

#debug_log(message) ⇒ Object



131
132
133
134
# File 'lib/bitflyer/realtime/websocket.rb', line 131

# Prints +message+ with Kernel#p, but only when debugging was enabled
# in the constructor.
#
# @param message [Object] value to inspect-print
# @return [Object, nil] +message+ when printed, nil otherwise
def debug_log(message)
  p(message) if @debug
end

#disconnect ⇒ Object



120
121
122
123
# File 'lib/bitflyer/realtime/websocket.rb', line 120

# Reacts to a disconnect notice (dispatched from #handle_message for
# code 41): logs it and raises the error flag. The socket is not closed
# here; #reconnect only proceeds while @error is set.
#
# @return [true]
def disconnect
  debug_log('Disconnecting from server...')
  @error = true
end

#emit_message(json:) ⇒ Object



125
126
127
128
129
# File 'lib/bitflyer/realtime/websocket.rb', line 125

# Parses an event frame body of the form `[channel_name, message, ...]`
# and passes every message to the callback registered for that channel.
#
# #subscribe stores the callback under whatever key the caller passed, so
# the lookup tries the Symbol form first and then falls back to the String
# form. Otherwise a subscription made with a String name would never fire.
#
# @param json [String] JSON array text
# @return [Array, nil] the messages, or nil when the array has no channel name
# @raise [JSON::ParserError] when +json+ is not valid JSON
def emit_message(json:)
  channel_name, *messages = JSON.parse json
  return unless channel_name

  callback = @channel_callbacks[channel_name.to_sym] || @channel_callbacks[channel_name.to_s]
  messages.each { |message| callback&.call(message) }
end

#handle_error(error:) ⇒ Object



82
83
84
85
86
# File 'lib/bitflyer/realtime/websocket.rb', line 82

# Error handler registered on the websocket client in #connect.
#
# Every error is logged; only a connection reset triggers a reconnect.
# #reconnect returns immediately unless @error is set, so the error is
# recorded first. Without that the reconnect call below was a no-op.
#
# @param error [Exception] error reported by the websocket client
# @return [Object, nil] result of #reconnect, or nil for other errors
def handle_error(error:)
  debug_log error
  return unless error.is_a? Errno::ECONNRESET

  @error = error
  reconnect
end

#handle_message(payload:) ⇒ Object



88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# File 'lib/bitflyer/realtime/websocket.rb', line 88

# Message handler registered on the websocket client in #connect.
#
# A frame is expected to start with a numeric code followed by an optional
# body. Frames without a leading number are ignored. Dispatch by code:
#   0  -> #setup_by_response(json: body)
#   3  -> #receive_pong
#   41 -> #disconnect
#   42 -> #emit_message(json: body)
# Any StandardError raised while handling is printed with its backtrace and
# swallowed, so a bad frame does not propagate to the caller.
#
# @param payload [#data] incoming frame; only +data+ is read
# @return [Object, nil] result of the dispatched handler, if any
def handle_message(payload:)
  frame = payload.data
  debug_log frame
  # One match both validates the numeric prefix and splits code from body.
  return unless frame =~ /^(\d+)(.*)$/

  code = Regexp.last_match(1).to_i
  body = Regexp.last_match(2)

  case code
  when 0
    setup_by_response(json: body)
  when 3
    receive_pong
  when 41
    disconnect
  when 42
    emit_message(json: body)
  end
rescue => e
  puts e
  puts e.backtrace.join("\n")
end

#receive_pong ⇒ Object



115
116
117
118
# File 'lib/bitflyer/realtime/websocket.rb', line 115

# Records that a pong arrived (dispatched from #handle_message for code 3).
# The stored timestamp is what #wait_pong measures the timeout against.
#
# @return [Integer] the recorded time in whole seconds
def receive_pong
  debug_log('Received pong')
  @last_pong_at = Time.now.to_i
end

#reconnect ⇒ Object



69
70
71
72
73
74
75
76
77
78
79
80
# File 'lib/bitflyer/realtime/websocket.rb', line 69

# Tears down the current connection, opens a new one, and re-subscribes to
# every channel in @channel_names. Does nothing unless @error is set.
#
# The debug log now prints the exact frame that is sent. Before, it logged a
# JSON object (`42{"subscribe":...}`) while sending a JSON array
# (`42["subscribe",...]`).
#
# @return [Array, nil] the channel names, or nil when no error is pending
def reconnect
  return unless @error
  debug_log 'Reconnecting...'

  # Clear the flag first so the watcher loop does not trigger a second
  # reconnect while this one is in progress.
  @error = nil
  @websocket_client.close if @websocket_client.open?
  connect
  @channel_names.each do |channel_name|
    frame = "42#{['subscribe', channel_name].to_json}"
    debug_log frame
    websocket_client.send frame
  end
end

#send_ping ⇒ Object



52
53
54
55
56
57
58
59
# File 'lib/bitflyer/realtime/websocket.rb', line 52

# Sends a ping frame ("2") when more than the ping interval has passed
# since the last one. Called once per second by the keep-alive loop in
# #connect. Does nothing until #setup_by_response has set the interval and
# the first timestamp.
#
# @return [Integer, nil] the new ping timestamp, or nil when no ping was sent
def send_ping
  return if !@last_ping_at || !@ping_interval

  seconds_since_ping = Time.now.to_i - @last_ping_at
  # The interval is divided by 1000 (integer division) to compare in seconds.
  return if seconds_since_ping <= @ping_interval / 1000

  debug_log('Sent ping')
  @websocket_client.send("2")
  @last_ping_at = Time.now.to_i
end

#setup_by_response(json:) ⇒ Object



104
105
106
107
108
109
110
111
112
113
# File 'lib/bitflyer/realtime/websocket.rb', line 104

# Handles the handshake frame (dispatched from #handle_message for code 0):
# stores the ping settings, resets both ping/pong timestamps to now, and
# sends a subscribe frame for every registered callback.
#
# The defaults are applied before the integer conversion. The previous form
# `value.to_i || default` never used the default, because `nil.to_i` is 0
# and 0 is truthy in Ruby. A missing field therefore gave an interval or
# timeout of 0.
#
# @param json [String] JSON object text; "pingInterval" and "pingTimeout"
#   are read from it
# @return [Hash] the callback table that was iterated
# @raise [JSON::ParserError] when +json+ is not valid JSON
def setup_by_response(json:)
  body = JSON.parse json
  @ping_interval = (body['pingInterval'] || 25_000).to_i
  @ping_timeout  = (body['pingTimeout'] || 60_000).to_i

  now = Time.now.to_i
  @last_ping_at = now
  @last_pong_at = now

  channel_callbacks.each do |channel_name, _|
    websocket_client.send "42#{['subscribe', channel_name].to_json}"
  end
end

#subscribe(channel_name:, &block) ⇒ Object



19
20
21
22
23
24
# File 'lib/bitflyer/realtime/websocket.rb', line 19

# Registers +block+ as the callback for +channel_name+, remembers the
# channel so #reconnect can re-subscribe to it, and sends the subscribe
# frame on the current connection.
#
# @param channel_name [String, Symbol] channel to subscribe to
# @yield [message] presumably invoked by #emit_message per message received
# @return [Object] whatever the websocket client's +send+ returns
def subscribe(channel_name:, &block)
  debug_log("Subscribe #{channel_name}")

  # Array union appends the name only when it is not already present.
  @channel_names |= [channel_name]
  @channel_callbacks[channel_name] = block

  subscribe_frame = "42#{['subscribe', channel_name].to_json}"
  websocket_client.send(subscribe_frame)
end

#wait_pong ⇒ Object



61
62
63
64
65
66
67
# File 'lib/bitflyer/realtime/websocket.rb', line 61

# Closes the connection when no pong has been recorded for longer than the
# pong timeout. Called once per second by the keep-alive loop in #connect.
# Does nothing until #setup_by_response has set the timeout and the first
# timestamp.
#
# @return [Object, nil] result of the client's +close+, or nil when not timed out
def wait_pong
  return if !@last_pong_at || !@ping_timeout

  seconds_since_pong = Time.now.to_i - @last_pong_at
  # The timeout is divided by 1000 (integer division) to compare in seconds.
  return if seconds_since_pong <= @ping_timeout / 1000

  debug_log('Timed out waiting pong')
  @websocket_client.close
end