Class: OandaAPI::Streaming::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/oanda_api/streaming/client.rb

Overview

Examples:

Example Usage

client = OandaAPI::Streaming::Client.new :practice, ENV.fetch("ONADA_PRACTICE_TOKEN")

# IMPORTANT
# This code will block indefinitely because streaming executes as an infinite loop.
prices = client.prices(account_id: 1234, instruments: %w[AUD_CAD AUD_CHF])
prices.stream do |price|

  # The code in this block should handle the price as efficently
  # as possible, otherwise the connection could timeout.
  # For example, you could publish the tick on a queue to be handled
  # by some other thread or process.
  price  # => OandaAPI::Resource::Price
end

client.events.stream do |transaction|
  transaction # => OandaAPI::Resource::Transaction
end

# -- Stopping the stream --
# You may add a second argument to the block to yield the client itself.
# You can use it to issue a client.stop! to terminate streaming.
@prices = []
prices = client.prices(account_id: 1234, instruments: %w[AUD_CAD AUD_CHF])
prices.stream do |price, client|
  @prices << price
  client.stop! if @prices.size == 10
end

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(domain, auth_token) ⇒ Client


57
58
59
60
61
62
63
# File 'lib/oanda_api/streaming/client.rb', line 57

def initialize(domain, auth_token)
  super()
  @auth_token = auth_token
  self.domain = domain
  @headers = auth
  @streaming_request = nil
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

#method_missing(sym, *args) ⇒ OandaAPI::Client::NamespaceProxy (private)

Enables method-chaining.


148
149
150
# File 'lib/oanda_api/streaming/client.rb', line 148

def method_missing(sym, *args)
  OandaAPI::Client::NamespaceProxy.new self, sym, args.first
end

Instance Attribute Details

#auth_tokenString (readonly)


51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/oanda_api/streaming/client.rb', line 51

class Client
  attr_reader :auth_token, :streaming_request
  attr_accessor :domain, :headers

  # @param [Symbol] domain see {#domain}
  # @param [String] auth_token see {#auth_token}
  def initialize(domain, auth_token)
    super()
    @auth_token = auth_token
    self.domain = domain
    @headers = auth
    @streaming_request = nil
  end

  # Returns an absolute URI for a resource request.
  #
  # @param [String] path the path portion of the URI.
  #
  # @return [String] a URI.
  def api_uri(path)
    uri = "#{BASE_URI[domain]}#{path}"
    uri.sub "[API_VERSION]", OandaAPI.configuration.rest_api_version
  end

  # Parameters used for authentication.
  # @return [Hash]
  def auth
    { "Authorization" => "Bearer #{auth_token}" }
  end

  # @private
  # Sets the domain the client can access.
  # @return [void]
  def domain=(value)
    fail ArgumentError, "Invalid domain" unless OandaAPI::DOMAINS.include? value
    @domain = value
  end

  # Determines whether emitted resources should include heartbeats.
  # @param [boolean] value
  def emit_heartbeats=(value)
    @emit_heartbeats = value
    streaming_request.emit_heartbeats = value if streaming_request
  end

  # Returns `true` if emitted resources include heartbeats. Defaults to false.
  # @return [boolean]
  def emit_heartbeats?
    !!@emit_heartbeats
  end

  # Signals the streaming request to disconnect and terminates streaming.
  # @return [void]
  def stop!
    streaming_request.stop! if running?
  end

  # Returns `true` if the client is currently streaming.
  # @return [boolean]
  def running?
    !!(streaming_request && streaming_request.running?)
  end

  # @private
  # Executes a streaming http request.
  #
  # @param [Symbol] _method Ignored.
  #
  # @param [String] path the path of an Oanda resource request.
  #
  # @param [Hash] conditions optional parameters that are converted into query parameters.
  #
  # @yield [OandaAPI:ResourceBase, OandaAPI::Streaming::Client]  See {OandaAPI::Streaming::Request.stream}
  #
  # @return [void]
  #
  # @raise [OandaAPI::RequestError] if the API return code is not 2xx.
  # @raise [OandaAPI::StreamingDisconnect] if the API disconnects.
  def execute_request(_method, path, conditions = {}, &block)
    Http::Exceptions.wrap_and_check do
      @streaming_request = OandaAPI::Streaming::Request.new client: self,
                                                            uri: api_uri(path),
                                                            query: Utils.stringify_keys(conditions),
                                                            headers: OandaAPI.configuration.headers.merge(headers)
      @streaming_request.stream(&block)
      return nil
    end

    rescue Http::Exceptions::HttpException => e
      raise OandaAPI::RequestError, e.message
  end

  private

  # @private
  # Enables method-chaining.
  # @return [OandaAPI::Client::NamespaceProxy]
  def method_missing(sym, *args)
    OandaAPI::Client::NamespaceProxy.new self, sym, args.first
  end
