Class: OandaAPI::Streaming::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/oanda_api/streaming/client.rb

Overview

Examples:

Example Usage

client = OandaAPI::Streaming::Client.new :practice, ENV.fetch("ONADA_PRACTICE_TOKEN")

# IMPORTANT
# This code will block indefinitely because streaming executes as an infinite loop.
prices = client.prices(account_id: 1234, instruments: %w[AUD_CAD AUD_CHF])
prices.stream do |price|

  # The code in this block should handle the price as efficently
  # as possible, otherwise the connection could timeout.
  # For example, you could publish the tick on a queue to be handled
  # by some other thread or process.
  price  # => OandaAPI::Resource::Price
end

client.events.stream do |transaction|
  transaction # => OandaAPI::Resource::Transaction
end

# -- Stopping the stream --
# You may add a second argument to the block to yield the client itself.
# You can use it to issue a client.stop! to terminate streaming.
@prices = []
prices = client.prices(account_id: 1234, instruments: %w[AUD_CAD AUD_CHF])
prices.stream do |price, client|
  @prices << price
  client.stop! if @prices.size == 10
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(domain, auth_token) ⇒ Client

Returns a new instance of Client.

Parameters:



57
58
59
60
61
62
63
# File 'lib/oanda_api/streaming/client.rb', line 57

def initialize(domain, auth_token)
  super()
  @auth_token = auth_token
  self.domain = domain
  @headers = auth
  @streaming_request = nil
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(sym, *args) ⇒ OandaAPI::Client::NamespaceProxy (private)

Enables method-chaining.



149
150
151
# File 'lib/oanda_api/streaming/client.rb', line 149

def method_missing(sym, *args)
  OandaAPI::Client::NamespaceProxy.new self, sym, args.first
end

Instance Attribute Details

#auth_tokenString (readonly)

Returns Oanda personal access token.

Returns:

  • (String)

    Oanda personal access token.



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/oanda_api/streaming/client.rb', line 51

class Client
  attr_reader :auth_token, :streaming_request
  attr_accessor :domain, :headers

  # @param [Symbol] domain see {#domain}
  # @param [String] auth_token see {#auth_token}
  def initialize(domain, auth_token)
    super()
    @auth_token = auth_token
    self.domain = domain
    @headers = auth
    @streaming_request = nil
  end

  # Returns an absolute URI for a resource request.
  #
  # @param [String] path the path portion of the URI.
  #
  # @return [String] a URI.
  def api_uri(path)
    uri = "#{BASE_URI[domain]}#{path}"
    uri.sub "[API_VERSION]", OandaAPI.configuration.rest_api_version
  end

  # Parameters used for authentication.
  # @return [Hash]
  def auth
    { "Authorization" => "Bearer #{auth_token}" }
  end

  # @private
  # Sets the domain the client can access.
  # @return [void]
  def domain=(value)
    fail ArgumentError, "Invalid domain" unless OandaAPI::DOMAINS.include? value
    @domain = value
  end

  # Determines whether emitted resources should include heartbeats.
  # @param [boolean] value
  def emit_heartbeats=(value)
    @emit_heartbeats = value
    streaming_request.emit_heartbeats = value if streaming_request
  end

  # Returns `true` if emitted resources include heartbeats. Defaults to false.
  # @return [boolean]
  def emit_heartbeats?
    !!@emit_heartbeats
  end

  # Signals the streaming request to disconnect and terminates streaming.
  # @return [void]
  def stop!
    streaming_request.stop! if running?
  end

  # Returns `true` if the client is currently streaming.
  # @return [boolean]
  def running?
    !!(streaming_request && streaming_request.running?)
  end

  # @private
  # Executes a streaming http request.
  #
  # @param [Symbol] _method Ignored.
  #
  # @param [String] path the path of an Oanda resource request.
  #
  # @param [Hash] conditions optional parameters that are converted into query parameters.
  #
  # @yield [OandaAPI:ResourceBase, OandaAPI::Streaming::Client]  See {OandaAPI::Streaming::Request.stream}
  #
  # @return [void]
  #
  # @raise [OandaAPI::RequestError] if the API return code is not 2xx.
  # @raise [OandaAPI::StreamingDisconnect] if the API disconnects.
  def execute_request(_method, path, conditions = {}, &block)
    Http::Exceptions.wrap_and_check do
      @streaming_request = OandaAPI::Streaming::Request.new client: self,
                                                            uri: api_uri(path),
                                                            query: Utils.stringify_keys(conditions),
                                                            headers: OandaAPI.configuration.headers.merge(headers)
      @streaming_request.emit_heartbeats = emit_heartbeats?
      @streaming_request.stream(&block)
      return nil
    end

    rescue Http::Exceptions::HttpException => e
      raise OandaAPI::RequestError, e.message
  end

  private

  # @private
  # Enables method-chaining.
  # @return [OandaAPI::Client::NamespaceProxy]
  def method_missing(sym, *args)
    OandaAPI::Client::NamespaceProxy.new self, sym, args.first
  end
