Class: IB::Connection

Inherits:
Object show all
Defined in:
lib/ib/connection.rb

Overview

Encapsulates API connection to TWS or Gateway

Constant Summary collapse

DEFAULT_OPTIONS =

Please note: only the most recent TWS protocol versions are implemented, which improves performance at the expense of backwards compatibility. Support for older protocol versions can be found in older versions of the gem.

{:host =>'127.0.0.1', # Host handed to IBSocket.open by #connect
:port => '4001', # IB Gateway connection (default)
#:port => '7496', # TWS connection
:connect => true, # Connect at initialization
:reader => true, # Start a separate reader Thread
:received => true, # Keep all received messages in a @received Hash
:logger => nil, # When set, #initialize assigns it to default_logger
:client_id => nil, # Will be randomly assigned
:client_version => IB::Messages::CLIENT_VERSION, # Written to the socket first during #connect
:server_version => IB::Messages::SERVER_VERSION # Lowest server version #connect accepts
}

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts = {}) ⇒ Connection

Returns a new instance of Connection.



44
45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/ib/connection.rb', line 44

# Builds a connection configured by DEFAULT_OPTIONS overridden with +opts+.
#
# Connects right away when the merged options have a truthy :connect entry,
# and finally registers the new instance as Connection.current.
def initialize(opts = {})
  @options = DEFAULT_OPTIONS.merge(opts)

  # Two independent mutexes (subscriber registry / received-message store),
  # introduced to avoid race conditions in JRuby.
  @subscribe_lock = Mutex.new
  @receive_lock = Mutex.new

  custom_logger = options[:logger]
  self.default_logger = custom_logger if custom_logger
  @connected = false
  self.next_local_id = nil

  connect if options[:connect]
  Connection.current = self
end

Class Attribute Details

.currentObject

Returns the value of attribute current.



28
29
30
# File 'lib/ib/connection.rb', line 28

# Class-level reader: returns whatever is stored in @current.
# #initialize assigns every newly built Connection to Connection.current.
def current
  @current
end

Instance Attribute Details

#client_idObject

Connection options



31
32
33
# File 'lib/ib/connection.rb', line 31

# Returns @client_id. #connect sets it to options[:client_id] (or a random id
# when that option is nil) and writes it to the socket.
def client_id
  @client_id
end

#client_versionObject

Connection options



31
32
33
# File 'lib/ib/connection.rb', line 31

# Returns @client_version, which #connect copies from options[:client_version]
# and writes to the socket at the start of the handshake.
def client_version
  @client_version
end

#local_connect_timeObject

Connection options



31
32
33
# File 'lib/ib/connection.rb', line 31

# Returns @local_connect_time: the Time.now value captured by #connect right
# after the remote connect time has been read.
def local_connect_time
  @local_connect_time
end

#next_local_idObject Also known as: next_order_id

Connection options



31
32
33
# File 'lib/ib/connection.rb', line 31

# Returns @next_local_id. #initialize resets it to nil through the writer, and
# the :NextValidId subscriber installed by #connect stores msg.local_id in it.
def next_local_id
  @next_local_id
end

#optionsObject

Connection options



31
32
33
# File 'lib/ib/connection.rb', line 31

# Returns @options: DEFAULT_OPTIONS merged with the hash given to #initialize.
def options
  @options
end

#readerObject

Connection options



31
32
33
# File 'lib/ib/connection.rb', line 31

# Returns @reader: the Thread created by #start_reader. Nothing else in the
# code shown here assigns it, so it is nil until #start_reader has run.
def reader
  @reader
end

#remote_connect_timeObject

Connection options



31
32
33
# File 'lib/ib/connection.rb', line 31

# Returns @remote_connect_time: the string read from the socket by #connect
# immediately after the server version check.
def remote_connect_time
  @remote_connect_time
end

#server_versionObject

Connection options



31
32
33
# File 'lib/ib/connection.rb', line 31

# Returns @server_version: the integer read from the socket by #connect and
# compared against options[:server_version].
def server_version
  @server_version
end

#socketObject

Connection options



31
32
33
# File 'lib/ib/connection.rb', line 31

# Returns @socket: the object returned by IBSocket.open in #connect.
def socket
  @socket
end

Instance Method Details

#cancel_order(*local_ids) ⇒ Object

Cancel Orders by their local ids (convenience wrapper for send_message :CancelOrder).



313
314
315
316
317
# File 'lib/ib/connection.rb', line 313

