Class: OandaAPI::Streaming::Request
- Inherits:
-
Object
- Object
- OandaAPI::Streaming::Request
- Defined in:
- lib/oanda_api/streaming/request.rb
Overview
An HTTP 1.1 streaming request. Used to create a persistent connection with the server and continuously download a stream of resource representations. Resources are emitted as ResourceBase instances.
Instance Attribute Summary collapse
-
#client ⇒ OandaAPI::Streaming::Client
A streaming client instance.
- #emit_heartbeats ⇒ boolean
-
#request ⇒ URI::HTTPS
readonly
A URI instance.
-
#uri ⇒ URI::HTTPS
readonly
A URI instance.
Instance Method Summary collapse
-
#emit_heartbeats? ⇒ boolean
True if heatbeats are emitted.
-
#initialize(client: nil, uri:, query: {}, headers: {}) ⇒ Request
constructor
Creates an OandaAPI::Streaming::Request instance.
-
#running? ⇒ true
If the instance is connected and streaming a response.
-
#stop! ⇒ void
Signals the streaming request to disconnect and terminates streaming.
-
#stop_requested? ⇒ boolean
Returns
trueif the request has been signalled to terminate. -
#stream {|OandaAPI::ResourceBase, OandaAPI::Streaming::Client| ... } ⇒ void
Emits a stream of ResourceBase instances, depending on the endpoint that the request is servicing, either Resource::Price or Resource::Transaction instances are emitted.
Constructor Details
#initialize(client: nil, uri:, query: {}, headers: {}) ⇒ Request
Creates an OandaAPI::Streaming::Request instance.
30 31 32 33 34 35 36 37 38 39 |
# File 'lib/oanda_api/streaming/request.rb', line 30 def initialize(client: nil, uri:, query: {}, headers: {}) self.client = client.nil? ? self : client @uri = URI uri @uri.query = OandaAPI::Client.[:query_string_normalizer].call(query) @http = Net::HTTP.new @uri.host, 443 @http.use_ssl = true @http.verify_mode = OpenSSL::SSL::VERIFY_PEER @request = Net::HTTP::Get.new @uri headers.each_pair { |pair| @request.add_field(*pair) } end |
Instance Attribute Details
#client ⇒ OandaAPI::Streaming::Client
Returns a streaming client instance.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/oanda_api/streaming/request.rb', line 19 class Request attr_accessor :client, :emit_heartbeats attr_reader :uri, :request # Creates an OandaAPI::Streaming::Request instance. # @param [Streaming::Client] client a streaming client instance that can be used to # send signals to an instance of this Streaming::Request. # @param [String] uri an absolute URI to the service endpoint. # @param [Hash] query a list of query parameters, unencoded. The list # is converted into a query string. See {OandaAPI::Client#query_string_normalizer}. # @param [Hash] headers a list of header values that will be sent with the request. def initialize(client: nil, uri:, query: {}, headers: {}) self.client = client.nil? ? self : client @uri = URI uri @uri.query = OandaAPI::Client.[:query_string_normalizer].call(query) @http = Net::HTTP.new @uri.host, 443 @http.use_ssl = true @http.verify_mode = OpenSSL::SSL::VERIFY_PEER @request = Net::HTTP::Get.new @uri headers.each_pair { |pair| @request.add_field(*pair) } end # Sets the client attribute # @param [OandaAPI::Streaming::Client] value # @return [void] # @raise [ArgumentError] if value is not an OandaAPI::Streaming::Client instance. def client=(value) fail ArgumentError, "Expecting an OandaAPI::Streaming::Client" unless (value.is_a?(OandaAPI::Streaming::Client) || value.is_a?(OandaAPI::Streaming::Request)) @client = value end # @return [boolean] true if heatbeats are emitted. def emit_heartbeats? !!@emit_heartbeats end # Signals the streaming request to disconnect and terminates streaming. # @return [void] def stop! @stop_requested = true end # Returns `true` if the request has been signalled to terminate. See {#stop!}. # @return [boolean] def stop_requested? !!@stop_requested end # @return [true] if the instance is connected and streaming a response. def running? !!@running end # Emits a stream of {OandaAPI::ResourceBase} instances, depending # on the endpoint that the request is servicing, either # {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} # instances are emitted. When #emit_heartbeats? is `true`, then # resources could also be {OandaAPI::Resource::Heartbeat}. # # Note this method runs as an infinite loop and will block indefinitely # until either the connection is halted or a {#stop!} signal is recieved. # # @yield [OandaAPI::ResourceBase, OandaAPI::Streaming::Client] Each resource found in the response # stream is yielded as they are received. The client instance controlling the # streaming request is also yielded. It can be used to issue a signaller.#stop! to terminate the resquest. # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server. # @raise [OandaAPI::RequestError] if an unexpected resource is returned. # @return [void] def stream(&block) @stop_requested = false @running = true # @http.set_debug_output $stderr @http.request(@request) do |response| response.read_body do |chunk| handle_response(chunk).each do |resource| block.call(resource, @client) return if stop_requested? end return if stop_requested? sleep 0.01 end end ensure @running = false @http.finish if @http.started? end private # @private # Converts a raw json response into {OandaAPI::ResourceBase} instances. # @return [Array<OandaAPI::ResourceBase>] depending on the endpoint # that the request is servicing, which is either an array of # {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} instances. # When #emit_heartbeats? is `true`, then the instance could be an {OandaAPI::Resource::Heartbeat}. # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server. # @raise [OandaAPI::RequestError] if an unexpected resource is returned. def handle_response(response) response.split("\r\n").map do |json| parsed_response = JSON.parse json case when parsed_response["heartbeat"] OandaAPI::Resource::Heartbeat.new parsed_response["heartbeat"] if emit_heartbeats? when parsed_response["tick"] OandaAPI::Resource::Price.new parsed_response["tick"] when parsed_response["transaction"] OandaAPI::Resource::Transaction.new parsed_response["transaction"] when parsed_response["disconnect"] raise OandaAPI::StreamingDisconnect, parsed_response["disconnect"]["message"] else raise OandaAPI::RequestError, "unknown resource: #{json}" end end.compact end end |
#emit_heartbeats ⇒ boolean
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/oanda_api/streaming/request.rb', line 19 class Request attr_accessor :client, :emit_heartbeats attr_reader :uri, :request # Creates an OandaAPI::Streaming::Request instance. # @param [Streaming::Client] client a streaming client instance that can be used to # send signals to an instance of this Streaming::Request. # @param [String] uri an absolute URI to the service endpoint. # @param [Hash] query a list of query parameters, unencoded. The list # is converted into a query string. See {OandaAPI::Client#query_string_normalizer}. # @param [Hash] headers a list of header values that will be sent with the request. def initialize(client: nil, uri:, query: {}, headers: {}) self.client = client.nil? ? self : client @uri = URI uri @uri.query = OandaAPI::Client.[:query_string_normalizer].call(query) @http = Net::HTTP.new @uri.host, 443 @http.use_ssl = true @http.verify_mode = OpenSSL::SSL::VERIFY_PEER @request = Net::HTTP::Get.new @uri headers.each_pair { |pair| @request.add_field(*pair) } end # Sets the client attribute # @param [OandaAPI::Streaming::Client] value # @return [void] # @raise [ArgumentError] if value is not an OandaAPI::Streaming::Client instance. def client=(value) fail ArgumentError, "Expecting an OandaAPI::Streaming::Client" unless (value.is_a?(OandaAPI::Streaming::Client) || value.is_a?(OandaAPI::Streaming::Request)) @client = value end # @return [boolean] true if heatbeats are emitted. def emit_heartbeats? !!@emit_heartbeats end # Signals the streaming request to disconnect and terminates streaming. # @return [void] def stop! @stop_requested = true end # Returns `true` if the request has been signalled to terminate. See {#stop!}. # @return [boolean] def stop_requested? !!@stop_requested end # @return [true] if the instance is connected and streaming a response. def running? !!@running end # Emits a stream of {OandaAPI::ResourceBase} instances, depending # on the endpoint that the request is servicing, either # {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} # instances are emitted. When #emit_heartbeats? is `true`, then # resources could also be {OandaAPI::Resource::Heartbeat}. # # Note this method runs as an infinite loop and will block indefinitely # until either the connection is halted or a {#stop!} signal is recieved. # # @yield [OandaAPI::ResourceBase, OandaAPI::Streaming::Client] Each resource found in the response # stream is yielded as they are received. The client instance controlling the # streaming request is also yielded. It can be used to issue a signaller.#stop! to terminate the resquest. # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server. # @raise [OandaAPI::RequestError] if an unexpected resource is returned. # @return [void] def stream(&block) @stop_requested = false @running = true # @http.set_debug_output $stderr @http.request(@request) do |response| response.read_body do |chunk| handle_response(chunk).each do |resource| block.call(resource, @client) return if stop_requested? end return if stop_requested? sleep 0.01 end end ensure @running = false @http.finish if @http.started? end private # @private # Converts a raw json response into {OandaAPI::ResourceBase} instances. # @return [Array<OandaAPI::ResourceBase>] depending on the endpoint # that the request is servicing, which is either an array of # {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} instances. # When #emit_heartbeats? is `true`, then the instance could be an {OandaAPI::Resource::Heartbeat}. # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server. # @raise [OandaAPI::RequestError] if an unexpected resource is returned. def handle_response(response) response.split("\r\n").map do |json| parsed_response = JSON.parse json case when parsed_response["heartbeat"] OandaAPI::Resource::Heartbeat.new parsed_response["heartbeat"] if emit_heartbeats? when parsed_response["tick"] OandaAPI::Resource::Price.new parsed_response["tick"] when parsed_response["transaction"] OandaAPI::Resource::Transaction.new parsed_response["transaction"] when parsed_response["disconnect"] raise OandaAPI::StreamingDisconnect, parsed_response["disconnect"]["message"] else raise OandaAPI::RequestError, "unknown resource: #{json}" end end.compact end end |
#request ⇒ URI::HTTPS (readonly)
Returns a URI instance.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/oanda_api/streaming/request.rb', line 19 class Request attr_accessor :client, :emit_heartbeats attr_reader :uri, :request # Creates an OandaAPI::Streaming::Request instance. # @param [Streaming::Client] client a streaming client instance that can be used to # send signals to an instance of this Streaming::Request. # @param [String] uri an absolute URI to the service endpoint. # @param [Hash] query a list of query parameters, unencoded. The list # is converted into a query string. See {OandaAPI::Client#query_string_normalizer}. # @param [Hash] headers a list of header values that will be sent with the request. def initialize(client: nil, uri:, query: {}, headers: {}) self.client = client.nil? ? self : client @uri = URI uri @uri.query = OandaAPI::Client.[:query_string_normalizer].call(query) @http = Net::HTTP.new @uri.host, 443 @http.use_ssl = true @http.verify_mode = OpenSSL::SSL::VERIFY_PEER @request = Net::HTTP::Get.new @uri headers.each_pair { |pair| @request.add_field(*pair) } end # Sets the client attribute # @param [OandaAPI::Streaming::Client] value # @return [void] # @raise [ArgumentError] if value is not an OandaAPI::Streaming::Client instance. def client=(value) fail ArgumentError, "Expecting an OandaAPI::Streaming::Client" unless (value.is_a?(OandaAPI::Streaming::Client) || value.is_a?(OandaAPI::Streaming::Request)) @client = value end # @return [boolean] true if heatbeats are emitted. def emit_heartbeats? !!@emit_heartbeats end # Signals the streaming request to disconnect and terminates streaming. # @return [void] def stop! @stop_requested = true end # Returns `true` if the request has been signalled to terminate. See {#stop!}. # @return [boolean] def stop_requested? !!@stop_requested end # @return [true] if the instance is connected and streaming a response. def running? !!@running end # Emits a stream of {OandaAPI::ResourceBase} instances, depending # on the endpoint that the request is servicing, either # {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} # instances are emitted. When #emit_heartbeats? is `true`, then # resources could also be {OandaAPI::Resource::Heartbeat}. # # Note this method runs as an infinite loop and will block indefinitely # until either the connection is halted or a {#stop!} signal is recieved. # # @yield [OandaAPI::ResourceBase, OandaAPI::Streaming::Client] Each resource found in the response # stream is yielded as they are received. The client instance controlling the # streaming request is also yielded. It can be used to issue a signaller.#stop! to terminate the resquest. # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server. # @raise [OandaAPI::RequestError] if an unexpected resource is returned. # @return [void] def stream(&block) @stop_requested = false @running = true # @http.set_debug_output $stderr @http.request(@request) do |response| response.read_body do |chunk| handle_response(chunk).each do |resource| block.call(resource, @client) return if stop_requested? end return if stop_requested? sleep 0.01 end end ensure @running = false @http.finish if @http.started? end private # @private # Converts a raw json response into {OandaAPI::ResourceBase} instances. # @return [Array<OandaAPI::ResourceBase>] depending on the endpoint # that the request is servicing, which is either an array of # {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} instances. # When #emit_heartbeats? is `true`, then the instance could be an {OandaAPI::Resource::Heartbeat}. # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server. # @raise [OandaAPI::RequestError] if an unexpected resource is returned. def handle_response(response) response.split("\r\n").map do |json| parsed_response = JSON.parse json case when parsed_response["heartbeat"] OandaAPI::Resource::Heartbeat.new parsed_response["heartbeat"] if emit_heartbeats? when parsed_response["tick"] OandaAPI::Resource::Price.new parsed_response["tick"] when parsed_response["transaction"] OandaAPI::Resource::Transaction.new parsed_response["transaction"] when parsed_response["disconnect"] raise OandaAPI::StreamingDisconnect, parsed_response["disconnect"]["message"] else raise OandaAPI::RequestError, "unknown resource: #{json}" end end.compact end end |
#uri ⇒ URI::HTTPS (readonly)
Returns a URI instance.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 |
# File 'lib/oanda_api/streaming/request.rb', line 19 class Request attr_accessor :client, :emit_heartbeats attr_reader :uri, :request # Creates an OandaAPI::Streaming::Request instance. # @param [Streaming::Client] client a streaming client instance that can be used to # send signals to an instance of this Streaming::Request. # @param [String] uri an absolute URI to the service endpoint. # @param [Hash] query a list of query parameters, unencoded. The list # is converted into a query string. See {OandaAPI::Client#query_string_normalizer}. # @param [Hash] headers a list of header values that will be sent with the request. def initialize(client: nil, uri:, query: {}, headers: {}) self.client = client.nil? ? self : client @uri = URI uri @uri.query = OandaAPI::Client.[:query_string_normalizer].call(query) @http = Net::HTTP.new @uri.host, 443 @http.use_ssl = true @http.verify_mode = OpenSSL::SSL::VERIFY_PEER @request = Net::HTTP::Get.new @uri headers.each_pair { |pair| @request.add_field(*pair) } end # Sets the client attribute # @param [OandaAPI::Streaming::Client] value # @return [void] # @raise [ArgumentError] if value is not an OandaAPI::Streaming::Client instance. def client=(value) fail ArgumentError, "Expecting an OandaAPI::Streaming::Client" unless (value.is_a?(OandaAPI::Streaming::Client) || value.is_a?(OandaAPI::Streaming::Request)) @client = value end # @return [boolean] true if heatbeats are emitted. def emit_heartbeats? !!@emit_heartbeats end # Signals the streaming request to disconnect and terminates streaming. # @return [void] def stop! @stop_requested = true end # Returns `true` if the request has been signalled to terminate. See {#stop!}. # @return [boolean] def stop_requested? !!@stop_requested end # @return [true] if the instance is connected and streaming a response. def running? !!@running end # Emits a stream of {OandaAPI::ResourceBase} instances, depending # on the endpoint that the request is servicing, either # {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} # instances are emitted. When #emit_heartbeats? is `true`, then # resources could also be {OandaAPI::Resource::Heartbeat}. # # Note this method runs as an infinite loop and will block indefinitely # until either the connection is halted or a {#stop!} signal is recieved. # # @yield [OandaAPI::ResourceBase, OandaAPI::Streaming::Client] Each resource found in the response # stream is yielded as they are received. The client instance controlling the # streaming request is also yielded. It can be used to issue a signaller.#stop! to terminate the resquest. # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server. # @raise [OandaAPI::RequestError] if an unexpected resource is returned. # @return [void] def stream(&block) @stop_requested = false @running = true # @http.set_debug_output $stderr @http.request(@request) do |response| response.read_body do |chunk| handle_response(chunk).each do |resource| block.call(resource, @client) return if stop_requested? end return if stop_requested? sleep 0.01 end end ensure @running = false @http.finish if @http.started? end private # @private # Converts a raw json response into {OandaAPI::ResourceBase} instances. # @return [Array<OandaAPI::ResourceBase>] depending on the endpoint # that the request is servicing, which is either an array of # {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} instances. # When #emit_heartbeats? is `true`, then the instance could be an {OandaAPI::Resource::Heartbeat}. # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server. # @raise [OandaAPI::RequestError] if an unexpected resource is returned. def handle_response(response) response.split("\r\n").map do |json| parsed_response = JSON.parse json case when parsed_response["heartbeat"] OandaAPI::Resource::Heartbeat.new parsed_response["heartbeat"] if emit_heartbeats? when parsed_response["tick"] OandaAPI::Resource::Price.new parsed_response["tick"] when parsed_response["transaction"] OandaAPI::Resource::Transaction.new parsed_response["transaction"] when parsed_response["disconnect"] raise OandaAPI::StreamingDisconnect, parsed_response["disconnect"]["message"] else raise OandaAPI::RequestError, "unknown resource: #{json}" end end.compact end end |
Instance Method Details
#emit_heartbeats? ⇒ boolean
Returns true if heatbeats are emitted.
51 52 53 |
# File 'lib/oanda_api/streaming/request.rb', line 51 def emit_heartbeats? !!@emit_heartbeats end |
#running? ⇒ true
Returns if the instance is connected and streaming a response.
68 69 70 |
# File 'lib/oanda_api/streaming/request.rb', line 68 def running? !!@running end |
#stop! ⇒ void
This method returns an undefined value.
Signals the streaming request to disconnect and terminates streaming.
57 58 59 |
# File 'lib/oanda_api/streaming/request.rb', line 57 def stop! @stop_requested = true end |
#stop_requested? ⇒ boolean
Returns true if the request has been signalled to terminate. See #stop!.
63 64 65 |
# File 'lib/oanda_api/streaming/request.rb', line 63 def stop_requested? !!@stop_requested end |
#stream {|OandaAPI::ResourceBase, OandaAPI::Streaming::Client| ... } ⇒ void
This method returns an undefined value.
Emits a stream of ResourceBase instances, depending
on the endpoint that the request is servicing, either
Resource::Price or Resource::Transaction
instances are emitted. When #emit_heartbeats? is true, then
resources could also be Resource::Heartbeat.
Note this method runs as an infinite loop and will block indefinitely until either the connection is halted or a #stop! signal is recieved.
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/oanda_api/streaming/request.rb', line 87 def stream(&block) @stop_requested = false @running = true # @http.set_debug_output $stderr @http.request(@request) do |response| response.read_body do |chunk| handle_response(chunk).each do |resource| block.call(resource, @client) return if stop_requested? end return if stop_requested? sleep 0.01 end end ensure @running = false @http.finish if @http.started? end |