end

#domainSymbol


51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/oanda_api/streaming/client.rb', line 51

class Client
  attr_reader :auth_token, :streaming_request
  attr_accessor :domain, :headers

  # @param [Symbol] domain see {#domain}
  # @param [String] auth_token see {#auth_token}
  def initialize(domain, auth_token)
    super()
    @auth_token = auth_token
    self.domain = domain
    @headers = auth
    @streaming_request = nil
  end

  # Returns an absolute URI for a resource request.
  #
  # @param [String] path the path portion of the URI.
  #
  # @return [String] a URI.
  def api_uri(path)
    uri = "#{BASE_URI[domain]}#{path}"
    uri.sub "[API_VERSION]", OandaAPI.configuration.rest_api_version
  end

  # Parameters used for authentication.
  # @return [Hash]
  def auth
    { "Authorization" => "Bearer #{auth_token}" }
  end

  # @private
  # Sets the domain the client can access.
  # @return [void]
  def domain=(value)
    fail ArgumentError, "Invalid domain" unless OandaAPI::DOMAINS.include? value
    @domain = value
  end

  # Determines whether emitted resources should include heartbeats.
  # @param [boolean] value
  def emit_heartbeats=(value)
    @emit_heartbeats = value
    streaming_request.emit_heartbeats = value if streaming_request
  end

  # Returns `true` if emitted resources include heartbeats. Defaults to false.
  # @return [boolean]
  def emit_heartbeats?
    !!@emit_heartbeats
  end

  # Signals the streaming request to disconnect and terminates streaming.
  # @return [void]
  def stop!
    streaming_request.stop! if running?
  end

  # Returns `true` if the client is currently streaming.
  # @return [boolean]
  def running?
    !!(streaming_request && streaming_request.running?)
  end

  # @private
  # Executes a streaming http request.
  #
  # @param [Symbol] _method Ignored.
  #
  # @param [String] path the path of an Oanda resource request.
  #
  # @param [Hash] conditions optional parameters that are converted into query parameters.
  #
  # @yield [OandaAPI:ResourceBase, OandaAPI::Streaming::Client]  See {OandaAPI::Streaming::Request.stream}
  #
  # @return [void]
  #
  # @raise [OandaAPI::RequestError] if the API return code is not 2xx.
  # @raise [OandaAPI::StreamingDisconnect] if the API disconnects.
  def execute_request(_method, path, conditions = {}, &block)
    Http::Exceptions.wrap_and_check do
      @streaming_request = OandaAPI::Streaming::Request.new client: self,
                                                            uri: api_uri(path),
                                                            query: Utils.stringify_keys(conditions),
                                                            headers: OandaAPI.configuration.headers.merge(headers)
      @streaming_request.stream(&block)
      return nil
    end

    rescue Http::Exceptions::HttpException => e
      raise OandaAPI::RequestError, e.message
  end

  private

  # @private
  # Enables method-chaining.
  # @return [OandaAPI::Client::NamespaceProxy]
  def method_missing(sym, *args)
    OandaAPI::Client::NamespaceProxy.new self, sym, args.first
  end
end

#headersHash


51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/oanda_api/streaming/client.rb', line 51

