Class: OandaAPI::Streaming::Request

Inherits:
Object
  • Object
show all
Defined in:
lib/oanda_api/streaming/request.rb

Overview

An HTTP 1.1 streaming request. Used to create a persistent connection with the server and continuously download a stream of resource representations. Resources are emitted as ResourceBase instances.

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client: nil, uri:, query: {}, headers: {}) ⇒ Request

Creates an Streaming::Request instance.


30
31
32
33
34
35
36
37
38
39
# File 'lib/oanda_api/streaming/request.rb', line 30

def initialize(client: nil, uri:, query: {}, headers: {})
  self.client = client.nil? ? self : client
  @uri = URI uri
  @uri.query = OandaAPI::Client.default_options[:query_string_normalizer].call(query)
  @http = Net::HTTP.new @uri.host, 443
  @http.use_ssl = true
  @http.verify_mode = OpenSSL::SSL::VERIFY_PEER
  @request = Net::HTTP::Get.new @uri
  headers.each_pair { |pair| @request.add_field(*pair) }
end

Instance Attribute Details

#clientOandaAPI::Streaming::Client


19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/oanda_api/streaming/request.rb', line 19

class Request
  attr_accessor :client, :emit_heartbeats
  attr_reader :uri, :request

  # Creates an `Streaming::Request` instance.
  # @param [Streaming::Client] client a streaming client instance which can be used to
  #   send signals to an instance of this `Streaming::Request` class.
  # @param [String] uri an absolute URI to the service endpoint.
  # @param [Hash] query a list of query parameters, unencoded. The list
  #   is converted into a query string. See `OandaAPI::Client#query_string_normalizer`.
  # @param [Hash] headers a list of header values that will be sent with the request.
  def initialize(client: nil, uri:, query: {}, headers: {})
    self.client = client.nil? ? self : client
    @uri = URI uri
    @uri.query = OandaAPI::Client.default_options[:query_string_normalizer].call(query)
    @http = Net::HTTP.new @uri.host, 443
    @http.use_ssl = true
    @http.verify_mode = OpenSSL::SSL::VERIFY_PEER
    @request = Net::HTTP::Get.new @uri
    headers.each_pair { |pair| @request.add_field(*pair) }
  end

  # Sets the client attribute
  # @param [OandaAPI::Streaming::Client] value
  # @return [void]
  # @raise [ArgumentError] if value is not an {OandaAPI::Streaming::Client} instance.
  def client=(value)
    fail ArgumentError, "Expecting an OandaAPI::Streaming::Client" unless (value.is_a?(OandaAPI::Streaming::Client) || value.is_a?(OandaAPI::Streaming::Request))
    @client = value
  end

  # @return [boolean] true if heatbeats are emitted.
  def emit_heartbeats?
    !!@emit_heartbeats
  end

  # Signals the streaming request to disconnect and terminates streaming.
  # @return [void]
  def stop!
    @stop_requested = true
  end

  # Returns `true` if the request has been signalled to terminate. See {#stop!}.
  # @return [boolean]
  def stop_requested?
    !!@stop_requested
  end

  # @return [true] if the instance is connected and streaming a response.
  def running?
    !!@running
  end

  # Emits a stream of {OandaAPI::ResourceBase} instances, depending
  #  on the endpoint that the request is servicing, either
  #  {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction}
  #  instances are emitted. When {#emit_heartbeats?} is `true`, then
  #  resources could also be {OandaAPI::Resource::Heartbeat}.
  #
  #  Note this method runs as an infinite loop and will block indefinitely
  #  until either the connection is halted or a {#stop!} signal is recieved.
  #
  # @yield [OandaAPI::ResourceBase, OandaAPI::Streaming::Client] Each resource found in the response
  #   stream is yielded as they are received. The client instance controlling the
  #   streaming request is also yielded. It can be used to issue a `client.stop!` to terminate the resquest.
  # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server.
  # @raise [OandaAPI::RequestError] if an unexpected resource is returned.
  # @return [void]
  def stream(&block)
    @stop_requested = false
    @running = true
    # @http.set_debug_output $stderr
    @http.request(@request) do |response|
      response.read_body do |chunk|
        handle_response(chunk).each do |resource|
          block.call(resource, @client)
          return if stop_requested?
        end
        return if stop_requested?
        sleep 0.01
      end
    end
  ensure
    @running = false
    @http.finish if @http.started?
  end

  private

  # @private
  # Converts a raw json response into {OandaAPI::ResourceBase} instances.
  # @return [Array<OandaAPI::ResourceBase>] depending on the endpoint
  #   that the request is servicing, which is either an array of
  #   {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} instances.
  #   When #emit_heartbeats? is `true`, then the instance could be an {OandaAPI::Resource::Heartbeat}.
  # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server.
  # @raise [OandaAPI::RequestError] if an unexpected resource is returned.
  def handle_response(response)
    response.split("\r\n").map do |json|
      parsed_response = JSON.parse json
      case
      when parsed_response["heartbeat"]
        OandaAPI::Resource::Heartbeat.new parsed_response["heartbeat"] if emit_heartbeats?
      when parsed_response["tick"]
        OandaAPI::Resource::Price.new parsed_response["tick"]
      when parsed_response["transaction"]
        OandaAPI::Resource::Transaction.new parsed_response["transaction"]
      when parsed_response["disconnect"]
        raise OandaAPI::StreamingDisconnect, parsed_response["disconnect"]["message"]
      else
        raise OandaAPI::RequestError, "unknown resource: #{json}"
      end
    end.compact
  end
