Class: Cotcube::DataProxy

Inherits:
Object
  • Object
show all
Defined in:
lib/cotcube-dataproxy/init.rb,
lib/cotcube-dataproxy/gc.rb,
lib/cotcube-dataproxy/commserver.rb,
lib/cotcube-dataproxy/3rd_clients.rb,
lib/cotcube-dataproxy/subscribers.rb,
lib/cotcube-dataproxy/client_response.rb

Overview

top-level class documentation comment

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(outputhandler: OutputHandler.new( location: "/var/cotcube/log/dataproxy" )) ⇒ DataProxy

Returns a new instance of DataProxy.



7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# File 'lib/cotcube-dataproxy/init.rb', line 7

# Connects to IB and RabbitMQ, prepares the shared bookkeeping state and starts
# the background workers (message subscribers, command server, recovery, GC).
#
# @param outputhandler [OutputHandler] stored in @output; defaults to a handler
#   located at /var/cotcube/log/dataproxy
# @raise [RuntimeError] if no IB connection could be created or if one of the
#   required RabbitMQ handles (request/reply exchange, request queue) is missing
def initialize(
  outputhandler: OutputHandler.new(
    location: "/var/cotcube/log/dataproxy"
  )
)
  @output = outputhandler
  @client = DataProxy.get_ib_client
  @mq     = DataProxy.get_mq_client
  @ib     = @client[:ib]

  # Both factories rescue their own errors and report them under :message, so
  # pass that detail on instead of raising a bare "could not connect".
  unless @ib
    detail = @client[:message]
    raise(detail ? "Could not connect to IB: #{detail}" : 'Could not connect to IB')
  end
  if %i[ request_exch replies_exch request_queue ].any? { |handle| mq[handle].nil? }
    detail = mq[:message]
    raise(detail ? "Could not connect to RabbitMQ: #{detail}" : 'Could not connect to RabbitMQ')
  end

  @requests = {}                                             # pending one-shot requests
  @req_mon  = Monitor.new                                    # guards @requests
  @persistent = { ticks: {}, depth: {}, realtimebars: {} }   # continuous subscriptions per type
  @per_mon  = Monitor.new                                    # guards @persistent
  @gc_thread = nil
  spawn_message_subscribers
  commserver_start
  recover
  gc_start
end

Class Method Details

.get_ib_client(host: 'localhost', port: 4002, id: 5, client_id: 5) ⇒ Object

Create a connection to the locally running IB API endpoint (defaults: localhost, port 4002).



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# File 'lib/cotcube-dataproxy/3rd_clients.rb', line 5

# Opens an IB connection and registers the :Alert and :ManagedAccounts
# subscriptions while the connection is being set up.
#
# Ordinary failures are not raised but reported in the returned hash, so the
# caller has to check :error (or the presence of :ib).
#
# @param host [String]
# @param port [Integer]
# @param id [Integer] passed through to IB::Connection
# @param client_id [Integer] passed through to IB::Connection
# @return [Hash] the connection parameters plus either
#   :ib, :alert, :managed_accounts and error: 0 on success, or
#   error: 1, :message and :full_message on failure
def self.get_ib_client(host: 'localhost', port: 4002, id: 5, client_id: 5)
  obj = {
    id: id,
    client_id: client_id,
    port: port,
    host: host
  }
  begin
    obj[:ib] = IB::Connection.new(
      id: id,
      client_id: client_id,
      port: port,
      host: host
    ) do |provider|
      obj[:alert]            = provider.subscribe(:Alert) { true }
      obj[:managed_accounts] = provider.subscribe(:ManagedAccounts) { true }
    end
    obj[:error] = 0
  rescue StandardError => e
    # StandardError only: Interrupt, SystemExit and the like must reach the
    # caller instead of being converted into an error hash.
    obj[:error] = 1
    obj[:message] = e.message
    obj[:full_message] = e.full_message
  end
  obj
end

.get_mq_client(client_id: 5) ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# File 'lib/cotcube-dataproxy/3rd_clients.rb', line 32