end

#domainSymbol

Returns identifies the Oanda subdomain (:practice or :live) accessed by the client.

Returns:

  • (Symbol)

    identifies the Oanda subdomain (:practice or :live) accessed by the client.



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/oanda_api/streaming/client.rb', line 51

class Client
  attr_reader :auth_token, :streaming_request
  attr_accessor :domain, :headers

  # @param [Symbol] domain see {#domain}
  # @param [String] auth_token see {#auth_token}
  def initialize(domain, auth_token)
    super()
    @auth_token = auth_token
    self.domain = domain
    @headers = auth
    @streaming_request = nil
  end

  # Returns an absolute URI for a resource request.
  #
  # @param [String] path the path portion of the URI.
  #
  # @return [String] a URI.
  def api_uri(path)
    uri = "#{BASE_URI[domain]}#{path}"
    uri.sub "[API_VERSION]", OandaAPI.configuration.rest_api_version
  end

  # Parameters used for authentication.
  # @return [Hash]
  def auth
    { "Authorization" => "Bearer #{auth_token}" }
  end

  # @private
  # Sets the domain the client can access.
  # @return [void]
  def domain=(value)
    fail ArgumentError, "Invalid domain" unless OandaAPI::DOMAINS.include? value
    @domain = value
  end

  # Determines whether emitted resources should include heartbeats.
  # @param [boolean] value
  def emit_heartbeats=(value)
    @emit_heartbeats = value
    streaming_request.emit_heartbeats = value if streaming_request
  end

  # Returns `true` if emitted resources include heartbeats. Defaults to false.
  # @return [boolean]
  def emit_heartbeats?
    !!@emit_heartbeats
  end

  # Signals the streaming request to disconnect and terminates streaming.
  # @return [void]
  def stop!
    streaming_request.stop! if running?
  end

  # Returns `true` if the client is currently streaming.
  # @return [boolean]
  def running?
    !!(streaming_request && streaming_request.running?)
  end

  # @private
  # Executes a streaming http request.
  #
  # @param [Symbol] _method Ignored.
  #
  # @param [String] path the path of an Oanda resource request.
  #
  # @param [Hash] conditions optional parameters that are converted into query parameters.
  #
  # @yield [OandaAPI:ResourceBase, OandaAPI::Streaming::Client]  See {OandaAPI::Streaming::Request.stream}
  #
  # @return [void]
  #
  # @raise [OandaAPI::RequestError] if the API return code is not 2xx.
  # @raise [OandaAPI::StreamingDisconnect] if the API disconnects.
  def execute_request(_method, path, conditions = {}, &block)
    Http::Exceptions.wrap_and_check do
      @streaming_request = OandaAPI::Streaming::Request.new client: self,
                                                            uri: api_uri(path),
                                                            query: Utils.stringify_keys(conditions),
                                                            headers: OandaAPI.configuration.headers.merge(headers)
      @streaming_request.emit_heartbeats = emit_heartbeats?
      @streaming_request.stream(&block)
      return nil
    end

    rescue Http::Exceptions::HttpException => e
      raise OandaAPI::RequestError, e.message
  end

  private

  # @private
  # Enables method-chaining.
  # @return [OandaAPI::Client::NamespaceProxy]
  def method_missing(sym, *args)
    OandaAPI::Client::NamespaceProxy.new self, sym, args.first
  end
end

#headersHash

Returns parameters that are included with every API request as HTTP headers.

Returns:

  • (Hash)

    parameters that are included with every API request as HTTP headers.



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/oanda_api/streaming/client.rb', line 51

