Class: Bitfinex::WSv2

Inherits:
Object
  • Object
show all
Includes:
Emittr::Events
Defined in:
lib/ws/ws2.rb

Constant Summary collapse

# Codes carried by 'info' events (compared against msg['code'] in
# handle_info_event).
INFO_SERVER_RESTART = 20051
INFO_MAINTENANCE_START = 20060
INFO_MAINTENANCE_END = 20061

# Configuration flags, OR-ed together and sent in a 'conf' event by
# enable_flag.
#
# NOTE: each constant is assigned on its own. With trailing commas
# (`FLAG_DEC_S = 8,` followed by the next assignment) Ruby parses the run as
# nested array literals, so only the last constant would be an Integer.
FLAG_DEC_S = 8          # enables all decimals as strings
FLAG_TIME_S = 32        # enables all timestamps as strings
FLAG_TIMESTAMP = 32768  # timestamps in milliseconds
FLAG_SEQ_ALL = 65536    # enable sequencing
FLAG_CHECKSUM = 131072  # enable OB checksums, top 25 levels per side

Instance Method Summary collapse

Constructor Details

#initialize(params = {}) ⇒ WSv2

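Creates a new WSv2 client configured from the given params hash. (The text extracted here, "enable OB checksums, top 25 levels per side", is the trailing comment of the FLAG_CHECKSUM constant, which the documentation generator attached to the constructor.)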



42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/ws/ws2.rb', line 42

# Sets up a client from an options hash. Only instance state is prepared
# here; nothing in this method opens a connection.
#
# @param params [Hash] options, read by symbol key:
#   :url                (defaults to 'wss://api.bitfinex.com/ws/2')
#   :api_key, :api_secret
#   :manage_order_books (stored as given, not coerced to a boolean)
#   :transform, :seq_audit, :checksum_audit (coerced to true/false)
def initialize(params = {})
  # logging goes to STDOUT under the 'ws2' program name
  @l = Logger.new(STDOUT)
  @l.progname = 'ws2'

  # endpoint & credentials
  @url = params[:url] || 'wss://api.bitfinex.com/ws/2'
  @api_key = params[:api_key]
  @api_secret = params[:api_secret]

  # behaviour switches
  @manage_obs = params[:manage_order_books]
  @transform = params[:transform] ? true : false
  @seq_audit = params[:seq_audit] ? true : false
  @checksum_audit = params[:checksum_audit] ? true : false

  # connection state
  @is_open = false
  @is_authenticated = false
  @enabled_flags = 0

  # bookkeeping filled in while messages arrive
  @channel_map = {}
  @order_books = {}
  @last_pub_seq = nil
  @last_auth_seq = nil
end

Instance Method Details

#auth!(calc = 0, dms = 0) ⇒ Object



485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
# File 'lib/ws/ws2.rb', line 485

# Sends an 'auth' event over the socket. The payload is
# "AUTH<nonce><nonce>" and is signed with #sign.
#
# @param calc [Object] sent unchanged as the 'calc' field
# @param dms [Object] sent unchanged as the 'dms' field
# @raise [Exception] when the client is already authenticated
# @return whatever @ws.send returns
def auth!(calc = 0, dms = 0)
  raise Exception, 'already authenticated' if @is_authenticated

  nonce = new_nonce
  payload = "AUTH#{nonce}#{nonce}"

  request = {
    event: 'auth',
    apiKey: @api_key,
    authSig: sign(payload),
    authPayload: payload,
    authNonce: nonce,
    dms: dms,
    calc: calc
  }

  @ws.send(JSON.generate(request))
end

#close! ⇒ Object



109
110
111
# File 'lib/ws/ws2.rb', line 109

# Closes the underlying socket.
#
# Does nothing when no socket exists yet (@ws is only assigned inside open!),
# rather than failing with NoMethodError on nil.
#
# @return whatever @ws.close returns, or nil when there is no socket
def close!
  @ws.close unless @ws.nil?
end

#enable_flag(flag) ⇒ Object



463
464
465
466
467
468
469
470
# File 'lib/ws/ws2.rb', line 463

# Sends a 'conf' event asking for `flag` on top of the flags currently
# recorded in @enabled_flags. @enabled_flags itself is not modified here.
#
# @param flag [Integer] flag bit(s) to OR into the current set
# @return nil when the socket is not open, otherwise the result of @ws.send
def enable_flag(flag)
  if @is_open
    request = { event: 'conf', flags: @enabled_flags | flag }
    @ws.send(JSON.generate(request))
  end
end

#enable_ob_checksums ⇒ Object



481
482
483
# File 'lib/ws/ws2.rb', line 481

# Requests the FLAG_CHECKSUM configuration flag via enable_flag.
#
# @return whatever enable_flag returns
def enable_ob_checksums
  checksum_flag = FLAG_CHECKSUM
  enable_flag(checksum_flag)
end

#enable_sequencing(audit = true) ⇒ Object



476
477
478
479
# File 'lib/ws/ws2.rb', line 476

# Records whether incoming sequence numbers should be audited, then requests
# the FLAG_SEQ_ALL configuration flag via enable_flag.
#
# @param audit [Boolean] stored in @seq_audit
# @return whatever enable_flag returns
def enable_sequencing(audit = true)
  @seq_audit = audit
  sequencing_flag = FLAG_SEQ_ALL
  enable_flag(sequencing_flag)
end

#handle_auth_event(msg) ⇒ Object



396
397
398
399
400
401
402
403
404
405
406
407
# File 'lib/ws/ws2.rb', line 396

# Handles the server's reply to an 'auth' event.
#
# On status 'OK' the reply's chanId is registered in @channel_map as the
# 'auth' channel, the client is flagged authenticated and :auth is emitted
# with the reply. Any other status is only logged as an error.
#
# @param msg [Hash] reply; reads 'status', 'chanId' and 'message'
def handle_auth_event(msg)
  if msg['status'] == 'OK'
    @channel_map[msg['chanId']] = { 'channel' => 'auth' }
    @is_authenticated = true
    emit(:auth, msg)

    @l.info 'authenticated'
  else
    @l.error "auth failed: #{msg['message']}"
    nil
  end
end

#handle_auth_message(msg, chan) ⇒ Object



282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
# File 'lib/ws/ws2.rb', line 282

# Maps each auth-channel message type code to
# [event to emit, name of the Models class used when transforming,
#  whether the payload is a list of entries].
AUTH_MESSAGE_TYPES = {
  'n'   => [:notification, :Notification, false],
  'te'  => [:trade_entry, :Trade, false],
  'tu'  => [:trade_update, :Trade, false],
  'os'  => [:order_snapshot, :Order, true],
  'ou'  => [:order_update, :Order, false],
  'on'  => [:order_new, :Order, false],
  'oc'  => [:order_close, :Order, false],
  'ps'  => [:position_snapshot, :Position, true],
  'pn'  => [:position_new, :Position, false],
  'pu'  => [:position_update, :Position, false],
  'pc'  => [:position_close, :Position, false],
  'fos' => [:funding_offer_snapshot, :FundingOffer, true],
  'fon' => [:funding_offer_new, :FundingOffer, false],
  'fou' => [:funding_offer_update, :FundingOffer, false],
  'foc' => [:funding_offer_close, :FundingOffer, false],
  'fcs' => [:funding_credit_snapshot, :FundingCredit, true],
  'fcn' => [:funding_credit_new, :FundingCredit, false],
  'fcu' => [:funding_credit_update, :FundingCredit, false],
  'fcc' => [:funding_credit_close, :FundingCredit, false],
  'fls' => [:funding_loan_snapshot, :FundingLoan, true],
  'fln' => [:funding_loan_new, :FundingLoan, false],
  'flu' => [:funding_loan_update, :FundingLoan, false],
  'flc' => [:funding_loan_close, :FundingLoan, false],
  'ws'  => [:wallet_snapshot, :Wallet, true],
  'wu'  => [:wallet_update, :Wallet, false],
  'bu'  => [:balance_update, :BalanceInfo, false],
  'miu' => [:margin_info_update, :MarginInfo, false],
  'fiu' => [:funding_info_update, :FundingInfo, false],
  'fte' => [:funding_trade_entry, :FundingTrade, false],
  'ftu' => [:funding_trade_update, :FundingTrade, false]
}.freeze

# Dispatches a message received on the authenticated channel.
#
# msg[1] is the type code and msg[2] the payload. Heartbeats ('hb') and
# unrecognised type codes are ignored. For known codes the matching event is
# emitted with the payload; when @transform is set the payload is first
# wrapped in the matching Models class (entry by entry for snapshots).
#
# Fixes the wallet snapshot ('ws') case, which previously built every wallet
# from the whole snapshot instead of from its own entry.
#
# @param msg [Array] raw channel message
# @param chan [Hash] channel descriptor (unused here)
def handle_auth_message(msg, chan)
  type = msg[1]
  return if type == 'hb'

  event, model_name, is_list = AUTH_MESSAGE_TYPES[type]
  return if event.nil?

  payload = msg[2]

  data =
    if @transform
      # resolved lazily so no Models constant is touched without @transform
      model = Models.const_get(model_name, false)
      is_list ? payload.map { |entry| model.new(entry) } : model.new(payload)
    else
      payload
    end

  emit(event, data)
end

#handle_candles_message(msg, chan) ⇒ Object



231
232
233
234
235
236
237
238
239
# File 'lib/ws/ws2.rb', line 231

# Emits :candles for a candles-channel message, keyed by chan['key'].
#
# The payload (msg[1]) is treated as a list of candles when its first element
# is an Array, otherwise as a single candle. With @transform set, candles are
# wrapped in Models::Candle; without it the payload is passed through as is.
#
# @param msg [Array] raw channel message
# @param chan [Hash] channel descriptor; reads 'key'
def handle_candles_message(msg, chan)
  payload = msg[1]
  is_list = payload[0].is_a?(Array)
  key = chan['key']

  candles =
    if !@transform
      payload
    elsif is_list
      payload.map { |candle| Models::Candle.new(candle) }
    else
      Models::Candle.new(payload)
    end

  emit(:candles, key, candles)
end

#handle_config_event(msg) ⇒ Object



442
443
444
445
446
447
448
449
# File 'lib/ws/ws2.rb', line 442

# Handles the reply to a 'conf' event: on status 'OK' the reported flags
# replace @enabled_flags, otherwise the failure message is logged.
#
# @param msg [Hash] reply; reads 'status', 'flags' and 'message'
def handle_config_event(msg)
  unless msg['status'] == 'OK'
    return @l.error("config failed: #{msg['message']}")
  end

  flags = msg['flags']
  @l.info "flags updated to #{flags}"
  @enabled_flags = flags
end

#handle_error_event(msg) ⇒ Object



438
439
440
# File 'lib/ws/ws2.rb', line 438

# Logs an 'error' event at error level; no other action is taken.
#
# @param msg [Hash] the event as received
def handle_error_event(msg)
  @l.error(msg)
end

#handle_info_event(msg) ⇒ Object



409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
# File 'lib/ws/ws2.rb', line 409

# Handles an 'info' event, which is either a version announcement or a
# status code.
#
# Version announcement ('version' key present): anything other than version 2
# closes the socket and raises; otherwise the platform status is logged
# (status 0 is reported as 'under maintenance').
#
# Status code ('code' key present): the three INFO_* codes are logged and
# emitted as :server_restart, :maintenance_start or :maintenance_end. Other
# codes are ignored.
#
# @param msg [Hash] the event; reads 'version', 'platform' and 'code'
# @raise [Exception] when the server reports a version other than 2
def handle_info_event(msg)
  if msg.key?('version')
    version = msg['version']

    unless version == 2
      close!
      raise Exception, "server not running API v2: #{version}"
    end

    status = msg['platform']['status']
    state = status == 0 ? 'under maintenance' : 'operating normally'

    @l.info format('server running API v2 (platform: %s (%d))', state, status)
  elsif msg.key?('code')
    case msg['code']
    when INFO_SERVER_RESTART
      @l.info 'server restarted, please reconnect'
      emit(:server_restart)
    when INFO_MAINTENANCE_START
      @l.info 'server maintenance period started!'
      emit(:maintenance_start)
    when INFO_MAINTENANCE_END
      @l.info 'server maintenance period ended!'
      emit(:maintenance_end)
    end
  end
end

#handle_order_book_checksum_message(msg, chan) ⇒ Object



241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# File 'lib/ws/ws2.rb', line 241

# Handles an order-book checksum ('cs') message.
#
# Always emits :checksum with the symbol and the raw message. When order
# books are managed locally and a book exists for this channel
# ("symbol:prec:len"), the local book's checksum is compared with the remote
# one in msg[2]; a mismatch is logged and emitted as :error.
#
# The mismatch text now closes its "[symbol]" bracket, matching the OK
# message.
#
# @param msg [Array] raw channel message; msg[2] is the remote checksum
# @param chan [Hash] channel descriptor; reads 'symbol', 'prec' and 'len'
def handle_order_book_checksum_message(msg, chan)
  symbol = chan['symbol']
  key = "#{symbol}:#{chan['prec']}:#{chan['len']}"
  emit(:checksum, symbol, msg)

  return unless @manage_obs
  return unless @order_books.has_key?(key)

  remote_cs = msg[2]
  local_cs = @order_books[key].checksum

  if local_cs == remote_cs
    @l.info "OB checksum OK #{local_cs} [#{symbol}]"
  else
    err = "OB checksum mismatch, have #{local_cs} want #{remote_cs} [#{symbol}]"
    @l.error err
    emit(:error, err)
  end
end

#handle_order_book_message(msg, chan) ⇒ Object



260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
# File 'lib/ws/ws2.rb', line 260

# Handles an order-book snapshot or update and emits :order_book with the
# channel's symbol.
#
# - Managed books (@manage_obs): the first message for a channel
#   ("symbol:prec:len") creates a Models::OrderBook, later ones are applied
#   with update_with; the managed book is emitted.
# - Transform only (@transform): the payload is wrapped in a fresh
#   Models::OrderBook.
# - Otherwise the raw payload is emitted.
#
# The second constructor argument (precision starts with 'R', presumably the
# raw-book flag; confirm against Models::OrderBook) is now passed in the
# transform-only branch too, so both branches build books the same way.
#
# @param msg [Array] raw channel message; msg[1] is the book payload
# @param chan [Hash] channel descriptor; reads 'symbol', 'prec' and 'len'
def handle_order_book_message(msg, chan)
  ob = msg[1]
  # to_s keeps this safe when the channel descriptor carries no 'prec'
  raw = chan['prec'].to_s.start_with?('R')

  data =
    if @manage_obs
      key = "#{chan['symbol']}:#{chan['prec']}:#{chan['len']}"

      if @order_books.has_key?(key)
        @order_books[key].update_with(ob)
      else
        @order_books[key] = Models::OrderBook.new(ob, raw)
      end

      @order_books[key]
    elsif @transform
      Models::OrderBook.new(ob, raw)
    else
      ob
    end

  emit(:order_book, chan['symbol'], data)
end

#handle_subscribed_event(msg) ⇒ Object



451
452
453
454
455
# File 'lib/ws/ws2.rb', line 451

# Registers a newly subscribed channel: the whole event is stored in
# @channel_map under its chanId and :subscribed is emitted with that id.
#
# @param msg [Hash] the 'subscribed' event; reads 'channel' and 'chanId'
def handle_subscribed_event(msg)
  chan_id = msg['chanId']

  @l.info "subscribed to #{msg['channel']} [#{chan_id}]"
  @channel_map[chan_id] = msg
  emit(:subscribed, chan_id)
end

#handle_ticker_message(msg, chan) ⇒ Object



203
204
205
206
207
208
209
210
211
# File 'lib/ws/ws2.rb', line 203

# Emits :ticker for a ticker-channel message.
#
# With @transform set, the payload (msg[1]) is wrapped in
# Models::TradingTicker when the symbol starts with 't' and in
# Models::FundingTicker otherwise; without it the payload is emitted as is.
#
# @param msg [Array] raw channel message
# @param chan [Hash] channel descriptor; reads 'symbol'
def handle_ticker_message(msg, chan)
  payload = msg[1]
  symbol = chan['symbol']
  is_trading = symbol[0] == 't'

  ticker =
    if @transform
      model = is_trading ? Models::TradingTicker : Models::FundingTicker
      model.new(payload)
    else
      payload
    end

  emit(:ticker, symbol, ticker)
end

#handle_trades_message(msg, chan) ⇒ Object



213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/ws/ws2.rb', line 213

# Handles a public trades-channel message.
#
# When msg[1] is an Array the message is a list of trades and is emitted as
# :public_trades. Otherwise msg[1] is a type code and msg[2] a single trade:
# it is emitted as :public_trades and additionally as :public_trade_entry
# ('te') or :public_trade_update ('tu'). With @transform set, trades are
# wrapped in Models::PublicTrade.
#
# @param msg [Array] raw channel message
# @param chan [Hash] channel descriptor; reads 'symbol'
def handle_trades_message(msg, chan)
  head = msg[1]

  if head.is_a?(Array)
    symbol = chan['symbol']
    trades = @transform ? head.map { |entry| Models::PublicTrade.new(entry) } : head
    return emit(:public_trades, symbol, trades)
  end

  trade = @transform ? Models::PublicTrade.new(msg[2]) : msg[2]
  symbol = chan['symbol']

  emit(:public_trades, symbol, trade)

  case head
  when 'te' then emit(:public_trade_entry, symbol, trade)
  when 'tu' then emit(:public_trade_update, symbol, trade)
  end
end

#handle_unsubscribed_event(msg) ⇒ Object



457
458
459
460
461
# File 'lib/ws/ws2.rb', line 457

# Forgets an unsubscribed channel: its chanId is removed from @channel_map
# and :unsubscribed is emitted with that id.
#
# @param msg [Hash] the 'unsubscribed' event; reads 'chanId'
def handle_unsubscribed_event(msg)
  chan_id = msg['chanId']

  @l.info "unsubscribed from #{chan_id}"
  @channel_map.delete(chan_id)
  emit(:unsubscribed, chan_id)
end

#is_flag_enabled(flag) ⇒ Object



472
473
474
# File 'lib/ws/ws2.rb', line 472

# Tells whether every bit of `flag` is set in @enabled_flags.
#
# @param flag [Integer] flag bit(s) to test
# @return [Boolean]
def is_flag_enabled(flag)
  masked = @enabled_flags & flag
  masked == flag
end

#new_nonce ⇒ Object



505
506
507
# File 'lib/ws/ws2.rb', line 505

# Builds the nonce used by auth!, as a decimal string of the current wall
# clock time in microseconds since the epoch.
#
# One-second resolution (the previous behaviour) produced the same nonce for
# two authentication attempts within the same second. Integer arithmetic is
# used so no precision is lost to Float rounding.
#
# NOTE(review): the value is still derived from the wall clock, so it is not
# guaranteed to be strictly increasing if the clock is stepped backwards.
#
# @return [String]
def new_nonce
  now = Time.now
  (now.to_i * 1_000_000 + now.usec).to_s
end

#on_close(e) ⇒ Object



81
82
83
84
85
# File 'lib/ws/ws2.rb', line 81

# Socket close callback: clears the open flag, logs, and emits :close.
#
# @param e [Object] close event from the socket (unused)
def on_close(e)
  @is_open = false
  @l.info('client closed')
  emit(:close)
end

#on_message(e) ⇒ Object



72
73
74
75
76
77
78
79
# File 'lib/ws/ws2.rb', line 72

# Socket message callback: logs the raw frame, parses it as JSON, routes it
# through process_message and finally emits :message with the parsed value.
#
# @param e [#data] message event; e.data is the raw JSON text
def on_message(e)
  raw = e.data
  @l.info "recv #{raw}"

  parsed = JSON.parse(raw)
  process_message(parsed)

  emit(:message, parsed)
end

#on_open(e) ⇒ Object



63
64
65
66
67
68
69
70
# File 'lib/ws/ws2.rb', line 63

# Socket open callback: logs, sets the open flag, emits :open, then requests
# sequencing and/or order-book checksums when those audits were enabled.
#
# @param e [Object] open event from the socket (unused)
def on_open(e)
  @l.info('client open')
  @is_open = true
  emit(:open)

  if @seq_audit
    enable_sequencing
  end

  if @checksum_audit
    enable_ob_checksums
  end
end

#open! ⇒ Object



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# File 'lib/ws/ws2.rb', line 87

# Opens the connection: inside EM.run a Faye::WebSocket::Client is created
# for @url and its :open, :message and :close events are wired to on_open,
# on_message and on_close.
#
# NOTE(review): EM.run presumably blocks the caller until the reactor stops;
# confirm against EventMachine before relying on code placed after open!.
#
# @raise [Exception] when the client is already open
def open!
  raise Exception, 'already open' if @is_open

  EM.run do
    @ws = Faye::WebSocket::Client.new(@url)

    @ws.on(:open) { |e| on_open(e) }
    @ws.on(:message) { |e| on_message(e) }
    @ws.on(:close) { |e| on_close(e) }
  end
end

#process_channel_message(msg) ⇒ Object



172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/ws/ws2.rb', line 172

# Routes an Array message to the handler for its channel.
#
# msg[0] is the channel id, looked up in @channel_map; unknown ids are logged
# and dropped. Messages with fewer than two elements and heartbeats ('hb')
# are ignored. 'book' channels split further: type 'cs' goes to the checksum
# handler, everything else to the order-book handler.
#
# @param msg [Array] raw channel message
def process_channel_message(msg)
  chan_id = msg[0]

  unless @channel_map.include?(chan_id)
    @l.error "recv message on unknown channel: #{chan_id}"
    return
  end

  chan = @channel_map[chan_id]
  type = msg[1]

  return if msg.size < 2 || type == 'hb'

  case chan['channel']
  when 'ticker' then handle_ticker_message(msg, chan)
  when 'trades' then handle_trades_message(msg, chan)
  when 'candles' then handle_candles_message(msg, chan)
  when 'auth' then handle_auth_message(msg, chan)
  when 'book'
    if type == 'cs'
      handle_order_book_checksum_message(msg, chan)
    else
      handle_order_book_message(msg, chan)
    end
  end
end

#process_event_message(msg) ⇒ Object



379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
# File 'lib/ws/ws2.rb', line 379

# Routes a Hash message to the handler named after its 'event' field.
# Events without a handler are ignored.
#
# @param msg [Hash] event message; reads 'event'
def process_event_message(msg)
  handlers = {
    'auth' => :handle_auth_event,
    'subscribed' => :handle_subscribed_event,
    'unsubscribed' => :handle_unsubscribed_event,
    'info' => :handle_info_event,
    'conf' => :handle_config_event,
    'error' => :handle_error_event
  }

  handler = handlers[msg['event']]
  __send__(handler, msg) if handler
end

#process_message(msg) ⇒ Object



113
114
115
116
117
118
119
120
121
122
123
# File 'lib/ws/ws2.rb', line 113

# Entry point for every parsed message: optionally audits its sequence
# numbers, then routes Arrays to process_channel_message and Hashes to
# process_event_message. Anything else is ignored.
#
# @param msg [Array, Hash] parsed message
def process_message(msg)
  validate_message_seq(msg) if @seq_audit

  case msg
  when Array then process_channel_message(msg)
  when Hash then process_event_message(msg)
  end
end

#sign(payload) ⇒ Object



509
510
511
# File 'lib/ws/ws2.rb', line 509

# Computes the hex-encoded HMAC-SHA384 of `payload`, keyed with @api_secret.
#
# @param payload [String] text to sign
# @return [String] hexadecimal digest
def sign(payload)
  digest = OpenSSL::Digest.new('sha384')
  OpenSSL::HMAC.hexdigest(digest, @api_secret, payload)
end

#submit_order(order) ⇒ Object



513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
# File 'lib/ws/ws2.rb', line 513

# Submits a new order as `[0, 'on', nil, packet]`. Does nothing (returns nil)
# while the client is not authenticated.
#
# @param order [Array, Models::Order, Hash] an Array is sent as the packet
#   unchanged; a Models::Order (exactly that class) is converted with
#   to_new_order_packet; a Hash is first wrapped in Models::Order
# @raise [Exception] for any other kind of argument
def submit_order(order)
  return unless @is_authenticated

  packet =
    if order.is_a?(Array)
      order
    elsif order.instance_of?(Models::Order)
      order.to_new_order_packet
    elsif order.is_a?(Hash)
      Models::Order.new(order).to_new_order_packet
    else
      raise Exception, 'tried to submit order of unkown type'
    end

  @ws.send(JSON.generate([0, 'on', nil, packet]))
end

#subscribe(channel, params = {}) ⇒ Object



351
352
353
354
355
356
357
# File 'lib/ws/ws2.rb', line 351

# Sends a 'subscribe' event for `channel`. The event is `params` with the
# :event and :channel keys merged in (those two take precedence).
#
# @param channel [String] channel name
# @param params [Hash] extra fields for the subscription request
# @return whatever @ws.send returns
def subscribe(channel, params = {})
  @l.info format('subscribing to channel %s [%s]', channel, params)

  request = params.merge({ event: 'subscribe', channel: channel })
  @ws.send(JSON.generate(request))
end

#subscribe_candles(key) ⇒ Object



367
368
369
# File 'lib/ws/ws2.rb', line 367

# Subscribes to the 'candles' channel for the given key.
#
# @param key [String] sent as the :key field of the request
def subscribe_candles(key)
  params = { key: key }
  subscribe('candles', params)
end

#subscribe_order_book(sym, prec, len) ⇒ Object



371
372
373
374
375
376
377
# File 'lib/ws/ws2.rb', line 371

# Subscribes to the 'book' channel.
#
# @param sym [String] sent as the :symbol field
# @param prec [String] sent as the :prec field
# @param len [Object] sent as the :len field
def subscribe_order_book(sym, prec, len)
  params = { symbol: sym, prec: prec, len: len }
  subscribe('book', params)
end

#subscribe_ticker(sym) ⇒ Object



359
360
361
# File 'lib/ws/ws2.rb', line 359

# Subscribes to the 'ticker' channel for the given symbol.
#
# @param sym [String] sent as the :symbol field of the request
def subscribe_ticker(sym)
  params = { symbol: sym }
  subscribe('ticker', params)
end

#subscribe_trades(sym) ⇒ Object



363
364
365
# File 'lib/ws/ws2.rb', line 363

# Subscribes to the 'trades' channel for the given symbol.
#
# @param sym [String] sent as the :symbol field of the request
def subscribe_trades(sym)
  params = { symbol: sym }
  subscribe('trades', params)
end

#validate_message_seq(msg) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# File 'lib/ws/ws2.rb', line 125

def validate_message_seq (msg)
  return unless @seq_audit
  return unless msg.kind_of?(Array)
  return unless msg.size > 2

  # The auth sequence # is the last value in channel 0 non-hb packets
  if msg[0] == 0 && msg[1] != 'hb'
    auth_seq = msg[-1]
  else
    auth_seq = nil
  end

  # all other packets provide a public sequence # as the last value. For
  # chan 0 packets, these are included as the 2nd to last value
  #
  # note that error notifications lack seq
  if msg[0] == 0 && msg[1] != 'hb' && !(msg[1] && msg[2][6] == 'ERROR')
    pub_seq = msg[-2]
  else
    pub_seq = msg[-1]
  end

  return unless pub_seq.is_a?(Numeric)

  if @last_pub_seq.nil?
    @last_pub_seq = pub_seq
    return
  end

  if pub_seq != (@last_pub_seq + 1) # check pub seq
    @l.warn "invalid pub seq #; last #{@last_pub_seq}, got #{pub_seq}"
  end

  @last_pub_seq = pub_seq

  return unless auth_seq.is_a?(Numeric)
  return if auth_seq == 0
  return if msg[1] == 'n' && msg[2][6] == 'ERROR' # error notifications
  return if auth_seq == @last_auth_seq # seq didn't advance

  if !@last_auth_seq.nil? && auth_seq != @last_auth_seq + 1
    @l.warn "invalid auth seq #; last #{@last_auth_seq}, got #{auth_seq}"
  end

  @last_auth_seq = auth_seq
end