# Connects to RabbitMQ on localhost:5672 with the credentials and vhost from
# SECRETS and prepares the handles the dataproxy works with.
#
# Ordinary failures are not raised but reported in the returned hash, so the
# caller has to check :error (or the presence of the handles).
#
# @param client_id [Integer] only stored in the returned hash
# @return [Hash] on success: :connection, :commands and :channel (two channels),
#   :request_queue (exclusive, auto-delete, bound to 'dataproxy_commands'),
#   :request_exch, :replies_exch and error: 0;
#   on failure: error: 1, :message and :full_message
def self.get_mq_client(client_id: 5)
  obj = {
    client_id: client_id,
  }
  begin
    # for more info on connection parameters see http://rubybunny.info/articles/connecting.html
    #
    obj[:connection]    = Bunny.new(
      host: 'localhost',
      port: 5672,
      user: SECRETS['dataproxy_mq_user'],
      password: SECRETS['dataproxy_mq_password'],
      vhost: SECRETS['dataproxy_mq_vhost']
    )
    obj[:connection].start
    obj[:commands]      = obj[:connection].create_channel
    obj[:channel]       = obj[:connection].create_channel
    obj[:request_queue] = obj[:commands].queue('', exclusive: true, auto_delete: true)
    obj[:request_exch]  = obj[:commands].direct('dataproxy_commands')
    obj[:replies_exch]  = obj[:commands].direct('dataproxy_replies')
    %w[ dataproxy_commands  ].each do |key|
      obj[:request_queue].bind(obj[:request_exch], routing_key: key )
    end
    obj[:error]      = 0
  rescue StandardError => e
    # StandardError only: Interrupt, SystemExit and the like must reach the
    # caller instead of being converted into an error hash.
    obj[:error] = 1
    obj[:message] = e.message
    obj[:full_message] = e.full_message
  end
  obj
end

Instance Method Details

#__int2hex__(id) ⇒ Object



4
5
6
7
8
9
# File 'lib/cotcube-dataproxy/subscribers.rb', line 4

# Renders +id+ as a lowercase hexadecimal string, left-padded with zeros to at
# least seven characters.
#
# @param id [Integer] anything else that cannot do `to_s(16)` yields nil
# @return [String, nil] the padded hex string, or nil if the conversion failed
def __int2hex__(id)
  hex = begin
    id.to_s(16)
  rescue StandardError
    nil
  end
  hex&.rjust(7, '0')
end

#api(type:, instance: nil) ⇒ Object

Raises:

  • (ArgumentError)


144
145
146
147
148
149
150
151
152
153
154
# File 'lib/cotcube-dataproxy/gc.rb', line 144

# Queries the HTTP API addressed by the dataproxy_mq_* entries in SECRETS for
# the given resource type inside the configured vhost and returns the parsed
# JSON response with symbolized keys.
#
# @param type [Symbol] one of :queues, :exchanges, :bindings
# @param instance [String, nil] name of a single resource; nil requests the list
# @return [Array, Hash] parsed response body
# @raise [ArgumentError] if +type+ is not one of the allowed types
def api(type:, instance: nil)
  # other types need a different API spec: channels connections definitions
  allowed_types = %i[ queues exchanges bindings ]
  unless allowed_types.include?(type)
    raise ArgumentError, "Type must be in '#{allowed_types}', but is  '#{type}'"
  end

  path = "#{type}/#{SECRETS['dataproxy_mq_vhost']}"
  path = "#{path}/#{instance}" unless instance.nil?

  credentials = "#{SECRETS['dataproxy_mq_user']}:#{SECRETS['dataproxy_mq_password']}"
  endpoint    = "#{SECRETS['dataproxy_mq_host']}:#{SECRETS['dataproxy_mq_port']}"
  url         = "#{SECRETS['dataproxy_mq_proto']}://#{credentials}@#{endpoint}/api/#{path}"

  response = HTTParty.get(url)
  JSON.parse(response.body, symbolize_names: true)
end

#client_fail(request, id: nil, to: nil, exchange: :replies_exch, &block) ⇒ Object



8
9
10
# File 'lib/cotcube-dataproxy/client_response.rb', line 8

# Sends a failure reply for +request+ by delegating to client_response with err: 1.
#
# @param request [Hash] the request being answered
# @param id passed through to client_response (previously dropped and replaced by nil)
# @param to passed through to client_response (previously dropped and replaced by nil)
# @param exchange [Symbol] passed through to client_response
# @yield forwarded to client_response
def client_fail(request,    id: nil, to: nil, exchange: :replies_exch, &block)
  client_response(request,  id: id, to: to, err: 1, exchange: exchange, &block)
end

#client_success(request, id: nil, to: nil, exchange: :replies_exch, &block) ⇒ Object



4
5
6
# File 'lib/cotcube-dataproxy/client_response.rb', line 4