# Sends one :CancelOrder message per given local id (each id is converted
# with #to_i first). Returns the array of ids exactly as they were passed in.
def cancel_order(*local_ids)
  local_ids.each { |id| send_message(:CancelOrder, :local_id => id.to_i) }
end

#clear_received(*message_types) ⇒ Object

Clear received messages Hash



172
173
174
175
176
177
178
179
180
# File 'lib/ib/connection.rb', line 172

# Empties stored messages while holding @receive_lock.
#
# Without arguments every container in the received Hash is cleared;
# otherwise only the containers of the listed message types are cleared.
def clear_received(*message_types)
  @receive_lock.synchronize do
    next received.each_value(&:clear) if message_types.empty?

    message_types.each { |type| received[type].clear }
  end
end

#connectObject Also known as: open

Working with connection



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# File 'lib/ib/connection.rb', line 61

# Opens the socket to TWS/Gateway and performs the connection handshake.
#
# In order: subscribes to :NextValidId, opens an IBSocket to
# options[:host] / options[:port], writes the client version, reads the
# server version (calling #error when it is below options[:server_version]),
# reads the remote connect time, writes the client id, marks the connection
# as connected and, when options[:reader] is truthy, starts the reader thread.
#
# Calls #error with "Already connected!" when #connected? is already true.
def connect
  error "Already connected!" if connected?

  # TWS always sends NextValidId message at connect - save this id
  self.subscribe(:NextValidId) do |msg|
    self.next_local_id = msg.local_id
    log.info "Got next valid order id: #{next_local_id}."
  end

  @socket = IBSocket.open(options[:host], options[:port])

  # Secret handshake
  @client_version = options[:client_version]
  socket.write_data @client_version
  @server_version = socket.read_int
  # Refuse servers older than the minimum version configured in the options.
  if @server_version < options[:server_version]
    error "Server version #{@server_version}, #{options[:server_version]} required."
  end
  @remote_connect_time = socket.read_string
  @local_connect_time = Time.now

  # Sending (arbitrary) client ID to identify subsequent communications.
  # The client with a client_id of 0 can manage the TWS-owned open orders.
  # Other clients can only manage their own open orders.
  @client_id = options[:client_id] || random_id
  socket.write_data @client_id

  @connected = true
  log.info "Connected to server, ver: #{@server_version}, connection time: " +
    "#{@local_connect_time} local, " +
    "#{@remote_connect_time} remote."

  start_reader if options[:reader] # Allows reconnect
end

#connected?Boolean

Returns:

  • (Boolean)


111
112
113
# File 'lib/ib/connection.rb', line 111

# Returns @connected: false after #initialize and #disconnect, true once
# #connect has completed its handshake.
def connected?
  @connected
end

#disconnectObject Also known as: close



98
99
100
101
102
103
104
105
106
107
# File 'lib/ib/connection.rb', line 98

# Shuts the connection down.
#
# If the reader thread is running it is asked to stop (by clearing
# @reader_running) and joined. Then, if the connection is marked as
# connected, the socket is closed and the connected flag is cleared.
def disconnect
  if reader_running?
    @reader_running = false
    @reader.join
  end

  return unless connected?

  socket.close
  @connected = false
end

#modify_order(order, contract) ⇒ Object

Modify Order (convenience wrapper for send_message :PlaceOrder). Returns order_id.



308
309
310
# File 'lib/ib/connection.rb', line 308

# Delegates to +order.modify+, passing the contract and this connection,
# and returns whatever that call returns.
def modify_order(order, contract)
  order.modify(contract, self)
end

#place_order(order, contract) ⇒ Object

Place Order (convenience wrapper for send_message :PlaceOrder). Assigns client_id and order_id fields to placed order. Returns assigned order_id.



303
304
305
# File 'lib/ib/connection.rb', line 303

# Delegates to +order.place+, passing the contract and this connection,
# and returns whatever that call returns.
def place_order(order, contract)
  order.place(contract, self)
end

#process_messageObject

Process single incoming message (blocking!)



254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
# File 'lib/ib/connection.rb', line 254

# Reads one incoming message from the socket and dispatches it.
#
# The message id is read first (this read blocks), the matching class is
# looked up in Messages::Incoming::Classes and instantiated from the socket,
# every subscriber registered for that class is called with the message, and
# the message is appended to the received Hash when options[:received] is set.
def process_message
  msg_id = socket.read_int # This read blocks!
  msg_class = Messages::Incoming::Classes[msg_id]

  # Debug:
  log.debug "Got message #{msg_id} (#{msg_class})"

  # A missing class entry means an unsupported message type was received.
  error "Got unsupported message #{msg_id}" unless msg_class
  # The message instance reads its own payload from the socket.
  msg = msg_class.new(socket)

  # Deliver to all registered subscribers; warn afterwards if there were none.
  @subscribe_lock.synchronize do
    subscribers[msg.class].each_value { |subscriber| subscriber.call(msg) }
  end
  log.warn "No subscribers for message #{msg.class}!" if subscribers[msg.class].empty?

  # Optionally keep the message in the @received Hash.
  return unless options[:received]

  @receive_lock.synchronize { received[msg.message_type] << msg }
