Class: OandaAPI::Streaming::Request
- Inherits:
-
Object
- Object
- OandaAPI::Streaming::Request
- Defined in:
- lib/oanda_api/streaming/request.rb
Overview
An HTTP 1.1 streaming request. Used to create a persistent connection with the server and continuously download a stream of resource representations. Resources are emitted as ResourceBase instances.
Instance Attribute Summary collapse
-
#client ⇒ OandaAPI::Streaming::Client
A streaming client instance.
-
#emit_heartbeats ⇒ boolean
True if heartbeats are emitted.
-
#request ⇒ URI::HTTPS
readonly
A URI instance.
-
#uri ⇒ URI::HTTPS
readonly
A URI instance.
Instance Method Summary collapse
-
#emit_heartbeats? ⇒ boolean
True if heatbeats are emitted.
-
#initialize(client: nil, uri: nil, query: {}, headers: {}) ⇒ Request
constructor
Creates a
Streaming::Request
instance. -
#running? ⇒ boolean
True if the instance is connected and is streaming a response.
-
#stop! ⇒ void
Signals the streaming request to disconnect and terminates streaming.
-
#stop_requested? ⇒ boolean
Returns
true
if the request has been signalled to terminate. -
#stream {|OandaAPI::ResourceBase, OandaAPI::Streaming::Client| ... } ⇒ void
Emits a stream of ResourceBase instances, depending on the endpoint that the request is servicing, either Resource::Price or Resource::Transaction instances are emitted.
Constructor Details
#initialize(client: nil, uri: nil, query: {}, headers: {}) ⇒ Request
Creates a Streaming::Request
instance.
30 31 32 33 34 35 36 37 38 39 |
# File 'lib/oanda_api/streaming/request.rb', line 30 def initialize(client: nil, uri: nil, query: {}, headers: {}) self.client = client.nil? ? self : client @uri = URI uri @uri.query = OandaAPI::Client.[:query_string_normalizer].call(query) @http = Net::HTTP.new @uri.host, 443 @http.use_ssl = true @http.verify_mode = OpenSSL::SSL::VERIFY_PEER @request = Net::HTTP::Get.new @uri headers.each_pair { |pair| @request.add_field(*pair) } end |
Instance Attribute Details
#client ⇒ OandaAPI::Streaming::Client
Returns a streaming client instance.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/oanda_api/streaming/request.rb', line 19 class Request attr_accessor :client, :emit_heartbeats attr_reader :uri, :request # Creates a `Streaming::Request` instance. # @param [Streaming::Client] client a streaming client instance which can be used to # send signals to an instance of this `Streaming::Request` class. # @param [String] uri an absolute URI to the service endpoint. # @param [Hash] query a list of query parameters, unencoded. The list # is converted into a query string. See `OandaAPI::Client#query_string_normalizer`. # @param [Hash] headers a list of header values that will be sent with the request. def initialize(client: nil, uri: nil, query: {}, headers: {}) self.client = client.nil? ? self : client @uri = URI uri @uri.query = OandaAPI::Client.[:query_string_normalizer].call(query) @http = Net::HTTP.new @uri.host, 443 @http.use_ssl = true @http.verify_mode = OpenSSL::SSL::VERIFY_PEER @request = Net::HTTP::Get.new @uri headers.each_pair { |pair| @request.add_field(*pair) } end # Sets the client attribute # @param [OandaAPI::Streaming::Client] value # @return [void] # @raise [ArgumentError] if value is not an {OandaAPI::Streaming::Client} instance. def client=(value) fail ArgumentError, "Expecting an OandaAPI::Streaming::Client" unless value.is_a?(OandaAPI::Streaming::Client) || value.is_a?(OandaAPI::Streaming::Request) @client = value end # @return [boolean] true if heatbeats are emitted. def emit_heartbeats? !!@emit_heartbeats end # Signals the streaming request to disconnect and terminates streaming. # @return [void] def stop! @stop_requested = true end # Returns `true` if the request has been signalled to terminate. See {#stop!}. # @return [boolean] def stop_requested? !!@stop_requested end # @return [boolean] true if the instance is connected and is streaming a response. def running? !!@running end # Emits a stream of {OandaAPI::ResourceBase} instances, depending # on the endpoint that the request is servicing, either # {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} # instances are emitted. When {#emit_heartbeats?} is `true`, then # {OandaAPI::Resource::Heartbeat} could also be emitted. # # Note this method runs as an infinite loop and will block indefinitely # until either the connection is halted or a {#stop!} signal is recieved. # # @yield [OandaAPI::ResourceBase, OandaAPI::Streaming::Client] Each resource found in the response # stream is yielded as they are received. The client instance controlling the # streaming request is also yielded. It can be used to issue a `client.stop!` to terminate the resquest. # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server. # @raise [OandaAPI::RequestError] if an unexpected resource is returned. # @return [void] def stream(&block) @stop_requested = false @running = true # @http.set_debug_output $stderr @http.request(@request) do |response| buffer = "" response.read_body do |chunk| buffer << chunk next unless chunk.match(/\r\n\Z/) buffer.gsub!(/\r\n/,"") handle_response(buffer).each do |resource| block.call(resource, @client) return if stop_requested? end buffer = "" return if stop_requested? sleep 0.01 end end ensure @running = false @http.finish if @http.started? end private # @private # Converts a raw json response into {OandaAPI::ResourceBase} instances. # @return [Array<OandaAPI::ResourceBase>] depending on the endpoint # that the request is servicing, which is either an array of # {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} instances. # When #emit_heartbeats? is `true`, then the instance could be an {OandaAPI::Resource::Heartbeat}. # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server. # @raise [OandaAPI::RequestError] if an unexpected resource is returned. def handle_response(string) parse(string).map do |parsed_response| case when parsed_response[:heartbeat] OandaAPI::Resource::Heartbeat.new parsed_response[:heartbeat] if emit_heartbeats? when parsed_response[:tick] OandaAPI::Resource::Price.new parsed_response[:tick] when parsed_response[:transaction] OandaAPI::Resource::Transaction.new parsed_response[:transaction] when parsed_response[:disconnect] fail OandaAPI::StreamingDisconnect, parsed_response[:disconnect][:message] else fail OandaAPI::RequestError, "unknown resource: #{parsed_response}" end end.compact end # @private # Uses the best json parser available for optimal performance and stream parsing ability. def parse(string) OandaAPI::Streaming::JsonParser.adapter.parse string end end |
#emit_heartbeats ⇒ boolean
Returns true if heartbeats are emitted.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/oanda_api/streaming/request.rb', line 19 class Request attr_accessor :client, :emit_heartbeats attr_reader :uri, :request # Creates a `Streaming::Request` instance. # @param [Streaming::Client] client a streaming client instance which can be used to # send signals to an instance of this `Streaming::Request` class. # @param [String] uri an absolute URI to the service endpoint. # @param [Hash] query a list of query parameters, unencoded. The list # is converted into a query string. See `OandaAPI::Client#query_string_normalizer`. # @param [Hash] headers a list of header values that will be sent with the request. def initialize(client: nil, uri: nil, query: {}, headers: {}) self.client = client.nil? ? self : client @uri = URI uri @uri.query = OandaAPI::Client.[:query_string_normalizer].call(query) @http = Net::HTTP.new @uri.host, 443 @http.use_ssl = true @http.verify_mode = OpenSSL::SSL::VERIFY_PEER @request = Net::HTTP::Get.new @uri headers.each_pair { |pair| @request.add_field(*pair) } end # Sets the client attribute # @param [OandaAPI::Streaming::Client] value # @return [void] # @raise [ArgumentError] if value is not an {OandaAPI::Streaming::Client} instance. def client=(value) fail ArgumentError, "Expecting an OandaAPI::Streaming::Client" unless value.is_a?(OandaAPI::Streaming::Client) || value.is_a?(OandaAPI::Streaming::Request) @client = value end # @return [boolean] true if heatbeats are emitted. def emit_heartbeats? !!@emit_heartbeats end # Signals the streaming request to disconnect and terminates streaming. # @return [void] def stop! @stop_requested = true end # Returns `true` if the request has been signalled to terminate. See {#stop!}. # @return [boolean] def stop_requested? !!@stop_requested end # @return [boolean] true if the instance is connected and is streaming a response. def running? !!@running end # Emits a stream of {OandaAPI::ResourceBase} instances, depending # on the endpoint that the request is servicing, either # {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} # instances are emitted. When {#emit_heartbeats?} is `true`, then # {OandaAPI::Resource::Heartbeat} could also be emitted. # # Note this method runs as an infinite loop and will block indefinitely # until either the connection is halted or a {#stop!} signal is recieved. # # @yield [OandaAPI::ResourceBase, OandaAPI::Streaming::Client] Each resource found in the response # stream is yielded as they are received. The client instance controlling the # streaming request is also yielded. It can be used to issue a `client.stop!` to terminate the resquest. # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server. # @raise [OandaAPI::RequestError] if an unexpected resource is returned. # @return [void] def stream(&block) @stop_requested = false @running = true # @http.set_debug_output $stderr @http.request(@request) do |response| buffer = "" response.read_body do |chunk| buffer << chunk next unless chunk.match(/\r\n\Z/) buffer.gsub!(/\r\n/,"") handle_response(buffer).each do |resource| block.call(resource, @client) return if stop_requested? end buffer = "" return if stop_requested? sleep 0.01 end end ensure @running = false @http.finish if @http.started? end private # @private # Converts a raw json response into {OandaAPI::ResourceBase} instances. # @return [Array<OandaAPI::ResourceBase>] depending on the endpoint # that the request is servicing, which is either an array of # {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} instances. # When #emit_heartbeats? is `true`, then the instance could be an {OandaAPI::Resource::Heartbeat}. # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server. # @raise [OandaAPI::RequestError] if an unexpected resource is returned. def handle_response(string) parse(string).map do |parsed_response| case when parsed_response[:heartbeat] OandaAPI::Resource::Heartbeat.new parsed_response[:heartbeat] if emit_heartbeats? when parsed_response[:tick] OandaAPI::Resource::Price.new parsed_response[:tick] when parsed_response[:transaction] OandaAPI::Resource::Transaction.new parsed_response[:transaction] when parsed_response[:disconnect] fail OandaAPI::StreamingDisconnect, parsed_response[:disconnect][:message] else fail OandaAPI::RequestError, "unknown resource: #{parsed_response}" end end.compact end # @private # Uses the best json parser available for optimal performance and stream parsing ability. def parse(string) OandaAPI::Streaming::JsonParser.adapter.parse string end end |
#request ⇒ URI::HTTPS (readonly)
Returns a URI instance.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/oanda_api/streaming/request.rb', line 19 class Request attr_accessor :client, :emit_heartbeats attr_reader :uri, :request # Creates a `Streaming::Request` instance. # @param [Streaming::Client] client a streaming client instance which can be used to # send signals to an instance of this `Streaming::Request` class. # @param [String] uri an absolute URI to the service endpoint. # @param [Hash] query a list of query parameters, unencoded. The list # is converted into a query string. See `OandaAPI::Client#query_string_normalizer`. # @param [Hash] headers a list of header values that will be sent with the request. def initialize(client: nil, uri: nil, query: {}, headers: {}) self.client = client.nil? ? self : client @uri = URI uri @uri.query = OandaAPI::Client.[:query_string_normalizer].call(query) @http = Net::HTTP.new @uri.host, 443 @http.use_ssl = true @http.verify_mode = OpenSSL::SSL::VERIFY_PEER @request = Net::HTTP::Get.new @uri headers.each_pair { |pair| @request.add_field(*pair) } end # Sets the client attribute # @param [OandaAPI::Streaming::Client] value # @return [void] # @raise [ArgumentError] if value is not an {OandaAPI::Streaming::Client} instance. def client=(value) fail ArgumentError, "Expecting an OandaAPI::Streaming::Client" unless value.is_a?(OandaAPI::Streaming::Client) || value.is_a?(OandaAPI::Streaming::Request) @client = value end # @return [boolean] true if heatbeats are emitted. def emit_heartbeats? !!@emit_heartbeats end # Signals the streaming request to disconnect and terminates streaming. # @return [void] def stop! @stop_requested = true end # Returns `true` if the request has been signalled to terminate. See {#stop!}. # @return [boolean] def stop_requested? !!@stop_requested end # @return [boolean] true if the instance is connected and is streaming a response. def running? !!@running end # Emits a stream of {OandaAPI::ResourceBase} instances, depending # on the endpoint that the request is servicing, either # {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} # instances are emitted. When {#emit_heartbeats?} is `true`, then # {OandaAPI::Resource::Heartbeat} could also be emitted. # # Note this method runs as an infinite loop and will block indefinitely # until either the connection is halted or a {#stop!} signal is recieved. # # @yield [OandaAPI::ResourceBase, OandaAPI::Streaming::Client] Each resource found in the response # stream is yielded as they are received. The client instance controlling the # streaming request is also yielded. It can be used to issue a `client.stop!` to terminate the resquest. # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server. # @raise [OandaAPI::RequestError] if an unexpected resource is returned. # @return [void] def stream(&block) @stop_requested = false @running = true # @http.set_debug_output $stderr @http.request(@request) do |response| buffer = "" response.read_body do |chunk| buffer << chunk next unless chunk.match(/\r\n\Z/) buffer.gsub!(/\r\n/,"") handle_response(buffer).each do |resource| block.call(resource, @client) return if stop_requested? end buffer = "" return if stop_requested? sleep 0.01 end end ensure @running = false @http.finish if @http.started? end private # @private # Converts a raw json response into {OandaAPI::ResourceBase} instances. # @return [Array<OandaAPI::ResourceBase>] depending on the endpoint # that the request is servicing, which is either an array of # {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} instances. # When #emit_heartbeats? is `true`, then the instance could be an {OandaAPI::Resource::Heartbeat}. # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server. # @raise [OandaAPI::RequestError] if an unexpected resource is returned. def handle_response(string) parse(string).map do |parsed_response| case when parsed_response[:heartbeat] OandaAPI::Resource::Heartbeat.new parsed_response[:heartbeat] if emit_heartbeats? when parsed_response[:tick] OandaAPI::Resource::Price.new parsed_response[:tick] when parsed_response[:transaction] OandaAPI::Resource::Transaction.new parsed_response[:transaction] when parsed_response[:disconnect] fail OandaAPI::StreamingDisconnect, parsed_response[:disconnect][:message] else fail OandaAPI::RequestError, "unknown resource: #{parsed_response}" end end.compact end # @private # Uses the best json parser available for optimal performance and stream parsing ability. def parse(string) OandaAPI::Streaming::JsonParser.adapter.parse string end end |
#uri ⇒ URI::HTTPS (readonly)
Returns a URI instance.
19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 |
# File 'lib/oanda_api/streaming/request.rb', line 19 class Request attr_accessor :client, :emit_heartbeats attr_reader :uri, :request # Creates a `Streaming::Request` instance. # @param [Streaming::Client] client a streaming client instance which can be used to # send signals to an instance of this `Streaming::Request` class. # @param [String] uri an absolute URI to the service endpoint. # @param [Hash] query a list of query parameters, unencoded. The list # is converted into a query string. See `OandaAPI::Client#query_string_normalizer`. # @param [Hash] headers a list of header values that will be sent with the request. def initialize(client: nil, uri: nil, query: {}, headers: {}) self.client = client.nil? ? self : client @uri = URI uri @uri.query = OandaAPI::Client.[:query_string_normalizer].call(query) @http = Net::HTTP.new @uri.host, 443 @http.use_ssl = true @http.verify_mode = OpenSSL::SSL::VERIFY_PEER @request = Net::HTTP::Get.new @uri headers.each_pair { |pair| @request.add_field(*pair) } end # Sets the client attribute # @param [OandaAPI::Streaming::Client] value # @return [void] # @raise [ArgumentError] if value is not an {OandaAPI::Streaming::Client} instance. def client=(value) fail ArgumentError, "Expecting an OandaAPI::Streaming::Client" unless value.is_a?(OandaAPI::Streaming::Client) || value.is_a?(OandaAPI::Streaming::Request) @client = value end # @return [boolean] true if heatbeats are emitted. def emit_heartbeats? !!@emit_heartbeats end # Signals the streaming request to disconnect and terminates streaming. # @return [void] def stop! @stop_requested = true end # Returns `true` if the request has been signalled to terminate. See {#stop!}. # @return [boolean] def stop_requested? !!@stop_requested end # @return [boolean] true if the instance is connected and is streaming a response. def running? !!@running end # Emits a stream of {OandaAPI::ResourceBase} instances, depending # on the endpoint that the request is servicing, either # {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} # instances are emitted. When {#emit_heartbeats?} is `true`, then # {OandaAPI::Resource::Heartbeat} could also be emitted. # # Note this method runs as an infinite loop and will block indefinitely # until either the connection is halted or a {#stop!} signal is recieved. # # @yield [OandaAPI::ResourceBase, OandaAPI::Streaming::Client] Each resource found in the response # stream is yielded as they are received. The client instance controlling the # streaming request is also yielded. It can be used to issue a `client.stop!` to terminate the resquest. # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server. # @raise [OandaAPI::RequestError] if an unexpected resource is returned. # @return [void] def stream(&block) @stop_requested = false @running = true # @http.set_debug_output $stderr @http.request(@request) do |response| buffer = "" response.read_body do |chunk| buffer << chunk next unless chunk.match(/\r\n\Z/) buffer.gsub!(/\r\n/,"") handle_response(buffer).each do |resource| block.call(resource, @client) return if stop_requested? end buffer = "" return if stop_requested? sleep 0.01 end end ensure @running = false @http.finish if @http.started? end private # @private # Converts a raw json response into {OandaAPI::ResourceBase} instances. # @return [Array<OandaAPI::ResourceBase>] depending on the endpoint # that the request is servicing, which is either an array of # {OandaAPI::Resource::Price} or {OandaAPI::Resource::Transaction} instances. # When #emit_heartbeats? is `true`, then the instance could be an {OandaAPI::Resource::Heartbeat}. # @raise [OandaAPI::StreamingDisconnect] if the endpoint was disconnected by server. # @raise [OandaAPI::RequestError] if an unexpected resource is returned. def handle_response(string) parse(string).map do |parsed_response| case when parsed_response[:heartbeat] OandaAPI::Resource::Heartbeat.new parsed_response[:heartbeat] if emit_heartbeats? when parsed_response[:tick] OandaAPI::Resource::Price.new parsed_response[:tick] when parsed_response[:transaction] OandaAPI::Resource::Transaction.new parsed_response[:transaction] when parsed_response[:disconnect] fail OandaAPI::StreamingDisconnect, parsed_response[:disconnect][:message] else fail OandaAPI::RequestError, "unknown resource: #{parsed_response}" end end.compact end # @private # Uses the best json parser available for optimal performance and stream parsing ability. def parse(string) OandaAPI::Streaming::JsonParser.adapter.parse string end end |
Instance Method Details
#emit_heartbeats? ⇒ boolean
Returns true if heatbeats are emitted.
51 52 53 |
# File 'lib/oanda_api/streaming/request.rb', line 51 def emit_heartbeats? !!@emit_heartbeats end |
#running? ⇒ boolean
Returns true if the instance is connected and is streaming a response.
68 69 70 |
# File 'lib/oanda_api/streaming/request.rb', line 68 def running? !!@running end |
#stop! ⇒ void
This method returns an undefined value.
Signals the streaming request to disconnect and terminates streaming.
57 58 59 |
# File 'lib/oanda_api/streaming/request.rb', line 57 def stop! @stop_requested = true end |
#stop_requested? ⇒ boolean
Returns true
if the request has been signalled to terminate. See #stop!.
63 64 65 |
# File 'lib/oanda_api/streaming/request.rb', line 63 def stop_requested? !!@stop_requested end |
#stream {|OandaAPI::ResourceBase, OandaAPI::Streaming::Client| ... } ⇒ void
This method returns an undefined value.
Emits a stream of ResourceBase instances, depending
on the endpoint that the request is servicing, either
Resource::Price or Resource::Transaction
instances are emitted. When #emit_heartbeats? is true
, then
Resource::Heartbeat could also be emitted.
Note this method runs as an infinite loop and will block indefinitely until either the connection is halted or a #stop! signal is recieved.
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/oanda_api/streaming/request.rb', line 87 def stream(&block) @stop_requested = false @running = true # @http.set_debug_output $stderr @http.request(@request) do |response| buffer = "" response.read_body do |chunk| buffer << chunk next unless chunk.match(/\r\n\Z/) buffer.gsub!(/\r\n/,"") handle_response(buffer).each do |resource| block.call(resource, @client) return if stop_requested? end buffer = "" return if stop_requested? sleep 0.01 end end ensure @running = false @http.finish if @http.started? end |