Class: Ksql::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/ksql/client.rb

Class Method Summary collapse

Class Method Details

.close_query(id, headers: {}) ⇒ Array/Hash/String/Integer

Request /close-query endpoint synchronously

Parameters:

  • id (String)

    Query ID

  • headers (Hash) (defaults to: {})

    Request headers

Returns:

  • (Array/Hash/String/Integer)

    Request result



14
15
16
17
18
# File 'lib/ksql/client.rb', line 14

# Builds a /close-query request, sends it through the synchronous client
# call and hands the response to the raw handler.
#
# @param id [String] Query ID
# @param headers [Hash] Request headers
#
# @return [Array/Hash/String/Integer] Request result
def close_query(id, headers: {})
  close_request = Api::CloseQuery.build(id, headers: headers)
  Handlers::Raw.handle(Connection::Client.call_sync(close_request))
end

.cluster_status(headers: {}) ⇒ JSON

Request /clusterStatus endpoint synchronously

Parameters:

  • headers (Hash) (defaults to: {})

    Request headers

Returns:

  • (JSON)

    Request response



27
28
29
30
# File 'lib/ksql/client.rb', line 27

# Builds a /clusterStatus request and returns whatever the synchronous
# client call yields for it.
#
# @param headers [Hash] Request headers
#
# @return [JSON] Request response
def cluster_status(headers: {})
  Connection::Client.call_sync(Api::ClusterStatus.build(headers: headers))
end

.health_check(headers: {}) ⇒ JSON

Request /healthcheck endpoint synchronously

Parameters:

  • headers (Hash) (defaults to: {})

    Request headers

Returns:

  • (JSON)

    Request response



39
40
41
42
# File 'lib/ksql/client.rb', line 39

# Builds a /healthcheck request and returns the result of the synchronous
# client call.
#
# @param headers [Hash] Request headers
#
# @return [JSON] Request response
def health_check(headers: {})
  probe = Api::HealthCheck.build(headers: headers)

  Connection::Client.call_sync(probe)
end

.info(headers: {}) ⇒ JSON

Request /info endpoint synchronously

Parameters:

  • headers (Hash) (defaults to: {})

    Request headers

Returns:

  • (JSON)

    Request response



51
52
53
54
# File 'lib/ksql/client.rb', line 51

# Builds an /info request and returns the result of the synchronous
# client call.
#
# @param headers [Hash] Request headers
#
# @return [JSON] Request response
def info(headers: {})
  Connection::Client.call_sync(
    Api::Info.build(headers: headers)
  )
end

.ksql(ksql, command_sequence_number: nil, headers: {}, session_variables: {}, streams_properties: {}) ⇒ Ksql::@type

Request /ksql endpoint synchronously

Parameters:

  • ksql (String)

    SQL Statement

  • command_sequence_number (Integer) (defaults to: nil)

    The statements will not be run until all existing commands have completed.

  • headers (Hash) (defaults to: {})

    Request headers

  • session_variables (Hash) (defaults to: {})

    Variable substitution values

  • streams_properties (Hash) (defaults to: {})

    Property overrides to run the statements with

Returns:

  • (Ksql::@type)

    Request result



67
68
69
70
71
# File 'lib/ksql/client.rb', line 67

# Builds a /ksql request from the statement and options, sends it through
# the synchronous client call and passes the response to the typed-list handler.
#
# @param ksql [String] SQL Statement
# @param command_sequence_number [Integer] The statements will not be run
#   until all existing commands have completed.
# @param headers [Hash] Request headers
# @param session_variables [Hash] Variable substitution values
# @param streams_properties [Hash] Property overrides to run the statements with
#
# @return [Ksql::@type] Request result
def ksql(ksql, command_sequence_number: nil, headers: {}, session_variables: {}, streams_properties: {})
  statement_request = Api::Ksql.build(
    ksql,
    command_sequence_number: command_sequence_number,
    headers: headers,
    session_variables: session_variables,
    streams_properties: streams_properties
  )
  response = Connection::Client.call_sync(statement_request)

  Handlers::TypedList.handle(response)
end

.query(sql, headers: {}, properties: {}, session_variables: {}) ⇒ Ksql::Connection::Request

Request /query-stream endpoint synchronously

Parameters:

  • sql (String)

    SQL Statement

  • headers (Hash) (defaults to: {})

    Request headers

  • properties (Hash) (defaults to: {})

    Optional properties for the query

  • session_variables (Hash) (defaults to: {})

    Variable substitution values

Returns:



83
84
85
86
87
# File 'lib/ksql/client.rb', line 83

# Builds a /query-stream request, sends it through the synchronous client
# call and passes the response to the collection handler.
#
# @param sql [String] SQL Statement
# @param headers [Hash] Request headers
# @param properties [Hash] Optional properties for the query
# @param session_variables [Hash] Variable substitution values
#
# @return the value produced by Handlers::Collection.handle
def query(sql, headers: {}, properties: {}, session_variables: {})
  query_request = Api::Query.build(
    sql,
    headers: headers,
    properties: properties,
    session_variables: session_variables
  )

  Handlers::Collection.handle(Connection::Client.call_sync(query_request))
end

.stream(sql, headers: {}, properties: {}, session_variables: {}) ⇒ Ksql::Stream

Request /query-stream endpoint asynchronously

Parameters:

  • sql (String)

    SQL Statement

  • headers (Hash) (defaults to: {})

    Request headers

  • properties (Hash) (defaults to: {})

    Optional properties for the query

  • session_variables (Hash) (defaults to: {})

    Variable substitution values

Returns:



99
100
101
102
103
# File 'lib/ksql/client.rb', line 99

# Builds a /query-stream request, starts it through the asynchronous client
# call and passes the returned client and prepared request to the stream handler.
#
# @param sql [String] SQL Statement
# @param headers [Hash] Request headers
# @param properties [Hash] Optional properties for the query
# @param session_variables [Hash] Variable substitution values
#
# @return [Ksql::Stream] the value produced by Handlers::Stream.handle
def stream(sql, headers: {}, properties: {}, session_variables: {})
  stream_request = Api::Stream.build(
    sql,
    headers: headers,
    properties: properties,
    session_variables: session_variables
  )
  # call_async yields a pair; keep plain destructuring so both parts reach the handler unchanged.
  connection, prepared = Connection::Client.call_async(stream_request)

  Handlers::Stream.handle(connection, prepared)
end

.terminate(delete_topic_list: nil, headers: {}) ⇒ JSON

Request /terminate endpoint synchronously

Parameters:

  • delete_topic_list (Array<String>) (defaults to: nil)

    Kafka topic names or regular expressions

  • headers (Hash) (defaults to: {})

    Request headers

Returns:

  • (JSON)

    Request response



112
113
114
115
# File 'lib/ksql/client.rb', line 112

# Builds a /terminate request and returns the result of the synchronous
# client call.
#
# @param delete_topic_list [Array<String>] Kafka topic names or regular expressions
# @param headers [Hash] Request headers
#
# @return [JSON] Request response
def terminate(delete_topic_list: nil, headers: {})
  shutdown_request = Api::Terminate.build(delete_topic_list, headers: headers)

  Connection::Client.call_sync(shutdown_request)
end