class Client
  attr_reader :auth_token, :streaming_request
  attr_accessor :domain, :headers

  # @param [Symbol] domain see {#domain}
  # @param [String] auth_token see {#auth_token}
  def initialize(domain, auth_token)
    super()
    @auth_token = auth_token
    self.domain = domain
    @headers = auth
    @streaming_request = nil
  end

  # Returns an absolute URI for a resource request.
  #
  # @param [String] path the path portion of the URI.
  #
  # @return [String] a URI.
  def api_uri(path)
    uri = "#{BASE_URI[domain]}#{path}"
    uri.sub "[API_VERSION]", OandaAPI.configuration.rest_api_version
  end

  # Parameters used for authentication.
  # @return [Hash]
  def auth
    { "Authorization" => "Bearer #{auth_token}" }
  end

  # @private
  # Sets the domain the client can access.
  # @return [void]
  def domain=(value)
    fail ArgumentError, "Invalid domain" unless OandaAPI::DOMAINS.include? value
    @domain = value
  end

  # Determines whether emitted resources should include heartbeats.
  # @param [boolean] value
  def emit_heartbeats=(value)
    @emit_heartbeats = value
    streaming_request.emit_heartbeats = value if streaming_request
  end

  # Returns `true` if emitted resources include heartbeats. Defaults to false.
  # @return [boolean]
  def emit_heartbeats?
    !!@emit_heartbeats
  end

  # Signals the streaming request to disconnect and terminates streaming.
  # @return [void]
  def stop!
    streaming_request.stop! if running?
  end

  # Returns `true` if the client is currently streaming.
  # @return [boolean]
  def running?
    !!(streaming_request && streaming_request.running?)
  end

  # @private
  # Executes a streaming http request.
  #
  # @param [Symbol] _method Ignored.
  #
  # @param [String] path the path of an Oanda resource request.
  #
  # @param [Hash] conditions optional parameters that are converted into query parameters.
  #
  # @yield [OandaAPI:ResourceBase, OandaAPI::Streaming::Client]  See {OandaAPI::Streaming::Request.stream}
  #
  # @return [void]
  #
  # @raise [OandaAPI::RequestError] if the API return code is not 2xx.
  # @raise [OandaAPI::StreamingDisconnect] if the API disconnects.
  def execute_request(_method, path, conditions = {}, &block)
    Http::Exceptions.wrap_and_check do
      @streaming_request = OandaAPI::Streaming::Request.new client: self,
                                                            uri: api_uri(path),
                                                            query: Utils.stringify_keys(conditions),
                                                            headers: OandaAPI.configuration.headers.merge(headers)
      @streaming_request.stream(&block)
      return nil
    end

    rescue Http::Exceptions::HttpException => e
      raise OandaAPI::RequestError, e.message
  end

  private

  # @private
  # Enables method-chaining.
  # @return [OandaAPI::Client::NamespaceProxy]
  def method_missing(sym, *args)
    OandaAPI::Client::NamespaceProxy.new self, sym, args.first
  end
end

#streaming_requestOandaAPI::Streaming::Request (readonly)


51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/oanda_api/streaming/client.rb', line 51

