Class: Cotcube::Helpers::DataClient
- Inherits: Object (Object > Cotcube::Helpers::DataClient)
- Defined in: lib/cotcube-helpers/orderclient.rb, lib/cotcube-helpers/data_client.rb
Overview
A proxyclient is a wrapper that allows communication with cotcube-orderproxy and cotcube-dataproxy. It handles registration and provides the opportunity to implement the logic to respond to events. (orderproxy and dataproxy are separate gems that create a layer between tws/ib-ruby and cotcube-based applications.)
NOTE: What is here is a provisional version.
Constant Summary
- SECRETS_DEFAULT =
    {
      'dataproxy_mq_proto' => 'http',
      'dataproxy_mq_user' => 'guest',
      'dataproxy_mq_password' => 'guest',
      'dataproxy_mq_host' => 'localhost',
      'dataproxy_mq_port' => '15672',
      'dataproxy_mq_vhost' => '%2F'
    }.freeze
- SECRETS =
    SECRETS_DEFAULT.merge(
      lambda {
        begin
          YAML.safe_load(File.read(Cotcube::Helpers.init[:secrets_file]))
        rescue StandardError
          {}
        end
      }.call
    )
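The SECRETS constant layers the contents of a YAML file over the defaults and silently falls back to the defaults when the file cannot be read. The following is a minimal sketch of that pattern, not the gem's code: `load_secrets`, `DEFAULTS`, and the host name `mq.example.internal` are hypothetical names used for illustration only.

```ruby
require 'yaml'
require 'tempfile'

# A subset of the defaults shown above, repeated so the sketch is self-contained.
DEFAULTS = {
  'dataproxy_mq_user' => 'guest',
  'dataproxy_mq_password' => 'guest',
  'dataproxy_mq_host' => 'localhost'
}.freeze

# Hypothetical helper: read overrides from a YAML file and merge them over the
# defaults. A missing, unparsable, or non-hash file yields the defaults unchanged.
def load_secrets(path, defaults = DEFAULTS)
  overrides = begin
    YAML.safe_load(File.read(path))
  rescue StandardError
    {}
  end
  overrides = {} unless overrides.is_a?(Hash)
  defaults.merge(overrides)
end

# Usage: a secrets file that overrides only the host.
file = Tempfile.new(['secrets', '.yml'])
file.write("dataproxy_mq_host: mq.example.internal\n")
file.close

merged  = load_secrets(file.path)
missing = load_secrets('/nonexistent/secrets.yml')
file.unlink
```

Keys absent from the file keep their default values, so a secrets file only needs to list what differs from a local guest setup.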
Instance Attribute Summary
- #account ⇒ Object (readonly): Returns the value of attribute account.
- #average ⇒ Object (readonly): Returns the value of attribute average.
- #condition ⇒ Object (readonly): Returns the value of attribute condition.
- #lock ⇒ Object (readonly): Returns the value of attribute lock.
- #multiplier ⇒ Object (readonly): Returns the value of attribute multiplier.
- #power ⇒ Object (readonly): Returns the value of attribute power.
- #response ⇒ Object: Returns the value of attribute response.
- #ticksize ⇒ Object (readonly): Returns the value of attribute ticksize.
Instance Method Summary
- #command(command, timeout: 10) ⇒ Object (also: #send_command): command acts as a synchronizer. It sends the command and waits for the response, otherwise it times out. The counterpart here is the subscription within setup_reply_queue.
- #exit_on_startup(msg = '') ⇒ Object
- #get_contracts(symbol:) ⇒ Object
- #get_historical(contract:, interval:, duration: nil, before: nil, rth_only: false, based_on: :trades) ⇒ Object
- #initialize(debug: false, contract:, serverport: 24001, serveraddr: '127.0.0.1', client:, bars: true, ticks: false, bar_size: 5, spawn_timeout: 15) ⇒ DataClient (constructor): The constructor takes a lot of arguments.
- #shutdown(close: true) ⇒ Object: #shutdown ends the @server_thread and, if :close is set, closes the current position attached to the client.
- #spawn_server(execution_proc: nil, orderstate_proc: nil, tick_proc: nil, depth_proc: nil, order_proc: nil, bars_proc: nil) ⇒ Object
- #start_persistent(contract:, type: :realtimebars, local_id: 0, &block) ⇒ Object
- #stop ⇒ Object
- #stop_persistent(contract:, local_id: 0, type: :realtimebars) ⇒ Object
Constructor Details
#initialize(debug: false, contract:, serverport: 24001, serveraddr: '127.0.0.1', client:, bars: true, ticks: false, bar_size: 5, spawn_timeout: 15) ⇒ DataClient
The constructor takes a lot of arguments:
    # File 'lib/cotcube-helpers/orderclient.rb', line 18

    def initialize
      @connection = Bunny.new(user: SECRETS['dataproxy_mq_user'],
                              password: SECRETS['dataproxy_mq_password'],
                              vhost: SECRETS['dataproxy_mq_vhost'])
      @connection.start

      @commands = connection.create_channel
      @exchange = commands.direct('dataproxy_commands')
      @requests = {}
      @persistent = { depth: {}, realtimebars: {}, ticks: {} }
      @debug = false
      setup_reply_queue
    end
Instance Attribute Details
#account ⇒ Object (readonly)
Returns the value of attribute account.

    # File 'lib/cotcube-helpers/orderclient.rb', line 15

    def account
      @account
    end
#average ⇒ Object (readonly)
Returns the value of attribute average.

    # File 'lib/cotcube-helpers/orderclient.rb', line 15

    def average
      @average
    end
#condition ⇒ Object (readonly)
Returns the value of attribute condition.

    # File 'lib/cotcube-helpers/data_client.rb', line 178

    def condition
      @condition
    end
#lock ⇒ Object (readonly)
Returns the value of attribute lock.

    # File 'lib/cotcube-helpers/data_client.rb', line 178

    def lock
      @lock
    end
#multiplier ⇒ Object (readonly)
Returns the value of attribute multiplier.

    # File 'lib/cotcube-helpers/orderclient.rb', line 15

    def multiplier
      @multiplier
    end
#power ⇒ Object (readonly)
Returns the value of attribute power.

    # File 'lib/cotcube-helpers/orderclient.rb', line 15

    def power
      @power
    end
#response ⇒ Object
Returns the value of attribute response.

    # File 'lib/cotcube-helpers/data_client.rb', line 177

    def response
      @response
    end
#ticksize ⇒ Object (readonly)
Returns the value of attribute ticksize.

    # File 'lib/cotcube-helpers/orderclient.rb', line 15

    def ticksize
      @ticksize
    end
Instance Method Details
#command(command, timeout: 10) ⇒ Object Also known as: send_command
command acts as a synchronizer: it sends the command and waits for the response,
otherwise it times out. The counterpart here is the subscription within
setup_reply_queue.

    # File 'lib/cotcube-helpers/data_client.rb', line 47

    def command(command, timeout: 10)
      command = { command: command.to_s } unless command.is_a? Hash
      command[:timestamp] ||= (Time.now.to_f * 1000).to_i
      request_id = Digest::SHA256.hexdigest(command.to_json)[..6]
      requests[request_id] = {
        request: command,
        id: request_id,
        lock: Mutex.new,
        condition: ConditionVariable.new
      }

      exchange.publish(command.to_json,
                       routing_key: 'dataproxy_commands',
                       correlation_id: request_id,
                       reply_to: reply_queue.name)

      # wait for the signal to continue the execution
      requests[request_id][:lock].synchronize do
        requests[request_id][:condition].wait(requests[request_id][:lock], timeout)
      end

      # if we reached timeout, we will return nil, just for explicity
      response = requests[request_id][:response].dup
      requests.delete(request_id)
      response
    end
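The request/response synchronization used by #command can be illustrated without a message broker. The sketch below is an assumption-laden stand-in, not the gem's implementation: the `Synchronizer` class, its `call` and `deliver` methods, and the use of `SecureRandom` for request ids are all hypothetical. It keeps the same idea of one Mutex and one ConditionVariable per pending request, with the reply side signalling the waiting caller.

```ruby
require 'securerandom'

# Hypothetical in-memory stand-in for the request table and the reply queue.
class Synchronizer
  def initialize
    @requests = {}
    @table_lock = Mutex.new
  end

  # Registers a request, hands it to the sender block, then waits for a reply
  # or for the timeout. Returns the response, or nil when nothing arrived.
  def call(payload, timeout: 1)
    id = SecureRandom.hex(4)
    entry = { lock: Mutex.new, condition: ConditionVariable.new, response: nil }
    @table_lock.synchronize { @requests[id] = entry }

    yield id, payload # the "publish" step

    entry[:lock].synchronize do
      # Skip the wait if the reply already arrived before we got here.
      entry[:condition].wait(entry[:lock], timeout) if entry[:response].nil?
    end
    @table_lock.synchronize { @requests.delete(id) }
    entry[:response]
  end

  # Called by the reply side; stores the response and wakes the waiting caller.
  def deliver(id, response)
    entry = @table_lock.synchronize { @requests[id] }
    return false unless entry

    entry[:lock].synchronize do
      entry[:response] = response
      entry[:condition].signal
    end
    true
  end
end

sync = Synchronizer.new

# A reply arrives from another thread, as it would from a queue subscription.
answered = sync.call({ command: 'ping' }) do |id, payload|
  Thread.new { sync.deliver(id, "pong for #{payload[:command]}") }
end

# Nobody replies, so the call returns nil once the timeout has passed.
timed_out = sync.call({ command: 'ping' }, timeout: 0.1) { |_id, _payload| }
```

Checking for an already-stored response before waiting avoids a lost wakeup when the reply arrives between publishing and acquiring the lock.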
#exit_on_startup(msg = '') ⇒ Object

    # File 'lib/cotcube-helpers/orderclient.rb', line 62

    def exit_on_startup(msg = '')
      puts "Cannot startup client, exiting during startup: '#{msg}'"
      shutdown
      defined?(::IRB) ? (raise) : (exit 1)
    end
#get_contracts(symbol:) ⇒ Object

    # File 'lib/cotcube-helpers/data_client.rb', line 88

    def get_contracts(symbol:)
      send_command({ command: :get_contracts, symbol: symbol })
    end
#get_historical(contract:, interval:, duration: nil, before: nil, rth_only: false, based_on: :trades) ⇒ Object

    # File 'lib/cotcube-helpers/data_client.rb', line 92

    def get_historical(contract:, interval:, duration: nil, before: nil, rth_only: false, based_on: :trades)
      # rth.true? means data outside of rth is skipped
      rth_only = rth_only ? 1 : 0

      # interval most probably is given as ActiveSupport::Duration
      if interval.is_a? ActiveSupport::Duration
        interval = case interval
                   when 1 then :sec1
                   when 5 then :sec5
                   when 15 then :sec15
                   when 30 then :sec30
                   when 60 then :min1
                   when 120 then :min2
                   when 300 then :min5
                   when 900 then :min15
                   when 1800 then :min30
                   when 3600 then :hour1
                   when 86400 then :day1
                   else interval
                   end
      end

      default_durations = { sec1: '30_M', sec5: '2_H', sec15: '6_H', sec30: '12_H',
                            min1: '1_D', min2: '2_D', min5: '5_D', min15: '1_W',
                            min30: '1_W', hour1: '1_W', day1: '1_Y' }

      unless default_durations.keys.include? interval
        raise "Invalid interval '#{interval}', should be in '#{default_durations.keys}'."
      end

      # TODO: Check for valid duration specification
      puts 'WARNING in get_historical: param :before ignored' unless before.nil?
      duration ||= default_durations[interval]
      send_command({
                     command: :historical,
                     contract: contract,
                     interval: interval,
                     duration: duration,
                     based_on: based_on.to_s.upcase,
                     rth_only: rth_only,
                     before: nil
                   }, timeout: 20)
    end
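The interval handling in #get_historical amounts to two lookups: seconds to an interval symbol, then interval symbol to a default duration. The sketch below isolates that logic so it can be tried without a running dataproxy. `resolve_interval`, `INTERVALS`, and `DEFAULT_DURATIONS` are hypothetical names. Unlike the method above, which only converts ActiveSupport::Duration values, this sketch assumes plain integers of seconds are acceptable too.

```ruby
# Seconds-to-symbol table, copied from the case statement in get_historical.
INTERVALS = {
  1 => :sec1, 5 => :sec5, 15 => :sec15, 30 => :sec30,
  60 => :min1, 120 => :min2, 300 => :min5, 900 => :min15,
  1800 => :min30, 3600 => :hour1, 86_400 => :day1
}.freeze

# Default duration per interval, copied from default_durations.
DEFAULT_DURATIONS = {
  sec1: '30_M', sec5: '2_H', sec15: '6_H', sec30: '12_H',
  min1: '1_D', min2: '2_D', min5: '5_D', min15: '1_W',
  min30: '1_W', hour1: '1_W', day1: '1_Y'
}.freeze

# Hypothetical helper: accepts an interval symbol or a number of seconds and
# returns [interval_symbol, duration_string]. Raises on unknown intervals.
def resolve_interval(interval, duration: nil)
  interval = INTERVALS.fetch(interval.to_i, interval) unless interval.is_a?(Symbol)
  unless DEFAULT_DURATIONS.key?(interval)
    raise ArgumentError, "Invalid interval '#{interval}', should be in '#{DEFAULT_DURATIONS.keys}'."
  end

  [interval, duration || DEFAULT_DURATIONS[interval]]
end

five_min = resolve_interval(:min5)                    # => [:min5, "5_D"]
quarter  = resolve_interval(900)                      # => [:min15, "1_W"]
custom   = resolve_interval(:hour1, duration: '2_D')  # => [:hour1, "2_D"]
```

An explicit duration always wins over the default, which mirrors the `duration ||= default_durations[interval]` line above.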
#shutdown(close: true) ⇒ Object
#shutdown ends the @server_thread and, if :close is set, closes the current position attached to the client.

    # File 'lib/cotcube-helpers/orderclient.rb', line 102

    def shutdown(close: true)
      return if @shutdown

      @shutdown = true

      if @position.abs.positive? && close
        send_command({ command: 'order', action: 'create', type: 'market',
                       side: (@position.positive? ? 'sell' : 'buy'), size: @position.abs })
      end
      sleep 3
      result = send_command({ command: 'unregister' })
      puts "FINAL ACCOUNT: #{@account}"
      result['executions']&.each do |x|
        x.delete('msg_type')
        puts x.to_s
      end
      @server_thread.exit if @server_thread.respond_to? :exit
    end
#spawn_server(execution_proc: nil, orderstate_proc: nil, tick_proc: nil, depth_proc: nil, order_proc: nil, bars_proc: nil) ⇒ Object

    # File 'lib/cotcube-helpers/orderclient.rb', line 120

    def spawn_server(
      execution_proc: nil,
      orderstate_proc: nil,
      tick_proc: nil,
      depth_proc: nil,
      order_proc: nil,
      bars_proc: nil
    ) # rubocop:disable Metrics/MethodLength

      %w[execution_proc orderstate_proc tick_proc depth_proc order_proc bars_proc].each {|var| eval("@#{var} = #{var}") }

      if @bars
        @bars_proc ||= lambda {|msg| puts msg.inspect }
      end

      if @ticks
        @ticks_proc ||= lambda {|msg| puts msg.inspect }
      end

      if @shutdown
        puts "Cannot spawn server on proxyclient that has been already shut down."
        return
      end
      if @server_thread
        puts "Cannot spawn server more than once."
        return
      end

      @server_thread = Thread.new do # rubocop:disable Metrics/BlockLength
        puts 'Spawning RECEIVER'
        server = TCPServer.open(@serveraddr, @client)
        loop do # rubocop:disable Metrics/BlockLength
          Thread.start(server.accept) do |client| # rubocop:disable Metrics/BlockLength
            while (response = client.gets)
              response = JSON.parse(response)

              case response['msg_type']

              when 'alert'
                case response['code']
                when 2104
                  puts 'ALERT: data farm connection resumed __ignored__'.light_black
                when 2108
                  puts 'ALERT: data farm connection suspended __ignored__'.light_black
                when 2109
                  # Order Event Warning:Attribute 'Outside Regular Trading Hours' is ignored
                  # based on the order type and destination. PlaceOrder is now being processed.
                  puts 'ALERT: outside_rth __ignored__'.light_black
                when 2100
                  puts 'ALERT: Account_info unsubscribed __ignored__'.light_black
                when 202
                  puts 'ALERT: order cancelled'
                else
                  puts '-------------ALERT------------------------------'
                  puts response.to_s
                  puts '------------------------------------------------'
                end

              when 'tick'
                @tick_proc&.call(response)

              when 'depth'
                @depth_proc&.call(response)

              when 'realtimebar'
                @bars_proc&.call(response)

              else
                puts "ERROR: #{response}"
              end
            end
          end
        rescue StandardError => e
          backtrace = e.backtrace.join("\r\n")
          puts "======= ERROR: '#{e.class}', MESSAGE: '#{e.message}'\n#{backtrace}"
        end
      end
      puts '@server_thread spawned'
    end
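The receiver loop in #spawn_server parses each incoming line as JSON and routes it by its 'msg_type' field. The routing step can be tried in isolation with the sketch below. `dispatch`, `IGNORED_ALERTS`, and the 'price' field are hypothetical and chosen for illustration; only the message types and alert codes are taken from the listing above.

```ruby
require 'json'

# Alert codes that the listing above prints as "__ignored__".
IGNORED_ALERTS = [2100, 2104, 2108, 2109].freeze

# Hypothetical dispatcher: routes one JSON line to the handler registered for
# its 'msg_type'. Returns the handler's result, or a symbol describing what
# happened when no handler ran.
def dispatch(line, handlers)
  message = JSON.parse(line)
  case message['msg_type']
  when 'alert'
    IGNORED_ALERTS.include?(message['code']) ? :ignored : :alert
  when 'tick', 'depth', 'realtimebar'
    handler = handlers[message['msg_type']]
    handler ? handler.call(message) : :unhandled
  else
    :error
  end
end

seen = []
handlers = { 'tick' => ->(msg) { seen << msg['price'] } }

dispatch('{"msg_type":"tick","price":4200.25}', handlers)
ignored   = dispatch('{"msg_type":"alert","code":2104}', handlers)
unhandled = dispatch('{"msg_type":"depth"}', handlers)
unknown   = dispatch('{"msg_type":"something_else"}', handlers)
```

Returning a symbol instead of printing keeps the sketch easy to check. The method above prints alerts and errors directly.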
#start_persistent(contract:, type: :realtimebars, local_id: 0, &block) ⇒ Object

    # File 'lib/cotcube-helpers/data_client.rb', line 136

    def start_persistent(contract:, type: :realtimebars, local_id: 0, &block)
      unless %i[depth ticks realtimebars].include? type.to_sym
        puts "ERROR: Inappropriate type in stop_realtimebars with #{type}"
        return false
      end

      local_key = "#{contract}_#{local_id}"

      channel = connection.create_channel
      exchange = channel.fanout("dataproxy_#{type}_#{contract}")
      queue = channel.queue('', exclusive: true, auto_delete: true)
      queue.bind(exchange)

      ib_contract = Cotcube::Helpers.get_ib_contract(contract)

      command = { command: type, contract: contract, con_id: ib_contract[:con_id],
                  delivery: queue.name, exchange: exchange.name }

      block ||= ->(message) { puts message.to_s }

      subscription = queue.subscribe do |_delivery_info, _properties, payload|
        block.call(JSON.parse(payload, symbolize_names: true))
      end
      persistent[type][local_key] = command.dup
      persistent[type][local_key][:queue] = queue
      persistent[type][local_key][:subscription] = subscription
      persistent[type][local_key][:channel] = channel
      send_command(command)
    end
#stop ⇒ Object

    # File 'lib/cotcube-helpers/data_client.rb', line 77

    def stop
      %i[depth ticks realtimebars].each do |type|
        persistent[type].each do |local_key, obj|
          puts "Cancelling #{local_key}"
          obj[:subscription].cancel
        end
      end
      commands.close
      connection.close
    end
#stop_persistent(contract:, local_id: 0, type: :realtimebars) ⇒ Object

    # File 'lib/cotcube-helpers/data_client.rb', line 166

    def stop_persistent(contract:, local_id: 0, type: :realtimebars)
      unless %i[depth ticks realtimebars].include? type.to_sym
        puts "ERROR: Inappropriate type in stop_realtimebars with #{type}"
        return false
      end

      local_key = "#{contract}_#{local_id}"
      p persistent[type][local_key][:subscription].cancel
      p persistent[type][local_key][:channel].close
      persistent[type].delete(local_key)
    end