Class: Ksql::Stream

Inherits:
Object
  • Object
show all
Defined in:
lib/ksql/stream.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(client, request) ⇒ Stream

Returns a new instance of Stream.



9
10
11
12
# File 'lib/ksql/stream.rb', line 9

# Build a stream wrapper around an HTTP client and a prepared request.
#
# Both arguments are only stored here; nothing is sent until the stream
# is started.
#
# @param client [Object] object later used to dispatch the request and to
#   register connection callbacks
# @param request [Object] request whose events are subscribed to on start
def initialize(client, request)
  @client, @request = client, request
end

Instance Attribute Details

#id ⇒ Object (readonly)

Returns the value of attribute id.



7
8
9
# File 'lib/ksql/stream.rb', line 7

# Reader for the +@id+ instance variable.
#
# @return [Object, nil] the current value of +@id+; nil when it has not
#   been assigned
def id
  @id
end

Instance Method Details

#close ⇒ Object

Close the streaming connection

Raises:

  • (StreamError) — if the stream has not been started yet



17
18
19
20
21
# File 'lib/ksql/stream.rb', line 17

# Close the streaming connection.
#
# The connection can only be closed once +@id+ is present; otherwise the
# stream is considered not started.
#
# @raise [StreamError] when +@id+ is not present
# @return [Object] whatever the client's +close+ returns
def close
  started = @id.present?
  raise StreamError, 'The stream hasn\'t started!' unless started

  @client.close
end

#on_close(&block) ⇒ Object

Specify the action to take when the Streaming connection gets closed.

Parameters:

  • &block (Block)

    Code to execute on connection closure



28
29
30
# File 'lib/ksql/stream.rb', line 28

# Register the action to run when the streaming connection is closed.
#
# The given block is invoked without arguments each time the client
# emits its +:close+ event.
#
# @param block [Block] code to execute on connection closure
# @return [Object] whatever the client's +on+ returns
def on_close(&block)
  @client.on(:close) do
    yield
  end
end

#on_error(&block) ⇒ Object

Specify the action to take when the Streaming connection raises an error.

Parameters:

  • &block (Block)

    Code to execute when connection errors occur



37
38
39
# File 'lib/ksql/stream.rb', line 37

# Register the action to run when the streaming connection reports an error.
#
# The error object received from the client's +:error+ event is passed
# to the given block.
#
# @param block [Block] code to execute when connection errors occur
# @return [Object] whatever the client's +on+ returns
def on_error(&block)
  @client.on(:error) do |error|
    yield error
  end
end

#start(&block) ⇒ Object

Streaming connection handler

  • Start the stream

  • Wrap the stream events into OpenStruct instances

  • Execute the passed block

Parameters:

  • &block (Block)

    Code to execute each time an event arrives



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/ksql/stream.rb', line 50

# Start the stream and dispatch every incoming event to the given block.
#
# Subscribes to the request's +:headers+ and +:body_chunk+ events, then
# hands the request to the client for asynchronous execution.
#
# For each non-blank body chunk:
# - the chunk is wrapped in a Ksql::Connection::Response together with
#   the most recently received headers;
# - an error response aborts with Ksql::StreamError;
# - a Hash body is passed to +build_event_class+ and yields nothing;
# - any other body is splatted into the current event class and the
#   resulting instance is yielded.
#
# @param block [Block] code to execute each time an event arrives
# @raise [Ksql::StreamError] from the chunk handler when a response is an error
# @return [Object] whatever the client's +call_async+ returns
def start(&block)
  @headers = {}

  # Remember the latest headers so each chunk is parsed with them.
  @request.on(:headers) { |received| @headers = received }

  @request.on(:body_chunk) do |chunk|
    next unless chunk.present?

    response = Ksql::Connection::Response.new(body: chunk, headers: @headers)
    raise Ksql::StreamError, response.body['message'] if response.error?

    # A Hash body only (re)defines the event class; there is no event to emit.
    if response.body.is_a?(Hash)
      @event_class = build_event_class(response.body)
      next
    end

    yield(@event_class.new(*response.body))
  end

  @client.call_async(@request)
end