Class: Peatio::Upstream::Base
- Inherits:
-
Object
- Object
- Peatio::Upstream::Base
- Defined in:
- lib/peatio/upstream/base.rb
Constant Summary collapse
- DEFAULT_DELAY =
1
- WEBSOCKET_CONNECTION_RETRY_DELAY =
2
Instance Attribute Summary collapse
-
#logger ⇒ Object
Returns the value of attribute logger.
Instance Method Summary collapse
- #build_error(response) ⇒ Object
-
#initialize(config) ⇒ Base
constructor
A new instance of Base.
- #mount ⇒ Object
- #notify_public_trade(trade) ⇒ Object
- #on_trade(trade) ⇒ Object
- #subscribe_trades(_market, _ws) ⇒ Object
- #to_s ⇒ Object
- #trade_json(trade) ⇒ Object
- #ws_connect ⇒ Object
- #ws_connect_public ⇒ Object
- #ws_read_message(msg) ⇒ Object
- #ws_read_public_message(msg) ⇒ Object
Constructor Details
#initialize(config) ⇒ Base
Returns a new instance of Base.
11 12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/peatio/upstream/base.rb', line 11
#
# Builds an upstream from its configuration and registers the default trade
# callback through #mount.
#
# @param config [Hash] reads the "rest", "source", "target" and "amqp" string
#   keys plus the optional :faraday_adapter symbol key
def initialize(config)
  # Keep the raw configuration, then pull out the individual settings.
  @config = config
  @host = config["rest"]
  @market = config['source']
  @target = config['target']
  @peatio_mq = config['amqp']
  @adapter = config[:faraday_adapter] || :em_synchrony

  # Runtime state: websocket flag starts false, no callbacks registered yet.
  @ws_status = false
  @public_trades_cb = []
  @logger = Peatio::Logger.logger

  mount
end
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
9 10 11 |
# File 'lib/peatio/upstream/base.rb', line 9
#
# Reader for the logger stored in @logger.
#
# @return [Object] the current value of @logger
def logger
  @logger
end
Instance Method Details
#build_error(response) ⇒ Object
104 105 106 107 108 |
# File 'lib/peatio/upstream/base.rb', line 104
#
# Turns a failed HTTP response into an error value.
#
# @param response [#body, #env] response whose body is tried as JSON; `env`
#   must expose `status` and `reason_phrase` for the fallback text
# @return [Object, String] the parsed JSON body, or a
#   "Code: ... Message: ..." string when the body cannot be parsed
def build_error(response)
  JSON.parse(response.body)
rescue StandardError
  # Any parsing failure (malformed or missing body) falls back to the
  # transport-level status line.
  "Code: #{response.env.status} Message: #{response.env.reason_phrase}"
end
#mount ⇒ Object
24 25 26 |
# File 'lib/peatio/upstream/base.rb', line 24
#
# Registers #on_trade as a public-trade callback.
#
# @return [Array] the callback list after the append
def mount
  @public_trades_cb.push(method(:on_trade))
end
#notify_public_trade(trade) ⇒ Object
96 97 98 |
# File 'lib/peatio/upstream/base.rb', line 96
#
# Passes a trade to every registered public-trade callback, in order.
#
# @param trade [Object] value handed unchanged to each callback
# @return [Array] the callback list
def notify_public_trade(trade)
  @public_trades_cb.each do |callback|
    # nil entries are skipped rather than called
    next if callback.nil?

    callback.call(trade)
  end
end
#on_trade(trade) ⇒ Object
73 74 75 76 77 78 79 80 81 82 |
# File 'lib/peatio/upstream/base.rb', line 73
#
# Default trade callback: logs the trade, enqueues it as a public "trades"
# event for @market, then publishes its #trade_json form under :trade.
#
# @param trade [Hash] trade data; see #trade_json for the keys it reads
def on_trade(trade)
  logger.info { "Publishing trade event: #{trade.inspect}" }

  # The event is enqueued before #trade_json runs, so it receives the trade
  # exactly as it was passed in.
  @peatio_mq.enqueue_event("public", @market, "trades", {trades: [trade]})

  payload = trade_json(trade)
  options = {
    headers: {
      type: :upstream,
      market: @market,
    }
  }
  @peatio_mq.publish(:trade, payload, options)