class Client
  attr_reader :auth_token, :streaming_request
  attr_accessor :domain, :headers

  # @param [Symbol] domain see {#domain}
  # @param [String] auth_token see {#auth_token}
  def initialize(domain, auth_token)
    super()
    @auth_token = auth_token
    self.domain = domain
    @headers = auth
    @streaming_request = nil
  end

  # Returns an absolute URI for a resource request.
  #
  # @param [String] path the path portion of the URI.
  #
  # @return [String] a URI.
  def api_uri(path)
    uri = "#{BASE_URI[domain]}#{path}"
    uri.sub "[API_VERSION]", OandaAPI.configuration.rest_api_version
  end

  # Parameters used for authentication.
  # @return [Hash]
  def auth
    { "Authorization" => "Bearer #{auth_token}" }
  end

  # @private
  # Sets the domain the client can access.
  # @return [void]
  def domain=(value)
    fail ArgumentError, "Invalid domain" unless OandaAPI::DOMAINS.include? value
    @domain = value
  end

  # Determines whether emitted resources should include heartbeats.
  # @param [boolean] value
  def emit_heartbeats=(value)
    @emit_heartbeats = value
    streaming_request.emit_heartbeats = value if streaming_request
  end

  # Returns `true` if emitted resources include heartbeats. Defaults to false.
  # @return [boolean]
  def emit_heartbeats?
    !!@emit_heartbeats
  end

  # Signals the streaming request to disconnect and terminates streaming.
  # @return [void]
  def stop!
    streaming_request.stop! if running?
  end

  # Returns `true` if the client is currently streaming.
  # @return [boolean]
  def running?
    !!(streaming_request && streaming_request.running?)
  end

  # @private
  # Executes a streaming http request.
  #
  # @param [Symbol] _method Ignored.
  #
  # @param [String] path the path of an Oanda resource request.
  #
  # @param [Hash] conditions optional parameters that are converted into query parameters.
  #
  # @yield [OandaAPI:ResourceBase, OandaAPI::Streaming::Client]  See {OandaAPI::Streaming::Request.stream}
  #
  # @return [void]
  #
  # @raise [OandaAPI::RequestError] if the API return code is not 2xx.
  # @raise [OandaAPI::StreamingDisconnect] if the API disconnects.
  def execute_request(_method, path, conditions = {}, &block)
    Http::Exceptions.wrap_and_check do
      @streaming_request = OandaAPI::Streaming::Request.new client: self,
                                                            uri: api_uri(path),
                                                            query: Utils.stringify_keys(conditions),
                                                            headers: OandaAPI.configuration.headers.merge(headers)
      @streaming_request.emit_heartbeats = emit_heartbeats?
      @streaming_request.stream(&block)
      return nil
    end

    rescue Http::Exceptions::HttpException => e
      raise OandaAPI::RequestError, e.message
  end

  private

  # @private
  # Enables method-chaining.
  # @return [OandaAPI::Client::NamespaceProxy]
  def method_missing(sym, *args)
    OandaAPI::Client::NamespaceProxy.new self, sym, args.first
  end
end

#streaming_requestOandaAPI::Streaming::Request (readonly)



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# File 'lib/oanda_api/streaming/client.rb', line 51