# Sends a success reply for +request+ by delegating to client_response with err: 0.
#
# @param request [Hash] the request being answered
# @param id passed through to client_response (previously dropped and replaced by nil)
# @param to passed through to client_response (previously dropped and replaced by nil)
# @param exchange [Symbol] passed through to client_response
# @yield forwarded to client_response
def client_success(request, id: nil, to: nil, exchange: :replies_exch, &block)
  client_response( request, id: id, to: to, err: 0, exchange: exchange, &block)
end

#commserver_start ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/cotcube-dataproxy/commserver.rb', line 6

# Subscribes to the request queue and dispatches every incoming command.
#
# For each delivery the payload is JSON-decoded (symbolized keys) and the AMQP
# properties :correlation_id / :reply_to are stored in the request as :__id__ /
# :__to__. A payload that does not decode into a Hash is handled as the generic
# command 'failed', a missing :command as 'nil'; both are answered via client_fail.
#
# Commands handled: ping, get_contract(s), historical, realtimebars, ticks and
# depth. Some of them are additionally matched through Cotcube::Helpers.sub
# (presumably abbreviations of the given word -- confirm against that helper).
# Replies to get_contracts and historical are not sent from here: the request is
# stored in `requests` and answered when the matching IB message arrives in the
# message subscribers.
#
# The subscription itself is kept in mq[:request_subscription].
def commserver_start
  mq[:request_subscription] = mq[:request_queue].subscribe do |delivery_info, properties, payload|

    ################################################################################################
    # the request will be JSON decoded. The generic command 'failed' will be set, if decoding raises
    # or if the payload is valid JSON but not an object (e.g. '[1]' or '42').
    # furthermore, __id__ and __to__ are extracted and added to the request-hash
    #
    request = begin
      JSON.parse(payload, symbolize_names: true)
    rescue StandardError
      nil
    end
    request = { command: 'failed' } unless request.is_a?(Hash)
    request[:command] ||= 'nil'

    request[:__id__] = properties[:correlation_id]
    request[:__to__] = properties[:reply_to]

    if request[:debug]
      log "Received \t#{delivery_info.map{|k,v| "#{k}\t#{v}"}.join("\n")
                 }\n\n#{properties   .map{|k,v| "#{k}\t#{v}"}.join("\n")
                 }\n\n#{request      .map{|k,v| "#{k}\t#{v}"}.join("\n")}"
    else
      log "Received\t#{request}"
    end

    ###############################################################################################
    # the entire set of command processing,
    # starting with the (generic) 'failed' command, that just answers with the failure notice
    # and      with another failure notice upon a missing command section in the request
    # ending   with another failure notice, if an unknown command was issued
    #
    log "Processing #{request[:command]}:"
    # to_s: a non-String command (e.g. a JSON number) must end up in the
    # 'Unknown :command' reply instead of raising inside the consumer.
    case request[:command].to_s.downcase
    when 'failed'
      client_fail(request) { "Failed to parse payload: '#{payload}'." }

    when 'nil'
      client_fail(request) { "missing :command in request: '#{request}'." }

    ##############################################################################################
    # ping -> pong, just for testing
    #
    when 'ping'
      client_success(request) { "pong" }

    ##############################################################################################
    # the get_contracts command tries to resolve a list of available contracts related to a
    #     specific symbol based on a set of characteristics retrieved via Helpers::get_id_set
    #
    # the reply of the message is processed asynchronously upon reception of
    #     the IB Message 'ContractDataEnd' in the message subscribers section
    #
    when 'get_contract', 'get_contracts', Cotcube::Helpers.sub(minimum: 3) { 'contracts' }
      if request[:symbol].nil?
        client_fail(request) { "Cannot requets contracts without :symbol (in '#{request}')." }
        next
      end
      sym = Cotcube::Helpers.get_id_set(symbol: request[:symbol])
      if [nil, false].include? sym
        client_fail(request) { "Unknown symbol '#{request[:symbol]}' in '#{request}'." }
        next
      end
      request[:result] = []
      req_mon.synchronize { requests[request[:__id__]] = request }
      ib_contract = IB::Contract.new symbol: sym[:ib_symbol], exchange: sym[:exchange], currency: sym[:currency], sec_type: (request[:sec_type] || 'FUT')
      ib.send_message :RequestContractData, contract: ib_contract, request_id: request[:__id__].to_i(16)


    ##############################################################################################
    # the historical command retrieves a list of bars as provided by TWS
    # the minimum requirement is :contract parameter issued.
    #
    # the reply to this message is processed asynchronously upon reception of
    #     the IB message 'HistoricalData' in message subscribers section
    #
    when Cotcube::Helpers.sub(minimum: 3) {'historical'}
      unless request[:contract].is_a? String and request[:contract].size == 5
        client_fail(request) { "IB needs complete contract information to request data, e.g. ESZ21 instead of ES, got '#{request[:contract]}' in '#{request}'." }
        next
      end
      con_id = request[:con_id] || Cotcube::Helpers.get_ib_contract(request[:contract])[:con_id] rescue nil
      if con_id.nil?
         client_fail(request) { "Cannot get :con_id for contract:'#{request[:contract]}' in '#{request}'." }
         next
      end
      sym    = Cotcube::Helpers.get_id_set(contract: request[:contract])
      # same guard as in get_contracts: without an id set there is no exchange to ask
      if [nil, false].include? sym
        client_fail(request) { "Unknown contract '#{request[:contract]}' in '#{request}'." }
        next
      end
      before = Time.at(request[:before]).to_ib rescue Time.now.to_ib
      ib_contract = IB::Contract.new(con_id: con_id, exchange: sym[:exchange])
      req = {
        request_id:    request[:__id__].to_i(16),
        symbol:        sym[:symbol],
        sym:           sym,
        contract:      ib_contract,
        end_date_time: before,
        what_to_show:  (request[:based_on]                || :trades),
        use_rth:       (request[:rth_only]                     || 1),
        keep_up_to_date: 0,
        # &. keeps the defaults reachable when :duration / :interval are omitted
        duration:      (request[:duration]&.gsub('_', ' ') || '1 D'),
        bar_size:      (request[:interval]&.to_sym         || :min15)
      }
      req_mon.synchronize { requests[request[:__id__]] = request }
      begin
        Timeout.timeout(2) { ib.send_message(IB::Messages::Outgoing::RequestHistoricalData.new(req)) }
      rescue Timeout::Error, IB::Error
        client_fail(request) { 'Could not request historical data. Is ib_client running?' }
        req_mon.synchronize { requests.delete(request[:__id__]) }
        next
      end


    # ********************************************************************************************
    #
    # REQUESTS BELOW ARE BASED ON A CONTINUOUS IB SUBSCRIPTION AND MUST BE CONSIDERED BY
    # GARBAGE COLLECTION ( instance.gc ) --- SUBSCRIPTION DATA IS PERSISTED IN @persistent
    #
    # ********************************************************************************************


    ###############################################################################################
    # the start_realtimebars initiates the IBKR realtime (5s) bars delivery for a specific contract
    #     and feeds them into a fanout exchange dedicated to that contract
    #     delivery continues as long as there are queues bound to that exchange
    #
    when Cotcube::Helpers.sub(minimum:4){'realtimebars'}
      subscribe_persistent(request, type: :realtimebars)
      next

    when 'ticks'
      # was :realtimebars (copy/paste); tick subscriptions are tracked in persistent[:ticks]
      subscribe_persistent(request, type: :ticks)
      next

    when 'depth'
      subscribe_persistent(request, type: :depth)
      next

    else
      client_fail(request) { "Unknown :command '#{request[:command]}' in '#{request}'." }
    end
  end
  log "Started commserver listening on #{mq[:request_queue]}"