end

#process_messages(poll_time = 200) ⇒ Object

Process incoming messages during poll_time (200) msecs, nonblocking



245
246
247
248
249
250
251
# File 'lib/ib/connection.rb', line 245

# Processes incoming messages for up to +poll_time+ milliseconds.
#
# Repeatedly waits (at most for the time remaining) for the socket to become
# readable and handles one message each time it does. Returns nil.
def process_messages(poll_time = 200) # in msec
  deadline = Time.now + poll_time / 1000.0
  until (remaining = deadline - Time.now) <= 0
    # select yields nil on timeout, so a message is read only when data waits
    readable = IO.select([socket], nil, nil, remaining)
    process_message if readable
  end
end

#reader_running?Boolean

Returns:

  • (Boolean)


240
241
242
# File 'lib/ib/connection.rb', line 240

# Truthy only when the running flag is set, a reader thread object exists and
# that thread is still alive.
def reader_running?
  return @reader_running unless @reader_running

  @reader && @reader.alive?
end

#receivedObject

Hash of received messages, keyed by message type



183
184
185
# File 'lib/ib/connection.rb', line 183

# Lazily created Hash of received messages keyed by message type.
# Looking up an unknown type stores and returns a fresh empty Array.
def received
  return @received if @received

  @received = Hash.new { |store, message_type| store[message_type] = [] }
end

#received?(message_type, times = 1) ⇒ Boolean

Check whether messages of the given type were received at least `times` times

Returns:

  • (Boolean)


188
189
190
191
192
# File 'lib/ib/connection.rb', line 188

# True when at least +times+ messages of +message_type+ are currently stored
# in the received Hash. The count is read while holding @receive_lock.
def received?(message_type, times = 1)
  @receive_lock.synchronize do
    stored_count = received[message_type].size
    stored_count >= times
  end
end

#satisfied?(*conditions) ⇒ Boolean

Check if all given conditions are satisfied

Returns:

  • (Boolean)


195
196
197
198
199
200
201
202
203
204
205
206
207
208
# File 'lib/ib/connection.rb', line 195

# Checks whether every given condition currently holds.
#
# Each condition may be:
# * a Symbol           - satisfied once received?(symbol) is truthy;
# * an Array           - splatted into received?, e.g. [:OpenOrder, 3];
# * any callable       - satisfied when condition.call returns a truthy value.
# Anything else is reported through #error.
#
# Conditions are evaluated left to right and evaluation stops at the first
# one that fails. Always returns strict true/false (false for an empty
# condition list) rather than leaking the value of the last condition.
def satisfied?(*conditions)
  return false if conditions.empty?

  conditions.all? do |condition|
    case condition
    when Symbol
      received?(condition)
    when Array
      received?(*condition)
    else
      if condition.respond_to?(:call)
        condition.call
      else
        error "Unknown wait condition #{condition}"
      end
    end
  end
end

#send_message(what, *args) ⇒ Object Also known as: dispatch

Send an outgoing message.



283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/ib/connection.rb', line 283

# Sends an outgoing message over the socket.
#
# +what+ may be an outgoing message instance (sent as is), an outgoing
# message class (instantiated with +args+), or a Symbol naming a constant in
# Messages::Outgoing (looked up and instantiated with +args+). Anything else
# is reported through #error, as is sending while not connected.
# Returns the result of the message's #send_to.
def send_message(what, *args)
  base_class = Messages::Outgoing::AbstractMessage

  message =
    if what.is_a?(base_class)
      what
    elsif what.is_a?(Class) && what < base_class
      what.new(*args)
    elsif what.is_a?(Symbol)
      Messages::Outgoing.const_get(what).new(*args)
    else
      error "Only able to send outgoing IB messages", :args
    end

  error "Not able to send messages, IB not connected!" unless connected?
  message.send_to(socket)
end

#start_readerObject

Start reader thread that continuously reads messages from @socket in background. If you don’t start reader, you should manually poll @socket for messages or use #process_messages(msec) API.



232
233
234
235
236
237
238
# File 'lib/ib/connection.rb', line 232