class Client
  attr_reader :auth_token, :streaming_request
  attr_accessor :domain, :headers

  # @param [Symbol] domain see {#domain}
  # @param [String] auth_token see {#auth_token}
  def initialize(domain, auth_token)
    super()
    @auth_token = auth_token
    self.domain = domain
    @headers = auth
    @streaming_request = nil
  end

  # Returns an absolute URI for a resource request.
  #
  # @param [String] path the path portion of the URI.
  #
  # @return [String] a URI.
  def api_uri(path)
    uri = "#{BASE_URI[domain]}#{path}"
    uri.sub "[API_VERSION]", OandaAPI.configuration.rest_api_version
  end

  # Parameters used for authentication.
  # @return [Hash]
  def auth
    { "Authorization" => "Bearer #{auth_token}" }
  end

  # @private
  # Sets the domain the client can access.
  # @return [void]
  def domain=(value)
    fail ArgumentError, "Invalid domain" unless OandaAPI::DOMAINS.include? value
    @domain = value
  end

  # Determines whether emitted resources should include heartbeats.
  # @param [boolean] value
  def emit_heartbeats=(value)
    @emit_heartbeats = value
    streaming_request.emit_heartbeats = value if streaming_request
  end

  # Returns `true` if emitted resources include heartbeats. Defaults to false.
  # @return [boolean]
  def emit_heartbeats?
    !!@emit_heartbeats
  end

  # Signals the streaming request to disconnect and terminates streaming.
  # @return [void]
  def stop!
    streaming_request.stop! if running?
  end

  # Returns `true` if the client is currently streaming.
  # @return [boolean]
  def running?
    !!(streaming_request && streaming_request.running?)
  end

  # @private
  # Executes a streaming http request.
  #
  # @param [Symbol] _method Ignored.
  #
  # @param [String] path the path of an Oanda resource request.
  #
  # @param [Hash] conditions optional parameters that are converted into query parameters.
  #
  # @yield [OandaAPI:ResourceBase, OandaAPI::Streaming::Client]  See {OandaAPI::Streaming::Request.stream}
  #
  # @return [void]
  #
  # @raise [OandaAPI::RequestError] if the API return code is not 2xx.
  # @raise [OandaAPI::StreamingDisconnect] if the API disconnects.
  def execute_request(_method, path, conditions = {}, &block)
    Http::Exceptions.wrap_and_check do
      @streaming_request = OandaAPI::Streaming::Request.new client: self,
                                                            uri: api_uri(path),
                                                            query: Utils.stringify_keys(conditions),
                                                            headers: OandaAPI.configuration.headers.merge(headers)
      @streaming_request.emit_heartbeats = emit_heartbeats?
      @streaming_request.stream(&block)
      return nil
    end

    rescue Http::Exceptions::HttpException => e
      raise OandaAPI::RequestError, e.message
  end

  private

  # @private
  # Enables method-chaining.
  # @return [OandaAPI::Client::NamespaceProxy]
  def method_missing(sym, *args)
    OandaAPI::Client::NamespaceProxy.new self, sym, args.first
  end
end

Instance Method Details

#api_uri(path) ⇒ String

Returns an absolute URI for a resource request.

Parameters:

  • path (String)

    the path portion of the URI.

Returns:

  • (String)

    a URI.



70
71
72
73
# File 'lib/oanda_api/streaming/client.rb', line 70

def api_uri(path)
  uri = "#{BASE_URI[domain]}#{path}"
  uri.sub "[API_VERSION]", OandaAPI.configuration.rest_api_version
end

#authHash

Parameters used for authentication.

Returns:

  • (Hash)


77
78
79
# File 'lib/oanda_api/streaming/client.rb', line 77

def auth
  { "Authorization" => "Bearer #{auth_token}" }
end

#emit_heartbeats=(value) ⇒ Object

Determines whether emitted resources should include heartbeats.

Parameters:

  • value (boolean)


91
92
93
94
# File 'lib/oanda_api/streaming/client.rb', line 91

def emit_heartbeats=(value)
  @emit_heartbeats = value
  streaming_request.emit_heartbeats = value if streaming_request
end

#emit_heartbeats?boolean

Returns true if emitted resources include heartbeats. Defaults to false.

Returns:

  • (boolean)


98
99
100
# File 'lib/oanda_api/streaming/client.rb', line 98

def emit_heartbeats?
  !!@emit_heartbeats
end

#execute_request(_method, path, conditions = {}) {|OandaAPI:ResourceBase, OandaAPI::Streaming::Client| ... } ⇒ void

This method returns an undefined value.

Executes a streaming http request.

Parameters:

  • _method (Symbol)

    Ignored.

  • path (String)

    the path of an Oanda resource request.

  • conditions (Hash) (defaults to: {})

    optional parameters that are converted into query parameters.

Yields:

Raises:



129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/oanda_api/streaming/client.rb', line 129

def execute_request(_method, path, conditions = {}, &block)
  Http::Exceptions.wrap_and_check do
    @streaming_request = OandaAPI::Streaming::Request.new client: self,
                                                          uri: api_uri(path),
                                                          query: Utils.stringify_keys(conditions),
                                                          headers: OandaAPI.configuration.headers.merge(headers)
    @streaming_request.emit_heartbeats = emit_heartbeats?
    @streaming_request.stream(&block)
    return nil
  end

  rescue Http::Exceptions::HttpException => e
    raise OandaAPI::RequestError, e.message
end

#running?boolean

Returns true if the client is currently streaming.

Returns:

  • (boolean)


110
111
112
# File 'lib/oanda_api/streaming/client.rb', line 110

def running?
  !!(streaming_request && streaming_request.running?)
end

#stop!void

This method returns an undefined value.

Signals the streaming request to disconnect and terminates streaming.



104
105
106
# File 'lib/oanda_api/streaming/client.rb', line 104

def stop!
  streaming_request.stop! if running?
end