end

#emit_heartbeatsboolean


19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/oanda_api/streaming/request.rb', line 19

class Request
  attr_accessor :client, :emit_heartbeats
  attr_reader :uri, :request

  # Creates an `Streaming::Request` instance.
  # @param [Streaming::Client] client a streaming client instance which can be used to
  #   send signals to an instance of this `Streaming::Request` class.
  # @param [String] uri an absolute URI to the service endpoint.
  # @param [Hash] query a list of query parameters, unencoded. The list
  #   is converted into a query string. See `OandaAPI::Client#query_string_normalizer`.
  # @param [Hash] headers a list of header values that will be sent with the request.
  def initialize(client: nil, uri:, query: {}, headers: {})
    self.client = client.nil? ? self : client
    @uri = URI uri
    @uri.query = OandaAPI::Client.default_options[:query_string_normalizer].call(query)
    @http = Net::HTTP.new @uri.host, 443
    @http.use_ssl = true
    @http.verify_mode = OpenSSL::SSL::VERIFY_PEER
    @request = Net::HTTP::Get.new @uri
    headers.each_pair { |pair| @request.add_field(*pair) }
  end

  # Sets the client attribute
  # @param [OandaAPI::Streaming::Client] value
  # @return [void]
  # @raise [ArgumentError] if value is not an {OandaAPI::Streaming::Client} instance.
  def client=(value)
    fail ArgumentError, "Expecting an OandaAPI::Streaming::Client" unless (value.is_a?(OandaAPI::Streaming::Client) || value.is_a?(OandaAPI::Streaming::Request))
    @client = value
  end

  # @return [boolean] true if heatbeats are emitted.
  def emit_heartbeats?
    !!@emit_heartbeats
  end

  # Signals the streaming request to disconnect and terminates streaming.
  # @return [void]
  def stop!
    @stop_requested = true
  end

  # Returns `true` if the request has been signalled to terminate. See {#stop!}.
  # @return [boolean]
  def stop_requested?
    !!@stop_requested
  end

  # @return [true] if the instance is connected and streaming a response.
  def running?
    !!@running
  end

  # Emits a stream of {OandaAPI::ResourceBase} instances, depending
  #  on the endpoint that the request is servicing, either
  #  {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction}
  #  instances are emitted. When {#emit_heartbeats?} is `true`, then
  #  resources could also be {OandaAPI::Resource::Heartbeat}.
  #
  #  Note this method runs as an infinite loop and will block indefinitely
  #  until either the connection is halted or a {#stop!} signal is recieved.
  #
  # @yield [OandaAPI::ResourceBase, OandaAPI::Streaming::Client] Each resource found in the response
  #   stream is yielded as they are received. The client instance controlling the
  #   streaming request is also yielded. It can be used to issue a `client.stop!` to terminate the resquest.
  # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server.
  # @raise [OandaAPI::RequestError] if an unexpected resource is returned.
  # @return [void]
  def stream(&block)
    @stop_requested = false
    @running = true
    # @http.set_debug_output $stderr
    @http.request(@request) do |response|
      response.read_body do |chunk|
        handle_response(chunk).each do |resource|
          block.call(resource, @client)
          return if stop_requested?
        end
        return if stop_requested?
        sleep 0.01
      end
    end
  ensure
    @running = false
    @http.finish if @http.started?
  end

  private

  # @private
  # Converts a raw json response into {OandaAPI::ResourceBase} instances.
  # @return [Array<OandaAPI::ResourceBase>] depending on the endpoint
  #   that the request is servicing, which is either an array of
  #   {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} instances.
  #   When #emit_heartbeats? is `true`, then the instance could be an {OandaAPI::Resource::Heartbeat}.
  # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server.
  # @raise [OandaAPI::RequestError] if an unexpected resource is returned.
  def handle_response(response)
    response.split("\r\n").map do |json|
      parsed_response = JSON.parse json
      case
      when parsed_response["heartbeat"]
        OandaAPI::Resource::Heartbeat.new parsed_response["heartbeat"] if emit_heartbeats?
      when parsed_response["tick"]
        OandaAPI::Resource::Price.new parsed_response["tick"]
      when parsed_response["transaction"]
        OandaAPI::Resource::Transaction.new parsed_response["transaction"]
      when parsed_response["disconnect"]
        raise OandaAPI::StreamingDisconnect, parsed_response["disconnect"]["message"]
      else
        raise OandaAPI::RequestError, "unknown resource: #{json}"
      end
    end.compact
  end
