Class: OandaAPI::Streaming::Request

Inherits:
Object
  • Object
Defined in:
lib/oanda_api/streaming/request.rb

Overview

An HTTP 1.1 streaming request. Used to create a persistent connection with the server and continuously download a stream of resource representations. Resources are emitted as ResourceBase instances.

Constructor Details

#initialize(client: nil, uri: nil, query: {}, headers: {}) ⇒ Request

Creates a Streaming::Request instance.

Parameters:

  • client (Streaming::Client) (defaults to: nil)

    a streaming client instance which can be used to send signals to an instance of this Streaming::Request class.

  • uri (String) (defaults to: nil)

    an absolute URI to the service endpoint.

  • query (Hash) (defaults to: {})

    a list of query parameters, unencoded. The list is converted into a query string. See OandaAPI::Client#query_string_normalizer.

  • headers (Hash) (defaults to: {})

    a list of header values that will be sent with the request.



# File 'lib/oanda_api/streaming/request.rb', line 30

def initialize(client: nil, uri: nil, query: {}, headers: {})
  self.client = client.nil? ? self : client
  @uri = URI uri
  @uri.query = OandaAPI::Client.default_options[:query_string_normalizer].call(query)
  @http = Net::HTTP.new @uri.host, 443
  @http.use_ssl = true
  @http.verify_mode = OpenSSL::SSL::VERIFY_PEER
  @request = Net::HTTP::Get.new @uri
  headers.each_pair { |pair| @request.add_field(*pair) }
end
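
As a rough illustration of what the constructor sets up, the sketch below builds the same kind of objects using only the Ruby standard library. It rests on assumptions: `URI.encode_www_form` stands in for the library's `query_string_normalizer`, and the host, path, parameter names, and token are hypothetical placeholders rather than real service values.

```ruby
require "net/http"
require "openssl"
require "uri"

# Hypothetical endpoint and parameters, for illustration only.
uri = URI("https://stream.example.com/v1/prices")

# Stand-in for OandaAPI's query_string_normalizer: encode the unencoded hash.
uri.query = URI.encode_www_form(accountId: 12345, instruments: "EUR_USD,USD_JPY")

# Like the constructor, target port 443 with TLS and peer verification.
# No connection is opened until a request is actually issued.
http = Net::HTTP.new(uri.host, 443)
http.use_ssl = true
http.verify_mode = OpenSSL::SSL::VERIFY_PEER

# Build the GET request and copy each header pair onto it.
request = Net::HTTP::Get.new(uri)
headers = { "Authorization" => "Bearer placeholder-token" }
headers.each_pair { |name, value| request.add_field(name, value) }

puts uri.query
puts request["Authorization"]
```

Nothing here touches the network, so the sketch can be used to inspect the encoded query string and the request headers before any streaming starts.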

Instance Attribute Details

#client ⇒ OandaAPI::Streaming::Client

Returns a streaming client instance.

Returns:

  • (OandaAPI::Streaming::Client)

    a streaming client instance.


# File 'lib/oanda_api/streaming/request.rb', line 19

class Request
  attr_accessor :client, :emit_heartbeats
  attr_reader :uri, :request

  # Creates a `Streaming::Request` instance.
  # @param [Streaming::Client] client a streaming client instance which can be used to
  #   send signals to an instance of this `Streaming::Request` class.
  # @param [String] uri an absolute URI to the service endpoint.
  # @param [Hash] query a list of query parameters, unencoded. The list
  #   is converted into a query string. See `OandaAPI::Client#query_string_normalizer`.
  # @param [Hash] headers a list of header values that will be sent with the request.
  def initialize(client: nil, uri: nil, query: {}, headers: {})
    self.client = client.nil? ? self : client
    @uri = URI uri
    @uri.query = OandaAPI::Client.default_options[:query_string_normalizer].call(query)
    @http = Net::HTTP.new @uri.host, 443
    @http.use_ssl = true
    @http.verify_mode = OpenSSL::SSL::VERIFY_PEER
    @request = Net::HTTP::Get.new @uri
    headers.each_pair { |pair| @request.add_field(*pair) }
  end

  # Sets the client attribute
  # @param [OandaAPI::Streaming::Client] value
  # @return [void]
  # @raise [ArgumentError] if value is neither an {OandaAPI::Streaming::Client}
  #   nor an {OandaAPI::Streaming::Request} instance.
  def client=(value)
    fail ArgumentError, "Expecting an OandaAPI::Streaming::Client" unless value.is_a?(OandaAPI::Streaming::Client) || value.is_a?(OandaAPI::Streaming::Request)
    @client = value
  end

  # @return [boolean] true if heartbeats are emitted.
  def emit_heartbeats?
    !!@emit_heartbeats
  end

  # Signals the streaming request to disconnect and terminates streaming.
  # @return [void]
  def stop!
    @stop_requested = true
  end

  # Returns `true` if the request has been signalled to terminate. See {#stop!}.
  # @return [boolean]
  def stop_requested?
    !!@stop_requested
  end

  # @return [boolean] true if the instance is connected and is streaming a response.
  def running?
    !!@running
  end

  # Emits a stream of {OandaAPI::ResourceBase} instances. Depending
  #  on the endpoint that the request is servicing, either
  #  {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction}
  #  instances are emitted. When {#emit_heartbeats?} is `true`,
  #  {OandaAPI::Resource::Heartbeat} instances may also be emitted.
  #
  #  Note that this method runs as a loop and blocks until either
  #  the connection is halted or a {#stop!} signal is received.
  #
  # @yield [OandaAPI::ResourceBase, OandaAPI::Streaming::Client] Each resource found in the response
  #   stream is yielded as it is received. The client instance controlling the
  #   streaming request is also yielded. It can be used to issue a `client.stop!` to terminate the request.
  # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server.
  # @raise [OandaAPI::RequestError] if an unexpected resource is returned.
  # @return [void]
  def stream(&block)
    @stop_requested = false
    @running = true
    # @http.set_debug_output $stderr
    @http.request(@request) do |response|
      buffer = ""
      response.read_body do |chunk|
        buffer << chunk
        next unless chunk.match(/\r\n\Z/)
        buffer.gsub!(/\r\n/,"")
        handle_response(buffer).each do |resource|
          block.call(resource, @client)
          return if stop_requested?
        end
        buffer = ""
        return if stop_requested?
        sleep 0.01
      end
    end
  ensure
    @running = false
    @http.finish if @http.started?
  end

  private

  # @private
  # Converts a raw json response into {OandaAPI::ResourceBase} instances.
  # @return [Array<OandaAPI::ResourceBase>] depending on the endpoint
  #   that the request is servicing, which is either an array of
  #   {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} instances.
  #   When #emit_heartbeats? is `true`, then the instance could be an {OandaAPI::Resource::Heartbeat}.
  # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server.
  # @raise [OandaAPI::RequestError] if an unexpected resource is returned.
  def handle_response(string)
    parse(string).map do |parsed_response|
      case
      when parsed_response[:heartbeat]
        OandaAPI::Resource::Heartbeat.new parsed_response[:heartbeat] if emit_heartbeats?
      when parsed_response[:tick]
        OandaAPI::Resource::Price.new parsed_response[:tick]
      when parsed_response[:transaction]
        OandaAPI::Resource::Transaction.new parsed_response[:transaction]
      when parsed_response[:disconnect]
        fail OandaAPI::StreamingDisconnect, parsed_response[:disconnect][:message]
      else
        fail OandaAPI::RequestError, "unknown resource: #{parsed_response}"
      end
    end.compact
  end

  # @private
  # Uses the best json parser available for optimal performance and stream parsing ability.
  def parse(string)
    OandaAPI::Streaming::JsonParser.adapter.parse string
  end
end
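
The private `handle_response` method in the listing above maps each parsed message to a resource by checking which top-level key is present. The following is a minimal, self-contained sketch of that dispatch idea. The `Heartbeat`, `Price`, and `Transaction` structs, the `Disconnect` error, and the `to_resources` helper are hypothetical stand-ins, not the library's classes.

```ruby
# Hypothetical stand-ins for the library's resource and error classes.
Heartbeat   = Struct.new(:attrs)
Price       = Struct.new(:attrs)
Transaction = Struct.new(:attrs)
class Disconnect < StandardError; end

# Converts already-parsed messages (hashes with symbol keys) into resources.
# Heartbeats are dropped unless emit_heartbeats is true.
def to_resources(parsed, emit_heartbeats: false)
  parsed.map do |message|
    if message[:heartbeat]
      # Evaluates to nil when heartbeats are suppressed; compact removes it.
      Heartbeat.new(message[:heartbeat]) if emit_heartbeats
    elsif message[:tick]
      Price.new(message[:tick])
    elsif message[:transaction]
      Transaction.new(message[:transaction])
    elsif message[:disconnect]
      raise Disconnect, message[:disconnect][:message]
    else
      raise ArgumentError, "unknown resource: #{message}"
    end
  end.compact
end

parsed  = [{ heartbeat: { time: "t1" } }, { tick: { instrument: "EUR_USD" } }]
quiet   = to_resources(parsed)
verbose = to_resources(parsed, emit_heartbeats: true)
```

With heartbeats suppressed, the heartbeat message produces `nil` and is removed by `compact`, which is why the caller only ever sees resources.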

#emit_heartbeats ⇒ boolean

Returns true if heartbeats are emitted.

Returns:

  • (boolean)

    true if heartbeats are emitted.




#request ⇒ Net::HTTP::Get (readonly)

Returns the Net::HTTP::Get request that is sent to the endpoint.

Returns:

  • (Net::HTTP::Get)

    the underlying HTTP GET request.




#uri ⇒ URI::HTTPS (readonly)

Returns a URI instance.

Returns:

  • (URI::HTTPS)

    a URI instance.




Instance Method Details

#emit_heartbeats? ⇒ boolean

Returns true if heartbeats are emitted.

Returns:

  • (boolean)

    true if heartbeats are emitted.



# File 'lib/oanda_api/streaming/request.rb', line 51

def emit_heartbeats?
  !!@emit_heartbeats
end

#running? ⇒ boolean

Returns true if the instance is connected and is streaming a response.

Returns:

  • (boolean)

    true if the instance is connected and is streaming a response.



# File 'lib/oanda_api/streaming/request.rb', line 68

def running?
  !!@running
end

#stop! ⇒ void

This method returns an undefined value.

Signals the streaming request to disconnect and terminates streaming.



# File 'lib/oanda_api/streaming/request.rb', line 57

def stop!
  @stop_requested = true
end

#stop_requested? ⇒ boolean

Returns true if the request has been signalled to terminate. See #stop!.

Returns:

  • (boolean)


# File 'lib/oanda_api/streaming/request.rb', line 63

def stop_requested?
  !!@stop_requested
end
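
The #stop!, #stop_requested?, and #running? methods together form a cooperative stop protocol: the streaming loop checks a flag after each yielded resource. The sketch below imitates that protocol with a hypothetical `FakeStream` class that iterates over an in-memory collection instead of an HTTP response, so it illustrates the control flow only, not the library's networking.

```ruby
# Hypothetical stand-in that mimics the stop-flag protocol without any I/O.
class FakeStream
  def initialize(items)
    @items = items
  end

  # Ask the loop in #stream to finish after the current item.
  def stop!
    @stop_requested = true
  end

  def stop_requested?
    !!@stop_requested
  end

  def running?
    !!@running
  end

  # Yields each item plus the object that can be used to stop the loop.
  def stream
    @stop_requested = false
    @running = true
    @items.each do |item|
      yield item, self
      return if stop_requested?
    end
  ensure
    # Runs on normal completion, early return, and exceptions alike.
    @running = false
  end
end

seen = []
fake = FakeStream.new(1..10)
fake.stream do |item, client|
  seen << item
  client.stop! if seen.size == 3
end
```

Because the flag is only checked between items, a stop request takes effect after the current block call returns rather than interrupting it.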

#stream {|OandaAPI::ResourceBase, OandaAPI::Streaming::Client| ... } ⇒ void

This method returns an undefined value.

Emits a stream of ResourceBase instances. Depending on the endpoint that the request is servicing, either Resource::Price or Resource::Transaction instances are emitted. When #emit_heartbeats? is true, Resource::Heartbeat instances may also be emitted.

Note that this method runs as a loop and blocks until either the connection is halted or a #stop! signal is received.

Yields:

  • (OandaAPI::ResourceBase, OandaAPI::Streaming::Client)

    Each resource found in the response stream is yielded as it is received. The client instance controlling the streaming request is also yielded. It can be used to issue a client.stop! to terminate the request.

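Raises:

  • (OandaAPI::StreamingDisconnect)

    if the endpoint was disconnected by the server.

  • (OandaAPI::RequestError)

    if an unexpected resource is returned.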



# File 'lib/oanda_api/streaming/request.rb', line 87

def stream(&block)
  @stop_requested = false
  @running = true
  # @http.set_debug_output $stderr
  @http.request(@request) do |response|
    buffer = ""
    response.read_body do |chunk|
      buffer << chunk
      next unless chunk.match(/\r\n\Z/)
      buffer.gsub!(/\r\n/,"")
      handle_response(buffer).each do |resource|
        block.call(resource, @client)
        return if stop_requested?
      end
      buffer = ""
      return if stop_requested?
      sleep 0.01
    end
  end
ensure
  @running = false
  @http.finish if @http.started?
end
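
The buffering step in #stream accumulates chunks until one ends with a CRLF, then hands the collected text to the parser. The sketch below shows a simplified variant of that idea, under stated assumptions: instead of stripping the CRLFs and relying on a stream-capable JSON adapter as the method above does, it splits the buffer on CRLF and parses each line with the standard `JSON.parse`. The `each_message` helper and the sample chunks are hypothetical.

```ruby
require "json"

# Hypothetical helper: yields one parsed hash per CRLF-terminated JSON document.
def each_message(chunks)
  return enum_for(:each_message, chunks) unless block_given?

  buffer = +""
  chunks.each do |chunk|
    buffer << chunk
    # A chunk that does not end in CRLF is treated as an incomplete message.
    next unless chunk.end_with?("\r\n")

    buffer.split("\r\n").each do |line|
      yield JSON.parse(line, symbolize_names: true) unless line.empty?
    end
    buffer = +""
  end
end

# The first message arrives split across two chunks; the third chunk holds two messages.
chunks = [
  "{\"tick\":{\"instr",
  "ument\":\"EUR_USD\"}}\r\n",
  "{\"heartbeat\":{\"time\":\"t1\"}}\r\n{\"tick\":{\"instrument\":\"USD_JPY\"}}\r\n"
]
messages = each_message(chunks).to_a
```

Splitting on CRLF keeps the sketch dependency-free, at the cost of assuming that each JSON document occupies exactly one CRLF-terminated line.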