Class: Ksql::Stream
- Inherits:
-
Object
- Object
- Ksql::Stream
- Defined in:
- lib/ksql/stream.rb
Instance Attribute Summary collapse
-
#id ⇒ Object
readonly
Returns the value of attribute id.
Instance Method Summary collapse
-
#close ⇒ Object
Close the streaming connection.
-
#initialize(client, request) ⇒ Stream
constructor
A new instance of Stream.
-
#on_close(&block) ⇒ Object
Specify the action to take when the Streaming connection gets closed.
-
#on_error(&block) ⇒ Object
Specify the action to take when the Streaming connection raises an error.
-
#start(&block) ⇒ Object
Streaming connection handler.
Constructor Details
#initialize(client, request) ⇒ Stream
Returns a new instance of Stream.
9 10 11 12 |
# File 'lib/ksql/stream.rb', line 9 def initialize(client, request) @client = client @request = request end |
Instance Attribute Details
#id ⇒ Object (readonly)
Returns the value of attribute id.
7 8 9 |
# File 'lib/ksql/stream.rb', line 7 def id @id end |
Instance Method Details
#close ⇒ Object
Close the streaming connection
17 18 19 20 21 |
# File 'lib/ksql/stream.rb', line 17 def close raise StreamError.new('The stream hasn\'t started!') unless @id.present? @client.close end |
#on_close(&block) ⇒ Object
Specify the action to take when the Streaming connection gets closed.
28 29 30 |
# File 'lib/ksql/stream.rb', line 28 def on_close(&block) @client.on(:close) { yield } end |
#on_error(&block) ⇒ Object
Specify the action to take when the Streaming connection raises an error.
37 38 39 |
# File 'lib/ksql/stream.rb', line 37 def on_error(&block) @client.on(:error) { |e| yield(e) } end |
#start(&block) ⇒ Object
Streaming connection handler
-
Start the stream
-
Wrap the stream events into OpenStruct instances
-
Execute the passed block
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/ksql/stream.rb', line 50 def start(&block) @headers = {} @request.on(:headers) { |headers| @headers = headers } @request.on(:body_chunk) do |body| next unless body.present? response = Ksql::Connection::Response.new(body: body, headers: @headers) raise Ksql::StreamError.new(response.body['message']) if response.error? if response.body.is_a? Hash @event_class = build_event_class(response.body) next else event = @event_class.new(*response.body) end yield(event) end @client.call_async(@request) end |