# Starts the background reader thread and returns it.
#
# Sets Thread.abort_on_exception globally, raises the @reader_running flag
# and spawns a thread that keeps calling #process_messages for as long as
# that flag stays truthy.
def start_reader
  Thread.abort_on_exception = true
  @reader_running = true
  @reader = Thread.new { process_messages while @reader_running }
end

#subscribe(*args, &block) ⇒ Object

Subscribe Proc or block to specific type(s) of incoming message events. Listener will be called later with received message instance as its argument. Returns subscriber id to allow unsubscribing



120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/ib/connection.rb', line 120

# Subscribes a callable to one or more incoming message types.
#
# The subscriber is the last argument when that argument responds to #call,
# otherwise the given block. Any callable is accepted (Proc, lambda, Method
# object, ...), since subscribers are only ever invoked through #call.
#
# Every remaining argument selects message classes:
# * a subclass of Messages::Incoming::AbstractMessage - that class;
# * a Symbol  - the constant of that name in Messages::Incoming;
# * a Regexp  - every class in Messages::Incoming::Classes whose name matches.
# Anything else, or a missing subscriber, is reported through #error.
#
# The whole registration runs under @subscribe_lock.
# Returns the subscription id (from random_id) to pass to #unsubscribe.
def subscribe(*args, &block)
  @subscribe_lock.synchronize do
    subscriber = args.last.respond_to?(:call) ? args.pop : block
    id = random_id

    # Validate with the same duck-type test used to pick the subscriber above,
    # so a callable that was popped from args is never rejected afterwards.
    error "Need subscriber proc or block", :args unless subscriber.respond_to?(:call)

    args.each do |what|
      message_classes =
        if what.is_a?(Class) && what < Messages::Incoming::AbstractMessage
          [what]
        elsif what.is_a?(Symbol)
          [Messages::Incoming.const_get(what)]
        elsif what.is_a?(Regexp)
          Messages::Incoming::Classes.values.select { |klass| klass.to_s =~ what }
        else
          error "#{what} must represent incoming IB message class", :args
        end

      message_classes.each do |message_class|
        # TODO: Fix: RuntimeError: can't add a new key into hash during iteration
        subscribers[message_class][id] = subscriber
      end
    end
    id
  end
end

#subscribersObject

Message subscribers. Key is the message class to listen for. Value is a Hash of subscriber Procs, keyed by their subscription id. All subscriber Procs will be called with the message instance as an argument when a message of that type is received.



165
166
167
# File 'lib/ib/connection.rb', line 165

# Lazily created subscriber registry: message class => { subscription id =>
# subscriber }. Looking up an unknown class stores and returns a fresh Hash.
def subscribers
  return @subscribers if @subscribers

  @subscribers = Hash.new { |registry, message_class| registry[message_class] = {} }
end

#unsubscribe(*ids) ⇒ Object

Remove all subscribers registered under the given subscriber id(s). Returns the removed subscribers.



149
150
151
152
153
154
155
156
157
158
159
# File 'lib/ib/connection.rb', line 149

# Removes every subscriber registered under each of the given ids, across all
# message classes, while holding @subscribe_lock.
#
# Reports through #error any id that had no subscribers.
# Returns a flat Array of the removed subscribers.
def unsubscribe(*ids)
  @subscribe_lock.synchronize do
    ids.flat_map do |id|
      # delete returns nil for classes that never had this id; drop those.
      dropped = subscribers.values.map { |by_id| by_id.delete(id) }.compact
      error "No subscribers with id #{id}" if dropped.empty?
      dropped
    end
  end
end

#wait_for(*args, &block) ⇒ Object

Wait for specific condition(s) - given as callable/block, or message type(s) - given as Symbol or [Symbol, times] pair. Timeout after given time or 1 second.



213
214
215
216
217
218
219
220
221
222
223
224
225
# File 'lib/ib/connection.rb', line 213

# Blocks until all conditions are satisfied or the timeout expires.
#
# The first Numeric argument is the timeout in seconds (default 1); every
# other argument, plus the block if given, is a condition for #satisfied?.
# Between checks it sleeps 50 ms when @reader is set, otherwise it calls
# process_messages(50) itself. Returns nil.
def wait_for(*args, &block)
  numerics, conditions = args.partition { |arg| arg.is_a?(Numeric) }
  timeout = numerics.first
  deadline = Time.now + (timeout || 1) # default timeout 1 sec
  conditions = conditions.push(block).compact

  until deadline < Time.now || satisfied?(*conditions)
    @reader ? sleep(0.05) : process_messages(50)
  end
end