end

#requestURI::HTTPS (readonly)


19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/oanda_api/streaming/request.rb', line 19

class Request
  attr_accessor :client, :emit_heartbeats
  attr_reader :uri, :request

  # Creates an `Streaming::Request` instance.
  # @param [Streaming::Client] client a streaming client instance which can be used to
  #   send signals to an instance of this `Streaming::Request` class.
  # @param [String] uri an absolute URI to the service endpoint.
  # @param [Hash] query a list of query parameters, unencoded. The list
  #   is converted into a query string. See `OandaAPI::Client#query_string_normalizer`.
  # @param [Hash] headers a list of header values that will be sent with the request.
  def initialize(client: nil, uri:, query: {}, headers: {})
    self.client = client.nil? ? self : client
    @uri = URI uri
    @uri.query = OandaAPI::Client.default_options[:query_string_normalizer].call(query)
    @http = Net::HTTP.new @uri.host, 443
    @http.use_ssl = true
    @http.verify_mode = OpenSSL::SSL::VERIFY_PEER
    @request = Net::HTTP::Get.new @uri
    headers.each_pair { |pair| @request.add_field(*pair) }
  end

  # Sets the client attribute
  # @param [OandaAPI::Streaming::Client] value
  # @return [void]
  # @raise [ArgumentError] if value is not an {OandaAPI::Streaming::Client} instance.
  def client=(value)
    fail ArgumentError, "Expecting an OandaAPI::Streaming::Client" unless (value.is_a?(OandaAPI::Streaming::Client) || value.is_a?(OandaAPI::Streaming::Request))
    @client = value
  end

  # @return [boolean] true if heatbeats are emitted.
  def emit_heartbeats?
    !!@emit_heartbeats
  end

  # Signals the streaming request to disconnect and terminates streaming.
  # @return [void]
  def stop!
    @stop_requested = true
  end

  # Returns `true` if the request has been signalled to terminate. See {#stop!}.
  # @return [boolean]
  def stop_requested?
    !!@stop_requested
  end

  # @return [true] if the instance is connected and streaming a response.
  def running?
    !!@running
  end

  # Emits a stream of {OandaAPI::ResourceBase} instances, depending
  #  on the endpoint that the request is servicing, either
  #  {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction}
  #  instances are emitted. When {#emit_heartbeats?} is `true`, then
  #  resources could also be {OandaAPI::Resource::Heartbeat}.
  #
  #  Note this method runs as an infinite loop and will block indefinitely
  #  until either the connection is halted or a {#stop!} signal is recieved.
  #
  # @yield [OandaAPI::ResourceBase, OandaAPI::Streaming::Client] Each resource found in the response
  #   stream is yielded as they are received. The client instance controlling the
  #   streaming request is also yielded. It can be used to issue a `client.stop!` to terminate the resquest.
  # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server.
  # @raise [OandaAPI::RequestError] if an unexpected resource is returned.
  # @return [void]
  def stream(&block)
    @stop_requested = false
    @running = true
    # @http.set_debug_output $stderr
    @http.request(@request) do |response|
      response.read_body do |chunk|
        handle_response(chunk).each do |resource|
          block.call(resource, @client)
          return if stop_requested?
        end
        return if stop_requested?
        sleep 0.01
      end
    end
  ensure
    @running = false
    @http.finish if @http.started?
  end

  private

  # @private
  # Converts a raw json response into {OandaAPI::ResourceBase} instances.
  # @return [Array<OandaAPI::ResourceBase>] depending on the endpoint
  #   that the request is servicing, which is either an array of
  #   {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} instances.
  #   When #emit_heartbeats? is `true`, then the instance could be an {OandaAPI::Resource::Heartbeat}.
  # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server.
  # @raise [OandaAPI::RequestError] if an unexpected resource is returned.
  def handle_response(response)
    response.split("\r\n").map do |json|
      parsed_response = JSON.parse json
      case
      when parsed_response["heartbeat"]
        OandaAPI::Resource::Heartbeat.new parsed_response["heartbeat"] if emit_heartbeats?
      when parsed_response["tick"]
        OandaAPI::Resource::Price.new parsed_response["tick"]
      when parsed_response["transaction"]
        OandaAPI::Resource::Transaction.new parsed_response["transaction"]
      when parsed_response["disconnect"]
        raise OandaAPI::StreamingDisconnect, parsed_response["disconnect"]["message"]
      else
        raise OandaAPI::RequestError, "unknown resource: #{json}"
      end
    end.compact
  end