end

#commserver_stop ⇒ Object



144
145
146
147
# File 'lib/cotcube-dataproxy/commserver.rb', line 144

# Cancels the subscription stored in mq[:request_subscription] and logs that
# the command server has stopped.
def commserver_stop
  subscription = mq[:request_subscription]
  subscription.cancel
  log 'Stopped commserver ...'
end

#delete_mq(type:, instance:) ⇒ Object

Raises:

  • (ArgumentError)


125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/cotcube-dataproxy/gc.rb', line 125

# Deletes a queue or exchange through the HTTP API addressed by the
# dataproxy_mq_* entries in SECRETS. The request carries 'if-unused' => true.
#
# The vhost is taken from SECRETS['dataproxy_mq_vhost'] -- the same value `api`
# queries and `get_mq_client` connects to -- instead of a hard-coded 'dp'.
#
# @param type [Symbol] :queues or :exchanges
# @param instance [String] name of the queue / exchange to delete
# @return [Hash] { error: 0 } if the API answered 204; otherwise the parsed
#   response body with :error set to 1 (a body that is not a JSON object is
#   returned under :message)
# @raise [ArgumentError] if +type+ is not one of the allowed types
def delete_mq(type:, instance: )
  allowed_types = %i[ queues exchanges ]
  raise ArgumentError, "Type must be in '#{allowed_types}', but is  '#{type}'" unless allowed_types.include? type
  result = HTTParty.delete("#{SECRETS['dataproxy_mq_proto']
                        }://#{SECRETS['dataproxy_mq_user']
                          }:#{SECRETS['dataproxy_mq_password']
                          }@#{SECRETS['dataproxy_mq_host']
                          }:#{SECRETS['dataproxy_mq_port']
                      }/api/#{type.to_s
                          }/#{SECRETS['dataproxy_mq_vhost']
                          }/#{instance}", query: { 'if-unused' => true })
  return { error: 0 } if result.code == 204

  failure = begin
    JSON.parse(result.body, symbolize_names: true)
  rescue JSON::ParserError, TypeError
    # e.g. an empty body or an HTML error page from a proxy
    nil
  end
  failure = { message: result.body.to_s } unless failure.is_a?(Hash)
  failure[:error] = 1
  failure