end
#subscribe_trades(_market, _ws) ⇒ Object
58 59 60 |
# File 'lib/peatio/upstream/base.rb', line 58
#
# Placeholder hook called from the websocket open handler. This base version
# ignores both arguments and only invokes method_not_implemented.
# NOTE(review): method_not_implemented is not visible here; presumably it
# raises so that subclasses must override this method -- confirm.
#
# @param _market [Object] unused
# @param _ws [Object] unused
def subscribe_trades(_market, _ws)
  method_not_implemented
end
#to_s ⇒ Object
100 101 102 |
# File 'lib/peatio/upstream/base.rb', line 100
#
# Human-readable description: the class name plus the configuration.
#
# @return [String]
def to_s
  # The constructor stores the configuration in @config; the previously
  # referenced @opts is never assigned, so the description was always empty.
  "Exchange::#{self.class} config: #{@config}"
end
#trade_json(trade) ⇒ Object
84 85 86 87 88 89 90 91 92 93 94 |
# File 'lib/peatio/upstream/base.rb', line 84
#
# Builds the payload published for a trade.
#
# @param trade [Hash] trade data with string or symbol keys; reads tid,
#   price, amount, date and taker_type
# @return [Hash] symbol-keyed payload; market_id is @market and created_at is
#   Time.at(date) rendered with iso8601
def trade_json(trade)
  # Work on a symbol-keyed copy rather than rewriting the caller's hash in
  # place: the same object has already been passed to other consumers.
  # Only top-level keys are read, so a shallow conversion is enough.
  fields = trade.each_with_object({}) do |(key, value), copy|
    copy[key.respond_to?(:to_sym) ? key.to_sym : key] = value
  end

  {
    id: fields[:tid],
    price: fields[:price],
    amount: fields[:amount],
    market_id: @market,
    created_at: Time.at(fields[:date]).iso8601,
    taker_type: fields[:taker_type]
  }
end
#ws_connect ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/peatio/upstream/base.rb', line 28
#
# Opens a Faye websocket client on @ws_url and installs its handlers:
# * open    - calls #subscribe_trades with @target and the socket
# * message - forwards the message to #ws_read_message
# * close   - clears @ws and @ws_status, logs the close code and reason, then
#             reconnects after WEBSOCKET_CONNECTION_RETRY_DELAY seconds
#
# @raise [RuntimeError] when @ws_url is not set
def ws_connect
  # Validate first so a missing URL is reported as such instead of being
  # logged as a connection attempt to an empty address.
  raise "websocket url missing for upstream #{@market}" unless @ws_url

  logger.info { "Websocket connecting to #{@ws_url}" }
  @ws = Faye::WebSocket::Client.new(@ws_url)

  @ws.on(:open) do |_e|
    subscribe_trades(@target, @ws)
    logger.info { "Websocket connected" }
  end

  @ws.on(:message) do |msg|
    ws_read_message(msg)
  end

  @ws.on(:close) do |e|
    @ws = nil
    @ws_status = false
    logger.error "Websocket disconnected: #{e.code} Reason: #{e.reason}"

    # Sleep inside a fresh Fiber so the wait uses EM::Synchrony and then
    # retries the connection.
    Fiber.new do
      EM::Synchrony.sleep(WEBSOCKET_CONNECTION_RETRY_DELAY)
      ws_connect
    end.resume
  end
end
#ws_connect_public ⇒ Object
54 55 56 |
# File 'lib/peatio/upstream/base.rb', line 54
#
# Connects the public websocket; in this class it simply delegates to
# #ws_connect.
#
# @return [Object] whatever #ws_connect returns
def ws_connect_public
  ws_connect
end
#ws_read_message(msg) ⇒ Object
66 67 68 69 70 71 |
# File 'lib/peatio/upstream/base.rb', line 66
#
# Handles a raw websocket message: logs it at debug level, parses its data as
# JSON and passes the parsed object to #ws_read_public_message.
#
# @param msg [#data] websocket message whose `data` is a JSON string
# @raise [JSON::ParserError] when `msg.data` is not valid JSON
def ws_read_message(msg)
  logger.debug { "received websocket message: #{msg.data}" }

  object = JSON.parse(msg.data)
  ws_read_public_message(object)
end
#ws_read_public_message(msg) ⇒ Object
62 63 64 |
# File 'lib/peatio/upstream/base.rb', line 62
#
# Handler for a public message. This base version only logs the message at
# info level.
#
# @param msg [Object] the message; interpolated into the log line
def ws_read_public_message(msg)
  logger.info { "received public message: #{msg}" }
end