class Client
  attr_reader :auth_token, :streaming_request
  attr_accessor :domain, :headers

  # @param [Symbol] domain see {#domain}
  # @param [String] auth_token see {#auth_token}
  def initialize(domain, auth_token)
    super()
    @auth_token = auth_token
    self.domain = domain
    @headers = auth
    @streaming_request = nil
  end

  # Returns an absolute URI for a resource request.
  #
  # @param [String] path the path portion of the URI.
  #
  # @return [String] a URI.
  def api_uri(path)
    uri = "#{BASE_URI[domain]}#{path}"
    uri.sub "[API_VERSION]", OandaAPI.configuration.rest_api_version
  end

  # Parameters used for authentication.
  # @return [Hash]
  def auth
    { "Authorization" => "Bearer #{auth_token}" }
  end

  # @private
  # Sets the domain the client can access.
  # @return [void]
  def domain=(value)
    fail ArgumentError, "Invalid domain" unless OandaAPI::DOMAINS.include? value
    @domain = value
  end

  # Determines whether emitted resources should include heartbeats.
  # @param [boolean] value
  def emit_heartbeats=(value)
    @emit_heartbeats = value
    streaming_request.emit_heartbeats = value if streaming_request
  end

  # Returns `true` if emitted resources include heartbeats. Defaults to false.
  # @return [boolean]
  def emit_heartbeats?
    !!@emit_heartbeats
  end

  # Signals the streaming request to disconnect and terminates streaming.
  # @return [void]
  def stop!
    streaming_request.stop! if running?
  end

  # Returns `true` if the client is currently streaming.
  # @return [boolean]
  def running?
    !!(streaming_request && streaming_request.running?)
  end

  # @private
  # Executes a streaming http request.
  #
  # @param [Symbol] _method Ignored.
  #
  # @param [String] path the path of an Oanda resource request.
  #
  # @param [Hash] conditions optional parameters that are converted into query parameters.
  #
  # @yield [OandaAPI:ResourceBase, OandaAPI::Streaming::Client]  See {OandaAPI::Streaming::Request.stream}
  #
  # @return [void]
  #
  # @raise [OandaAPI::RequestError] if the API return code is not 2xx.
  # @raise [OandaAPI::StreamingDisconnect] if the API disconnects.
  def execute_request(_method, path, conditions = {}, &block)
    Http::Exceptions.wrap_and_check do
      @streaming_request = OandaAPI::Streaming::Request.new client: self,
                                                            uri: api_uri(path),
                                                            query: Utils.stringify_keys(conditions),
                                                            headers: OandaAPI.configuration.headers.merge(headers)
      @streaming_request.stream(&block)
      return nil
    end

    rescue Http::Exceptions::HttpException => e
      raise OandaAPI::RequestError, e.message
  end

  private

  # @private
  # Enables method-chaining.
  # @return [OandaAPI::Client::NamespaceProxy]
  def method_missing(sym, *args)
    OandaAPI::Client::NamespaceProxy.new self, sym, args.first
  end
end

Instance Method Details

#api_uri(path) ⇒ String

Returns an absolute URI for a resource request.


70
71
72
73
# File 'lib/oanda_api/streaming/client.rb', line 70

def api_uri(path)
  uri = "#{BASE_URI[domain]}#{path}"
  uri.sub "[API_VERSION]", OandaAPI.configuration.rest_api_version
end

#authHash

Parameters used for authentication.


77
78
79
# File 'lib/oanda_api/streaming/client.rb', line 77

def auth
  { "Authorization" => "Bearer #{auth_token}" }
end

#emit_heartbeats=(value) ⇒ Object

Determines whether emitted resources should include heartbeats.


91
92
93
94
# File 'lib/oanda_api/streaming/client.rb', line 91

def emit_heartbeats=(value)
  @emit_heartbeats = value
  streaming_request.emit_heartbeats = value if streaming_request
end

#emit_heartbeats?boolean

Returns true if emitted resources include heartbeats. Defaults to false.


98
99
100
# File 'lib/oanda_api/streaming/client.rb', line 98

def emit_heartbeats?
  !!@emit_heartbeats
end

#execute_request(_method, path, conditions = {}) {|OandaAPI:ResourceBase, OandaAPI::Streaming::Client| ... } ⇒ void

This method returns an undefined value.

Executes a streaming http request.

Yields:

Raises:


129
130
131
132
133
134
135
136
137
138
139
140
141
# File 'lib/oanda_api/streaming/client.rb', line 129

def execute_request(_method, path, conditions = {}, &block)
  Http::Exceptions.wrap_and_check do
    @streaming_request = OandaAPI::Streaming::Request.new client: self,
                                                          uri: api_uri(path),
                                                          query: Utils.stringify_keys(conditions),
                                                          headers: OandaAPI.configuration.headers.merge(headers)
    @streaming_request.stream(&block)
    return nil
  end

  rescue Http::Exceptions::HttpException => e
    raise OandaAPI::RequestError, e.message
end

#running?boolean

Returns true if the client is currently streaming.


110
111
112
# File 'lib/oanda_api/streaming/client.rb', line 110

def running?
  !!(streaming_request && streaming_request.running?)
end

#stop!void

This method returns an undefined value.

Signals the streaming request to disconnect and terminates streaming.


104
105
106
# File 'lib/oanda_api/streaming/client.rb', line 104

def stop!
  streaming_request.stop! if running?
end