end

#gc ⇒ Object



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# File 'lib/cotcube-dataproxy/gc.rb', line 32

# One garbage-collection pass over the broker state reported by get_mq.
#
# * queues without consumers are deleted,
# * exchanges that have nothing bound to them, are not built-in ('' or 'amq.*')
#   and are not the dataproxy command/reply exchanges are treated as abandoned
#   subscriptions: the exchange name is split at '_' into
#   <prefix>_<subscription_type>_<contract>, the IB subscription is cancelled
#   and the exchange is deleted in a separate thread,
# * bindings are currently left untouched.
#
# Exchanges whose name does not follow that pattern are skipped with a warning.
def gc
  get_mq(list: false).each do |item_type, items|
    items.each do |_key, item|
      case item_type
      when :bindings
        # NOTE it might be worth unbinding unused queues from exchanges before removing either;
        #      as the bound value of the exchange will decrease, it might be removed on the next run of GC
      when :queues
        next if item[:consumers].nil? or item[:consumers].positive?
        delete_mq(type: item_type, instance: item[:name])
        log "GC_INFO: Deleted unsed queue: #{item}."
        # sadly we don't know about unused queues. basically this means that some client did not declare its queue as auto_delete.
        # the dataproxy itself has only 1 queue -- the command queue, everything else is sent out to exchanges
      when :exchanges
        if item[:name].empty? or item[:name] =~ /^amq\./ or item[:bound].count.positive? or
            %w[ dataproxy_commands dataproxy_replies ].include? item[:name]
          next
        end
        #log "GC_INFO: found superfluous exchange '#{item[:name]}'"

        _, subscription_type, contract = item[:name].split('_')
        # Normalize once: a name without '_' yields nil here, and the type is used
        # below as a key into `persistent`, whose keys are lowercase symbols.
        subscription_type = subscription_type.to_s.downcase
        unless %w[ ticks depth realtimebars ].include? subscription_type
          puts "GC_WARNING: Unknown subscription_type '#{subscription_type}', skipping..."
          next
        end
        con_id = Cotcube::Helpers.get_ib_contract(contract)[:con_id] rescue 0
        if con_id.zero?
          puts "GC_WARNING: No con_id found for contract '#{contract}', skipping..."
          next
        end
        if persistent[subscription_type.to_sym][con_id].nil?
          puts "GC_WARNING: No record for subscription '#{subscription_type}_#{contract}' with #{con_id} found, deleting anyway..."
        end
        Thread.new do
          # mark the record first, so that new requests for this exchange are
          # told to retry while the cancel is in flight
          per_mon.synchronize {
            persistent[subscription_type.to_sym][con_id][:on_cancel] = true if persistent.dig(subscription_type.to_sym, con_id)
          }
          log "GC_INFO: Sending cancel for #{subscription_type}::#{contract}::#{con_id}."
          message_type = case subscription_type;
                         when 'ticks'; :CancelMarketData
                         when 'depth'; :CancelMarketDepth
                         else;         :CancelRealTimeBars
                         end
          if ib.send_message( message_type, id: con_id )
            sleep 0.75 + Random.rand
            res = delete_mq(type: item_type, instance: item[:name])
            log "GC_SUCCESS: exchange '#{item[:name]}' with #{con_id} has been deleted ('#{res}')."
            per_mon.synchronize { persistent[subscription_type.to_sym].delete(con_id) }
          else
            log "GC_FAILED: something went wrong unsubscribing '#{subscription_type}_#{contract}' with #{con_id}."
          end
        end
      else
        log "GC_ERROR: Unexpected type '#{item_type}' in GarbageCollector"
      end
    end
  end
