Class: IB::Connection
Overview
Encapsulates API connection to TWS or Gateway
Constant Summary
- DEFAULT_OPTIONS =
Note: only the most current TWS protocol versions are implemented, which improves performance at the expense of backwards compatibility. Support for older protocol versions can be found in older gem versions.
{:host => '127.0.0.1',
 :port => '4001', # IB Gateway connection (default)
 #:port => '7496', # TWS connection
 :connect => true, # Connect at initialization
 :reader => true, # Start a separate reader Thread
 :received => true, # Keep all received messages in a @received Hash
 :logger => nil,
 :client_id => nil, # Will be randomly assigned
 :client_version => IB::Messages::CLIENT_VERSION,
 :server_version => IB::Messages::SERVER_VERSION
}
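The constructor merges caller-supplied options over DEFAULT_OPTIONS, so only the keys that differ need to be passed. A minimal sketch of that merge, assuming a local copy of the defaults named DEFAULTS (hypothetical); the two version numbers are placeholders, not the gem's real constants:

```ruby
# Local stand-in for IB::Connection::DEFAULT_OPTIONS.
# The version values are placeholders (assumptions), not the gem's constants.
DEFAULTS = {
  :host           => '127.0.0.1',
  :port           => '4001',   # IB Gateway
  :connect        => true,
  :reader         => true,
  :received       => true,
  :logger         => nil,
  :client_id      => nil,
  :client_version => 0,        # placeholder
  :server_version => 0         # placeholder
}.freeze

# Override only what differs; every other key keeps its default.
tws_options = DEFAULTS.merge(:port => '7496', :client_id => 1111)

puts tws_options[:port]   # the overridden TWS port
puts tws_options[:host]   # the default host, untouched
```

Hash#merge returns a new Hash, so the defaults themselves are never modified by a particular connection's overrides.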
Class Attribute Summary
- .current ⇒ Object
  Returns the value of attribute current.
Instance Attribute Summary
- #client_id ⇒ Object
  Connection options.
- #client_version ⇒ Object
  Connection options.
- #local_connect_time ⇒ Object
  Connection options.
- #next_local_id ⇒ Object (also: #next_order_id)
  Connection options.
- #options ⇒ Object
  Connection options.
- #reader ⇒ Object
  Connection options.
- #remote_connect_time ⇒ Object
  Connection options.
- #server_version ⇒ Object
  Connection options.
- #socket ⇒ Object
  Connection options.
Instance Method Summary
- #cancel_order(*local_ids) ⇒ Object
  Cancel Orders by their local ids (convenience wrapper for send_message :CancelOrder).
- #clear_received(*message_types) ⇒ Object
  Clear received messages Hash.
- #connect ⇒ Object (also: #open)
  Working with connection.
- #connected? ⇒ Boolean
- #disconnect ⇒ Object (also: #close)
- #initialize(opts = {}) ⇒ Connection (constructor)
  A new instance of Connection.
- #modify_order(order, contract) ⇒ Object
  Modify Order (convenience wrapper for send_message :PlaceOrder).
- #place_order(order, contract) ⇒ Object
  Place Order (convenience wrapper for send_message :PlaceOrder).
- #process_message ⇒ Object
  Process single incoming message (blocking!).
- #process_messages(poll_time = 200) ⇒ Object
  Process incoming messages for poll_time msecs (default 200), nonblocking.
- #reader_running? ⇒ Boolean
- #received ⇒ Object
  Hash of received messages, keyed by message type.
- #received?(message_type, times = 1) ⇒ Boolean
  Check if messages of the given type were received at least the given number of times.
- #satisfied?(*conditions) ⇒ Boolean
  Check if all given conditions are satisfied.
- #send_message(what, *args) ⇒ Object (also: #dispatch)
  Send an outgoing message.
- #start_reader ⇒ Object
  Start reader thread that continuously reads messages from @socket in background.
- #subscribe(*args, &block) ⇒ Object
  Subscribe Proc or block to specific type(s) of incoming message events.
- #subscribers ⇒ Object
  Message subscribers.
- #unsubscribe(*ids) ⇒ Object
  Remove all subscribers with specific subscriber id (TODO: multiple ids).
- #wait_for(*args, &block) ⇒ Object
  Wait for specific condition(s), given as callable/block, or message type(s), given as Symbol or [Symbol, times] pair.
Constructor Details
#initialize(opts = {}) ⇒ Connection
Returns a new instance of Connection.
# File 'lib/ib/connection.rb', line 44
def initialize opts = {}
  @options = DEFAULT_OPTIONS.merge(opts)
  # A couple of locks to avoid race conditions in JRuby
  @subscribe_lock = Mutex.new
  @receive_lock = Mutex.new
  self.default_logger = options[:logger] if options[:logger]
  @connected = false
  self.next_local_id = nil
  connect if options[:connect]
  Connection.current = self
end
Class Attribute Details
.current ⇒ Object
Returns the value of attribute current.
# File 'lib/ib/connection.rb', line 28
def current
  @current
end
Instance Attribute Details
#client_id ⇒ Object
Connection options.
# File 'lib/ib/connection.rb', line 31
def client_id
  @client_id
end
#client_version ⇒ Object
Connection options.
# File 'lib/ib/connection.rb', line 31
def client_version
  @client_version
end
#local_connect_time ⇒ Object
Connection options.
# File 'lib/ib/connection.rb', line 31
def local_connect_time
  @local_connect_time
end
#next_local_id ⇒ Object Also known as: next_order_id
Connection options.
# File 'lib/ib/connection.rb', line 31
def next_local_id
  @next_local_id
end
#options ⇒ Object
Connection options.
# File 'lib/ib/connection.rb', line 31
def options
  @options
end
#reader ⇒ Object
Connection options.
# File 'lib/ib/connection.rb', line 31
def reader
  @reader
end
#remote_connect_time ⇒ Object
Connection options.
# File 'lib/ib/connection.rb', line 31
def remote_connect_time
  @remote_connect_time
end
#server_version ⇒ Object
Connection options.
# File 'lib/ib/connection.rb', line 31
def server_version
  @server_version
end
#socket ⇒ Object
Connection options.
# File 'lib/ib/connection.rb', line 31
def socket
  @socket
end
Instance Method Details
#cancel_order(*local_ids) ⇒ Object
Cancel Orders by their local ids (convenience wrapper for send_message :CancelOrder).
# File 'lib/ib/connection.rb', line 313
def cancel_order *local_ids
  local_ids.each do |local_id|
    send_message :CancelOrder, :local_id => local_id.to_i
  end
end
#clear_received(*message_types) ⇒ Object
Clear received messages Hash
# File 'lib/ib/connection.rb', line 172
def clear_received *message_types
  @receive_lock.synchronize do
    if message_types.empty?
      received.each { |message_type, container| container.clear }
    else
      message_types.each { |message_type| received[message_type].clear }
    end
  end
end
#connect ⇒ Object Also known as: open
Working with connection
# File 'lib/ib/connection.rb', line 61
def connect
  error "Already connected!" if connected?
  # TWS always sends NextValidId message at connect - save this id
  self.subscribe(:NextValidId) do |msg|
    self.next_local_id = msg.local_id
    log.info "Got next valid order id: #{next_local_id}."
  end
  @socket = IBSocket.open(options[:host], options[:port])
  # Secret handshake
  @client_version = options[:client_version]
  socket.write_data @client_version
  @server_version = socket.read_int
  if @server_version < options[:server_version]
    error "Server version #{@server_version}, #{options[:server_version]} required."
  end
  @remote_connect_time = socket.read_string
  @local_connect_time = Time.now
  # Sending (arbitrary) client ID to identify subsequent communications.
  # The client with a client_id of 0 can manage the TWS-owned open orders.
  # Other clients can only manage their own open orders.
  @client_id = options[:client_id] || random_id
  socket.write_data @client_id
  @connected = true
  log.info "Connected to server, ver: #{@server_version}, connection time: " +
    "#{@local_connect_time} local, " +
    "#{@remote_connect_time} remote."
  start_reader if options[:reader] # Allows reconnect
end
#connected? ⇒ Boolean
# File 'lib/ib/connection.rb', line 111
def connected?
  @connected
end
#disconnect ⇒ Object Also known as: close
# File 'lib/ib/connection.rb', line 98
def disconnect
  if reader_running?
    @reader_running = false
    @reader.join
  end
  if connected?
    socket.close
    @connected = false
  end
end
#modify_order(order, contract) ⇒ Object
Modify Order (convenience wrapper for send_message :PlaceOrder). Returns order_id.
# File 'lib/ib/connection.rb', line 308
def modify_order order, contract
  order.modify contract, self
end
#place_order(order, contract) ⇒ Object
Place Order (convenience wrapper for send_message :PlaceOrder). Assigns client_id and order_id fields to placed order. Returns assigned order_id.
# File 'lib/ib/connection.rb', line 303
def place_order order, contract
  order.place contract, self
end
#process_message ⇒ Object
Process single incoming message (blocking!)
# File 'lib/ib/connection.rb', line 254
def process_message
  msg_id = socket.read_int # This read blocks!
  # Debug:
  log.debug "Got message #{msg_id} (#{Messages::Incoming::Classes[msg_id]})"
  # Create new instance of the appropriate message type,
  # and have it read the message from socket.
  # NB: Failure here usually means unsupported message type received
  error "Got unsupported message #{msg_id}" unless Messages::Incoming::Classes[msg_id]
  msg = Messages::Incoming::Classes[msg_id].new(socket)
  # Deliver message to all registered subscribers, alert if no subscribers
  @subscribe_lock.synchronize do
    subscribers[msg.class].each { |_, subscriber| subscriber.call(msg) }
  end
  log.warn "No subscribers for message #{msg.class}!" if subscribers[msg.class].empty?
  # Collect all received messages into a @received Hash
  if options[:received]
    @receive_lock.synchronize do
      received[msg.message_type] << msg
    end
  end
end
#process_messages(poll_time = 200) ⇒ Object
Process incoming messages for poll_time msecs (default 200), nonblocking.
# File 'lib/ib/connection.rb', line 245
def process_messages poll_time = 200 # in msec
  time_out = Time.now + poll_time/1000.0
  while (time_left = time_out - Time.now) > 0
    # If socket is readable, process single incoming message
    process_message if select [socket], nil, nil, time_left
  end
end
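The polling loop above relies on select returning nil once the remaining time runs out, which is what keeps it from blocking indefinitely. A minimal sketch of the same pattern, assuming an IO.pipe as a stand-in for the TWS socket and a hypothetical helper named poll_lines that reads one line per wake-up:

```ruby
# Poll a readable IO for up to poll_time milliseconds, handling one line per
# wake-up. The pipe below stands in for the TWS socket (assumption).
def poll_lines(io, poll_time = 200)
  lines = []
  time_out = Time.now + poll_time / 1000.0
  while (time_left = time_out - Time.now) > 0
    # IO.select returns nil on timeout, so the loop ends on its own
    lines << io.gets if IO.select([io], nil, nil, time_left)
  end
  lines
end

reader, writer = IO.pipe
writer.puts "tick 1"
writer.puts "tick 2"

# The writer stays open, so after both lines are consumed select simply
# waits out the remaining time instead of reporting end-of-file.
lines = poll_lines(reader, 100)
puts lines.size
```

Recomputing time_left on every pass bounds the total wait by poll_time regardless of how many messages arrive.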
#reader_running? ⇒ Boolean
# File 'lib/ib/connection.rb', line 240
def reader_running?
  @reader_running && @reader && @reader.alive?
end
#received ⇒ Object
Hash of received messages, keyed by message type
# File 'lib/ib/connection.rb', line 183
def received
  @received ||= Hash.new { |hash, message_type| hash[message_type] = Array.new }
end
#received?(message_type, times = 1) ⇒ Boolean
Check if messages of the given type were received at least the given number of times.
# File 'lib/ib/connection.rb', line 188
def received? message_type, times=1
  @receive_lock.synchronize do
    received[message_type].size >= times
  end
end
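The received store and the received? check can be tried in isolation. A minimal sketch, assuming plain strings in place of message objects and a hypothetical lambda named at_least that mirrors the count check:

```ruby
# Stand-in for the connection's @received store (names are illustrative).
received = Hash.new { |hash, message_type| hash[message_type] = Array.new }
lock = Mutex.new

# Appending to an unseen key creates its Array on first access.
lock.synchronize { received[:NextValidId] << "msg 1" }
lock.synchronize { received[:OpenOrder] << "msg 2" << "msg 3" }

# Same comparison as received?: at least `times` messages of this type.
at_least = lambda do |message_type, times = 1|
  lock.synchronize { received[message_type].size >= times }
end

puts at_least.call(:OpenOrder, 2)
puts at_least.call(:NextValidId, 2)
puts at_least.call(:Alert)
```

Because the default block assigns into the Hash, merely asking about a message type that has never arrived leaves an empty Array under that key.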
#satisfied?(*conditions) ⇒ Boolean
Check if all given conditions are satisfied
# File 'lib/ib/connection.rb', line 195
def satisfied? *conditions
  !conditions.empty? &&
    conditions.inject(true) do |result, condition|
      result && if condition.is_a?(Symbol)
        received?(condition)
      elsif condition.is_a?(Array)
        received?(*condition)
      elsif condition.respond_to?(:call)
        condition.call
      else
        error "Unknown wait condition #{condition}"
      end
    end
end
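The three accepted condition forms (Symbol, [Symbol, times] pair, callable) are easier to see against concrete data. A rough standalone sketch, assuming the received messages are passed in as a plain Hash and using all? in place of inject; it is not the gem's method:

```ruby
# Illustrative re-implementation of the condition check. The `received`
# argument is an assumption; the real method reads the connection's state.
def satisfied?(received, *conditions)
  return false if conditions.empty?
  conditions.all? do |condition|
    case condition
    when Symbol then received[condition].size >= 1
    when Array  then received[condition[0]].size >= (condition[1] || 1)
    else
      unless condition.respond_to?(:call)
        raise ArgumentError, "Unknown wait condition #{condition}"
      end
      condition.call
    end
  end
end

received = Hash.new { |h, k| h[k] = [] }
received[:OpenOrder] << :a << :b

puts satisfied?(received, :OpenOrder)
puts satisfied?(received, [:OpenOrder, 3])
puts satisfied?(received, [:OpenOrder, 2], -> { true })
```

An empty condition list is never satisfied, which matches the leading !conditions.empty? guard in the listing above.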
#send_message(what, *args) ⇒ Object Also known as: dispatch
Send an outgoing message.
# File 'lib/ib/connection.rb', line 283
def send_message what, *args
  message =
    case
    when what.is_a?(Messages::Outgoing::AbstractMessage)
      what
    when what.is_a?(Class) && what < Messages::Outgoing::AbstractMessage
      what.new *args
    when what.is_a?(Symbol)
      Messages::Outgoing.const_get(what).new *args
    else
      error "Only able to send outgoing IB messages", :args
    end
  error "Not able to send messages, IB not connected!" unless connected?
  message.send_to socket
end
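The method accepts a message instance, a message class, or a Symbol naming a class. A minimal sketch of that resolution step alone, assuming a hypothetical Outgoing module and build_message helper in place of the gem's Messages::Outgoing namespace:

```ruby
# Hypothetical stand-ins for Messages::Outgoing; not the gem's classes.
module Outgoing
  class AbstractMessage
    attr_reader :data

    def initialize(data = {})
      @data = data
    end
  end

  class CancelOrder < AbstractMessage; end
end

# Resolve `what` to a message instance in the same three ways.
def build_message(what, *args)
  case
  when what.is_a?(Outgoing::AbstractMessage)
    what                                   # already an instance
  when what.is_a?(Class) && what < Outgoing::AbstractMessage
    what.new(*args)                        # a message class
  when what.is_a?(Symbol)
    Outgoing.const_get(what).new(*args)    # a class name as Symbol
  else
    raise ArgumentError, "Only able to send outgoing IB messages"
  end
end

by_symbol = build_message(:CancelOrder, :local_id => 7)
by_class  = build_message(Outgoing::CancelOrder)
same      = build_message(by_symbol)
puts by_symbol.class
```

Resolving the Symbol with const_get is what lets callers write :CancelOrder instead of the fully qualified class name.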
#start_reader ⇒ Object
Start reader thread that continuously reads messages from @socket in background. If you don’t start reader, you should manually poll @socket for messages or use #process_messages(msec) API.
# File 'lib/ib/connection.rb', line 232
def start_reader
  Thread.abort_on_exception = true
  @reader_running = true
  @reader = Thread.new do
    process_messages while @reader_running
  end
end
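The reader is a flag-controlled background loop that #disconnect stops by clearing the flag and joining the thread. A small sketch of that lifecycle, assuming a hypothetical TinyReader class whose counter stands in for the call to process_messages:

```ruby
# Background loop controlled by a flag and stopped with join.
# The counter stands in for process_messages (assumption).
class TinyReader
  attr_reader :polls

  def initialize
    @polls = 0
    @running = false
  end

  def start
    @running = true
    @thread = Thread.new do
      while @running
        @polls += 1
        sleep 0.01
      end
    end
  end

  def running?
    !!(@running && @thread && @thread.alive?)
  end

  def stop
    @running = false          # the loop exits after its current pass
    @thread.join if @thread   # wait for the thread to finish
  end
end

reader = TinyReader.new
reader.start
sleep 0.05
reader.stop
puts reader.running?
```

Joining after clearing the flag means stop returns only once the loop has finished its current pass, so nothing is still reading when the socket is closed.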
#subscribe(*args, &block) ⇒ Object
Subscribe Proc or block to specific type(s) of incoming message events. Listener will be called later with received message instance as its argument. Returns subscriber id to allow unsubscribing
# File 'lib/ib/connection.rb', line 120
def subscribe *args, &block
  @subscribe_lock.synchronize do
    subscriber = args.last.respond_to?(:call) ? args.pop : block
    id = random_id
    error "Need subscriber proc or block", :args unless subscriber.is_a? Proc
    args.each do |what|
      message_classes =
        case
        when what.is_a?(Class) && what < Messages::Incoming::AbstractMessage
          [what]
        when what.is_a?(Symbol)
          [Messages::Incoming.const_get(what)]
        when what.is_a?(Regexp)
          Messages::Incoming::Classes.values.find_all { |klass| klass.to_s =~ what }
        else
          error "#{what} must represent incoming IB message class", :args
        end
      message_classes.flatten.each do |message_class|
        # TODO: Fix: RuntimeError: can't add a new key into hash during iteration
        subscribers[message_class][id] = subscriber
      end
    end
    id
  end
end
#subscribers ⇒ Object
Message subscribers. Key is the message class to listen for. Value is a Hash of subscriber Procs, keyed by their subscription id. All subscriber Procs will be called with the message instance as an argument when a message of that type is received.
# File 'lib/ib/connection.rb', line 165
def subscribers
  @subscribers ||= Hash.new { |hash, subs| hash[subs] = Hash.new }
end
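The two-level layout described above (message class, then subscription id, then Proc) can be exercised without a connection. A minimal sketch, assuming two empty stand-in message classes and rand in place of the gem's random_id helper, which is not shown on this page:

```ruby
# Stand-in message classes; the real ones live under Messages::Incoming.
class Alert; end
class OpenOrder; end

# Message class => { subscriber id => Proc }
subscribers = Hash.new { |hash, subs| hash[subs] = Hash.new }

seen = []
id = rand(1_000_000)   # stand-in for random_id (assumption)
subscribers[Alert][id]     = proc { |msg| seen << msg.class }
subscribers[OpenOrder][id] = proc { |msg| seen << msg.class }

# Deliver a message to every subscriber registered for its class.
msg = Alert.new
subscribers[msg.class].each { |_, subscriber| subscriber.call(msg) }

# Unsubscribe: delete the id from every per-class Hash, keep what was removed.
removed = subscribers.map { |_, subs| subs.delete(id) }.compact
puts removed.size
```

Sharing one id across several message classes is what allows a single unsubscribe call to remove a listener from all of them.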
#unsubscribe(*ids) ⇒ Object
Remove all subscribers with specific subscriber id (TODO: multiple ids)
# File 'lib/ib/connection.rb', line 149
def unsubscribe *ids
  @subscribe_lock.synchronize do
    removed = []
    ids.each do |id|
      removed_at_id = subscribers.map { |_, subscribers| subscribers.delete id }.compact
      error "No subscribers with id #{id}" if removed_at_id.empty?
      removed << removed_at_id
    end
    removed.flatten
  end
end
#wait_for(*args, &block) ⇒ Object
Wait for specific condition(s) - given as callable/block, or message type(s) - given as Symbol or [Symbol, times] pair. Timeout after given time or 1 second.
# File 'lib/ib/connection.rb', line 213
def wait_for *args, &block
  timeout = args.find { |arg| arg.is_a? Numeric } # extract timeout from args
  end_time = Time.now + (timeout || 1) # default timeout 1 sec
  conditions = args.delete_if { |arg| arg.is_a? Numeric }.push(block).compact
  until end_time < Time.now || satisfied?(*conditions)
    if @reader
      sleep 0.05
    else
      process_messages 50
    end
  end
end
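The argument handling (a Numeric is the timeout in seconds, everything else is a condition) and the polling loop can be sketched on their own. This is an approximation under stated assumptions: the hypothetical wait_until helper handles callables only, always sleeps between checks, and returns whether the conditions held, whereas the listing above has no explicit return value:

```ruby
# Illustrative wait loop: a Numeric argument is the timeout in seconds,
# the block and any callables are the conditions to poll.
def wait_until(*args, &block)
  timeout    = args.find { |arg| arg.is_a? Numeric }
  end_time   = Time.now + (timeout || 1)   # default timeout 1 sec
  conditions = args.reject { |arg| arg.is_a? Numeric }.push(block).compact
  met = lambda { !conditions.empty? && conditions.all?(&:call) }
  until end_time < Time.now || met.call
    sleep 0.05
  end
  met.call
end

started = Time.now
flag = false
Thread.new { sleep 0.1; flag = true }

ok        = wait_until(2) { flag }      # condition becomes true after ~0.1 s
timed_out = wait_until(0.2) { false }   # never true, gives up after 0.2 s
puts ok
```

Checking the deadline before the conditions on each pass means a condition that never becomes true costs at most the timeout plus one polling interval.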