end

#uriURI::HTTPS (readonly)


19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/oanda_api/streaming/request.rb', line 19

class Request
  attr_accessor :client, :emit_heartbeats
  attr_reader :uri, :request

  # Creates an `Streaming::Request` instance.
  # @param [Streaming::Client] client a streaming client instance which can be used to
  #   send signals to an instance of this `Streaming::Request` class.
  # @param [String] uri an absolute URI to the service endpoint.
  # @param [Hash] query a list of query parameters, unencoded. The list
  #   is converted into a query string. See `OandaAPI::Client#query_string_normalizer`.
  # @param [Hash] headers a list of header values that will be sent with the request.
  def initialize(client: nil, uri:, query: {}, headers: {})
    self.client = client.nil? ? self : client
    @uri = URI uri
    @uri.query = OandaAPI::Client.default_options[:query_string_normalizer].call(query)
    @http = Net::HTTP.new @uri.host, 443
    @http.use_ssl = true
    @http.verify_mode = OpenSSL::SSL::VERIFY_PEER
    @request = Net::HTTP::Get.new @uri
    headers.each_pair { |pair| @request.add_field(*pair) }
  end

  # Sets the client attribute
  # @param [OandaAPI::Streaming::Client] value
  # @return [void]
  # @raise [ArgumentError] if value is not an {OandaAPI::Streaming::Client} instance.
  def client=(value)
    fail ArgumentError, "Expecting an OandaAPI::Streaming::Client" unless (value.is_a?(OandaAPI::Streaming::Client) || value.is_a?(OandaAPI::Streaming::Request))
    @client = value
  end

  # @return [boolean] true if heatbeats are emitted.
  def emit_heartbeats?
    !!@emit_heartbeats
  end

  # Signals the streaming request to disconnect and terminates streaming.
  # @return [void]
  def stop!
    @stop_requested = true
  end

  # Returns `true` if the request has been signalled to terminate. See {#stop!}.
  # @return [boolean]
  def stop_requested?
    !!@stop_requested
  end

  # @return [true] if the instance is connected and streaming a response.
  def running?
    !!@running
  end

  # Emits a stream of {OandaAPI::ResourceBase} instances, depending
  #  on the endpoint that the request is servicing, either
  #  {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction}
  #  instances are emitted. When {#emit_heartbeats?} is `true`, then
  #  resources could also be {OandaAPI::Resource::Heartbeat}.
  #
  #  Note this method runs as an infinite loop and will block indefinitely
  #  until either the connection is halted or a {#stop!} signal is recieved.
  #
  # @yield [OandaAPI::ResourceBase, OandaAPI::Streaming::Client] Each resource found in the response
  #   stream is yielded as they are received. The client instance controlling the
  #   streaming request is also yielded. It can be used to issue a `client.stop!` to terminate the resquest.
  # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server.
  # @raise [OandaAPI::RequestError] if an unexpected resource is returned.
  # @return [void]
  def stream(&block)
    @stop_requested = false
    @running = true
    # @http.set_debug_output $stderr
    @http.request(@request) do |response|
      response.read_body do |chunk|
        handle_response(chunk).each do |resource|
          block.call(resource, @client)
          return if stop_requested?
        end
        return if stop_requested?
        sleep 0.01
      end
    end
  ensure
    @running = false
    @http.finish if @http.started?
  end

  private

  # @private
  # Converts a raw json response into {OandaAPI::ResourceBase} instances.
  # @return [Array<OandaAPI::ResourceBase>] depending on the endpoint
  #   that the request is servicing, which is either an array of
  #   {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} instances.
  #   When #emit_heartbeats? is `true`, then the instance could be an {OandaAPI::Resource::Heartbeat}.
  # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server.
  # @raise [OandaAPI::RequestError] if an unexpected resource is returned.
  def handle_response(response)
    response.split("\r\n").map do |json|
      parsed_response = JSON.parse json
      case
      when parsed_response["heartbeat"]
        OandaAPI::Resource::Heartbeat.new parsed_response["heartbeat"] if emit_heartbeats?
      when parsed_response["tick"]
        OandaAPI::Resource::Price.new parsed_response["tick"]
      when parsed_response["transaction"]
        OandaAPI::Resource::Transaction.new parsed_response["transaction"]
      when parsed_response["disconnect"]
        raise OandaAPI::StreamingDisconnect, parsed_response["disconnect"]["message"]
      else
        raise OandaAPI::RequestError, "unknown resource: #{json}"
      end
    end.compact
  end