end

#gc_start ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/cotcube-dataproxy/gc.rb', line 6

# Spawns the garbage-collector thread, which runs `gc` roughly every 30-32
# seconds. Only one GC thread may exist; a second call just logs an error.
#
# A failing GC pass (e.g. the HTTP API being unreachable) is logged and the loop
# keeps running; otherwise the thread would die while @gc_thread stays set, and
# garbage collection could not be started again.
def gc_start
  if gc_thread.nil?
    @gc_thread = Thread.new do
      loop do
        sleep 30 + 2 * Random.rand
        begin
          gc
        rescue StandardError => e
          log "GC_ERROR: GC run failed: #{e.class}: #{e.message}"
        end
      end
    end
    log 'GC_INFO: GC spawned.'
  else
    log 'GC_ERROR: Cannot start GC_THREAD more than once.'
  end
end

#gc_stop ⇒ Object



20
21
22
23
24
25
26
27
28
29
30
# File 'lib/cotcube-dataproxy/gc.rb', line 20

# Kills the garbage-collector thread and forgets it, so that gc_start can spawn
# a new one later.
#
# @return [Boolean] true if a thread was stopped, false if there was none
def gc_stop
  if gc_thread.nil?
    log 'GC_ERROR: Cannot stop nonexisting GC_THREAD.'
    false
  else
    gc_thread.kill
    # Must be the instance variable: `gc_thread = nil` only created a local and
    # left the killed thread registered.
    @gc_thread = nil
    log 'GC_INFO: GC stopped.'
    true
  end
end

#get_mq(list: true) ⇒ Object



91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/cotcube-dataproxy/gc.rb', line 91

# Collects the current queues, exchanges and bindings through `api`.
#
# Every queue and exchange is queried individually (the exchange with the empty
# name is queried as 'amq.default'); each exchange additionally gets :bound, the
# destinations of all bindings that have this exchange as their source.
#
# @param list [Boolean] when true, a one-line summary per queue / exchange is
#   printed to stdout
# @return [Hash] { queues: { name => details }, exchanges: { name => details },
#   bindings: [...] }
def get_mq(list: true)
  bindings = api(type: :bindings)

  results = %i[ queues exchanges ].each_with_object({}) do |type, collected|
    collected[type] = {}
    api(type: type).each do |entry|
      name    = entry[:name]
      details = api(type: type, instance: (name.empty? ? 'amq.default' : name))
      collected[type][name] = details
      next unless type == :exchanges

      details[:bound] = bindings.select { |binding| binding[:source] == name }
                                .map { |binding| binding[:destination] }
    end
  end

  if list
    results.each do |type, items|
      items.each do |key, item|
        if item.is_a? Array
          puts "#{key}\t\t#{item}"
          next
        end

        summary =
          case type
          when :queues
            "Key: #{format '%-30s', key}  Cons: #{item[:consumers]}"
          when :exchanges
            "Key: #{format '%-30s', key}  Type: #{format '%10s', item[:type]}   Bound: #{item[:bound].presence || 'None.'}"
          else
            "Unknown details for #{type}"
          end
        puts "#{format '%12s', type.to_s.upcase}    #{summary}"
      end
    end
  end

  # added after listing, so the raw bindings are never printed
  results[:bindings] = bindings
  results
end

#shutdown ⇒ Object



29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/cotcube-dataproxy/init.rb', line 29

# Shuts the dataproxy down in a fixed order: stop the command server and the GC
# thread, close both channels and the RabbitMQ connection, send the matching
# cancel message (looked up in CANCEL_TYPES) for every entry in `persistent`,
# run one final GC pass and close the IB connection. The one-second pauses
# around the GC pass are part of the sequence.
def shutdown
  puts "Shutting down dataproxy."
  commserver_stop
  gc_stop

  %i[ commands channel connection ].each { |handle| mq[handle].close }

  persistent.each do |type, items|
    items.each do |con_id, item|
      cancel_message = CANCEL_TYPES[type.to_sym]
      log "sending #{cancel_message} #{con_id} (for #{item[:contract]})"
      ib.send_message cancel_message, id: con_id
    end
  end

  sleep 1
  gc
  sleep 1
  ib.close
  puts "... done."
end

#spawn_message_subscribers ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/cotcube-dataproxy/subscribers.rb', line 11

# Registers the IB message subscriptions and starts the two worker threads that
# process the received messages.
#
# IB callbacks only push messages onto @msg_queue / @depth_queue; the work is
# done by
# * @msg_subscriber_thread: answers pending requests (HistoricalData,
#   ContractData/ContractDataEnd, Alert) and publishes RealTimeBar and Tick*
#   messages to the exchange stored in `persistent`,
# * @depth_subscriber_thread: appends MarketDepth rows to the buffer of the
#   matching persistent[:depth] entry.
#
# Both loops handle each message inside its own rescue: a message that belongs
# to an unknown request or to a subscription that has already been removed is
# dropped (or logged) instead of terminating the thread.
def spawn_message_subscribers
  @msg_queue = Queue.new
  @depth_queue = Queue.new

  ib.subscribe(:MarketDataType, :TickRequestParameters){|msg| log "#{msg.class}\t#{msg.data.inspect}".colorize(:yellow) }

  @msg_subscriber = ib.subscribe(
    :Alert,
    :ContractData, :ContractDataEnd, :BondContractData,
    :TickGeneric, :TickString, :TickPrice, :TickSize,
    :HistoricalData,
    :RealTimeBar
  ) do |msg|

    @msg_queue << msg
  end

  @depth_subscriber = ib.subscribe( :MarketDepth ) {|msg| @depth_queue << msg}

  # used to sit behind the endless loop of the worker thread, where it could never run
  log "SPAWN_SUBSCRIBERS\tSubscribers attached to IB" if @debug

  @msg_subscriber_thread = Thread.new do
    loop do
      msg = @msg_queue.pop

      begin
        data             = msg.data
        data[:time]      = msg.created_at.strftime('%H:%M:%S')
        data[:timestamp] = (msg.created_at.to_time.to_f * 1000).to_i
        __id__           = __int2hex__(data[:request_id])

        case msg

        when IB::Messages::Incoming::HistoricalData
          request = requests[__id__]
          if request.nil?
            log("WARNING".light_red + "\tHistoricalData for unknown request '#{__id__}' dropped.")
            next
          end
          client_success(request) {
            {
              symbol:   request[:contract][..1],
              contract: request[:contract],
              base: msg.results.map{|z|
                z.attributes.tap{|z1| z1.delete(:created_at) }
              }
            }
          }
          req_mon.synchronize { requests.delete(__id__) }

        when IB::Messages::Incoming::Alert # Alert
          # alerts carry the id of the request they refer to in :error_id
          __id__ = __int2hex__(data[:error_id])
          case data[:code]
          when 162
            log("ALERT 162:".light_red + ' MISSING MARKET DATA PERMISSION')
          when 201
            log("ALERT 201:".light_red + ' DUPLICATE OCA_GROUP')
          else
            log("ALERT #{data[:code]}:".light_red + "        #{data[:message]}")
          end
          data[:msg_type] = 'alert'
          client_fail(requests[__id__]) {data} unless requests[__id__].nil?
          log data

        when IB::Messages::Incoming::ContractData
          req_mon.synchronize do
            request = requests[__id__]
            request[:result] << data[:contract].slice(:local_symbol, :last_trading_day, :con_id) if request
          end

        when IB::Messages::Incoming::ContractDataEnd
          sleep 0.25
          request = requests[__id__]
          if request.nil?
            log("WARNING".light_red + "\tContractDataEnd for unknown request '#{__id__}' dropped.")
            next
          end
          client_success(request) { request[:result] }
          req_mon.synchronize { requests.delete(__id__) }

        when IB::Messages::Incoming::RealTimeBar
          con_id       = data[:request_id]
          bar          = data[:bar]
          subscription = persistent[:realtimebars][con_id]
          # bars may still arrive after the subscription record has been removed
          next if subscription.nil?
          begin
            subscription[:exchange].publish(bar.slice(*%i[time open high low close volume trades wap]).to_json)
          rescue Bunny::ChannelAlreadyClosed
            ib.send_message :CancelRealTimeBars, id: con_id
            log "Delivery for #{subscription[:contract] || 'unknown contract'} with con_id #{con_id} has been stopped."
            Thread.new{ sleep 5; per_mon.synchronize { persistent[:realtimebars].delete(con_id) } }
          end

        when IB::Messages::Incoming::TickSize,
          IB::Messages::Incoming::TickPrice,
          IB::Messages::Incoming::TickGeneric,
          IB::Messages::Incoming::TickString
          con_id       = data[:ticker_id]
          subscription = persistent[:ticks][con_id]
          # ticks may still arrive after the subscription record has been removed
          next if subscription.nil?
          begin
            subscription[:exchange].publish(data.inspect.to_json)
          rescue Bunny::ChannelAlreadyClosed
            ib.send_message :CancelMarketData, id: con_id
            log "Delivery for #{subscription[:contract]} with con_id #{con_id} has been stopped."
            Thread.new{ sleep 0.25; per_mon.synchronize { persistent[:ticks].delete(con_id) } }
          end

        else
          log("WARNING".light_red + "\tUnknown messagetype: #{msg.inspect}")
        end
      rescue StandardError => e
        # keep the worker alive; one bad message must not stop all deliveries
        log("ERROR".light_red + "\tFailed to process #{msg.class}: #{e.class}: #{e.message}")
      end
    end
  end
  @depth_subscriber_thread = Thread.new do
    loop do
      # the buffer thread of a depth subscription raises this flag while it flushes
      sleep 0.025 while @block_depth_queue
      msg = @depth_queue.pop
      begin
        con_id       = msg.data[:request_id]
        subscription = persistent[:depth][con_id]
        next if subscription.nil?
        # the contract has to go into msg.data, because that is what gets sliced below
        msg.data[:contract] = subscription[:contract]
        (subscription[:buffer] ||= []) << msg.data.slice(*%i[ contract position operation side price size ])
      rescue StandardError => e
        log("ERROR".light_red + "\tFailed to buffer #{msg.class}: #{e.class}: #{e.message}")
      end
    end
  end