end

Instance Method Details

#emit_heartbeats?boolean


51
52
53
# File 'lib/oanda_api/streaming/request.rb', line 51

def emit_heartbeats?
  !!@emit_heartbeats
end

#running?true


68
69
70
# File 'lib/oanda_api/streaming/request.rb', line 68

def running?
  !!@running
end

#stop!void

This method returns an undefined value.

Signals the streaming request to disconnect and terminates streaming.


57
58
59
# File 'lib/oanda_api/streaming/request.rb', line 57

def stop!
  @stop_requested = true
end

#stop_requested?boolean

Returns true if the request has been signalled to terminate. See #stop!.


63
64
65
# File 'lib/oanda_api/streaming/request.rb', line 63

def stop_requested?
  !!@stop_requested
end

#stream {|OandaAPI::ResourceBase, OandaAPI::Streaming::Client| ... } ⇒ void

This method returns an undefined value.

Emits a stream of ResourceBase instances, depending on the endpoint that the request is servicing, either Resource::Price or Resource::Transaction instances are emitted. When #emit_heartbeats? is true, then resources could also be Resource::Heartbeat.

Note this method runs as an infinite loop and will block indefinitely until either the connection is halted or a #stop! signal is recieved.

Yields:

  • (OandaAPI::ResourceBase, OandaAPI::Streaming::Client)

    Each resource found in the response stream is yielded as they are received. The client instance controlling the streaming request is also yielded. It can be used to issue a client.stop! to terminate the resquest.

Raises:


87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/oanda_api/streaming/request.rb', line 87

def stream(&block)
  @stop_requested = false
  @running = true
  # @http.set_debug_output $stderr
  @http.request(@request) do |response|
    response.read_body do |chunk|
      handle_response(chunk).each do |resource|
        block.call(resource, @client)
        return if stop_requested?
      end
      return if stop_requested?
      sleep 0.01
    end
  end
ensure
  @running = false
  @http.finish if @http.started?
end