end

#subscribe_persistent(request, type:) ⇒ Object



149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# File 'lib/cotcube-dataproxy/commserver.rb', line 149

# Starts (or attaches to) a continuous IB subscription of the given type for
# request[:contract] and answers the request.
#
# * No record yet: a record with a fanout exchange named request[:exchange] is
#   stored in persistent[type], the IB request (REQUEST_TYPES[type]) is sent and
#   the client gets a success reply. For :depth an additional thread publishes
#   the buffered rows as JSON to the exchange on every 5-second boundary.
# * Record marked :on_cancel: the client is told to retry in a few seconds.
# * Otherwise: the client is told that delivery is attached to the existing one.
#
# @param request [Hash] needs :contract and :exchange, optionally :con_id
# @param type [Symbol] :ticks, :depth or :realtimebars
def subscribe_persistent(request, type:)
  sym    = Cotcube::Helpers.get_id_set(contract: request[:contract])
  con_id = request[:con_id] || Cotcube::Helpers.get_ib_contract(request[:contract])[:con_id] rescue nil
  # get_id_set is checked against nil and false elsewhere as well
  if [nil, false].include?(sym) or con_id.nil?
    client_fail(request) { "Invalid contract '#{request[:contract]}'." }
    return
  end
  if persistent[type][con_id].nil?
    per_mon.synchronize {
      persistent[type][con_id] = { con_id: con_id,
                                   contract: request[:contract],
                                   exchange: mq[:channel].fanout(request[:exchange]) }
      # depth rows are appended to this buffer, so it has to exist before the
      # first row arrives (and before the first flush)
      persistent[type][con_id][:buffer] = [] if type == :depth
    }
    if type == :depth
      bufferthread = Thread.new do
        sleep 5.0 - (Time.now.to_f % 5)
        loop do
          begin
            # pause the depth consumer while the buffer is flushed and swapped
            @block_depth_queue = true
            sleep 0.025
            con = persistent[:depth][con_id]
            break if con.nil? # subscription has been removed, nothing left to flush
            con[:exchange].publish(con[:buffer].to_json)
            con[:buffer] = []
          ensure
            # always release the consumer, even if publishing failed
            @block_depth_queue = false
          end
          sleep 5.0 - (Time.now.to_f % 5)
        end
      end
      per_mon.synchronize { persistent[:depth][con_id][:bufferthread] = bufferthread }
    end
    ib_contract = IB::Contract.new(con_id: con_id, exchange: sym[:exchange])
    ib.send_message(REQUEST_TYPES[type], id: con_id, contract: ib_contract, data_type: :trades, use_rth: false)
    client_success(request) { "Delivery of #{type} of #{request[:contract]} started." }
  elsif persistent[type][con_id][:on_cancel]
    client_fail(request) { { reason: :on_cancel, message: "Exchange '#{request[:exchange]}' is marked for cancel, retry in a few seconds to recreate" } }
  else
    client_success(request) { "Delivery of #{type} of #{request[:contract]} attached to existing." }
  end
end