Class: Ksql::Client
- Inherits: Object
- Hierarchy:
  - Object
  - Ksql::Client
- Defined in:
- lib/ksql/client.rb
Class Method Summary
-
.close_query(id, headers: {}) ⇒ Array/Hash/String/Integer
Request /close-query endpoint synchronously.
-
.cluster_status(headers: {}) ⇒ JSON
Request /clusterStatus endpoint synchronously.
-
.health_check(headers: {}) ⇒ JSON
Request /healthcheck endpoint synchronously.
-
.info(headers: {}) ⇒ JSON
Request /info endpoint synchronously.
-
.ksql(ksql, command_sequence_number: nil, headers: {}, session_variables: {}, streams_properties: {}) ⇒ Ksql::@type
Request /ksql endpoint synchronously.
-
.query(sql, headers: {}, properties: {}, session_variables: {}) ⇒ Ksql::Connection::Request
Request /query-stream endpoint synchronously.
-
.stream(sql, headers: {}, properties: {}, session_variables: {}) ⇒ Ksql::Stream
Request /query-stream endpoint asynchronously.
-
.terminate(delete_topic_list: nil, headers: {}) ⇒ JSON
Request /terminate endpoint synchronously.
Class Method Details
.close_query(id, headers: {}) ⇒ Array/Hash/String/Integer
Request /close-query endpoint synchronously
14 15 16 17 18 |
# File 'lib/ksql/client.rb', line 14
#
# Requests the /close-query endpoint through Connection::Client.call_sync
# and passes the result to Handlers::Raw.
#
# @param id value handed to Api::CloseQuery.build as its positional argument
#   (presumably the identifier of the query to close — confirm against Api::CloseQuery)
# @param headers [Hash] forwarded to Api::CloseQuery.build as `headers:`
# @return [Array, Hash, String, Integer] whatever Handlers::Raw.handle returns
def close_query(id, headers: {})
  close_request = Api::CloseQuery.build(id, headers: headers)
  response = Connection::Client.call_sync(close_request)
  Handlers::Raw.handle(response)
end
.cluster_status(headers: {}) ⇒ JSON
Request /clusterStatus endpoint synchronously
27 28 29 30 |
# File 'lib/ksql/client.rb', line 27
#
# Requests the /clusterStatus endpoint through Connection::Client.call_sync.
# The call's result is returned as-is; no handler is applied here.
#
# @param headers [Hash] forwarded to Api::ClusterStatus.build as `headers:`
# @return the value returned by Connection::Client.call_sync
#   (documented on this page as JSON)
def cluster_status(headers: {})
  status_request = Api::ClusterStatus.build(headers: headers)
  Connection::Client.call_sync(status_request)
end
.health_check(headers: {}) ⇒ JSON
Request /healthcheck endpoint synchronously
39 40 41 42 |
# File 'lib/ksql/client.rb', line 39
#
# Requests the /healthcheck endpoint through Connection::Client.call_sync.
# The call's result is returned as-is; no handler is applied here.
#
# @param headers [Hash] forwarded to Api::HealthCheck.build as `headers:`
# @return the value returned by Connection::Client.call_sync
#   (documented on this page as JSON)
def health_check(headers: {})
  health_request = Api::HealthCheck.build(headers: headers)
  Connection::Client.call_sync(health_request)
end
.info(headers: {}) ⇒ JSON
Request /info endpoint synchronously
51 52 53 54 |
# File 'lib/ksql/client.rb', line 51
#
# Requests the /info endpoint through Connection::Client.call_sync.
# The call's result is returned as-is; no handler is applied here.
#
# @param headers [Hash] forwarded to Api::Info.build as `headers:`
# @return the value returned by Connection::Client.call_sync
#   (documented on this page as JSON)
def info(headers: {})
  info_request = Api::Info.build(headers: headers)
  Connection::Client.call_sync(info_request)
end
.ksql(ksql, command_sequence_number: nil, headers: {}, session_variables: {}, streams_properties: {}) ⇒ Ksql::@type
Request /ksql endpoint synchronously
67 68 69 70 71 |
# File 'lib/ksql/client.rb', line 67
#
# Requests the /ksql endpoint through Connection::Client.call_sync and
# passes the result to Handlers::TypedList.
#
# Every keyword argument is forwarded unchanged to Api::Ksql.build.
#
# @param ksql positional argument for Api::Ksql.build
#   (presumably the statement text — confirm against Api::Ksql)
# @param command_sequence_number forwarded as `command_sequence_number:` (defaults to nil)
# @param headers [Hash] forwarded as `headers:`
# @param session_variables [Hash] forwarded as `session_variables:`
# @param streams_properties [Hash] forwarded as `streams_properties:`
# @return whatever Handlers::TypedList.handle returns for the response
def ksql(ksql, command_sequence_number: nil, headers: {}, session_variables: {}, streams_properties: {})
  statement_request = Api::Ksql.build(
    ksql,
    command_sequence_number: command_sequence_number,
    headers: headers,
    session_variables: session_variables,
    streams_properties: streams_properties
  )
  response = Connection::Client.call_sync(statement_request)
  Handlers::TypedList.handle(response)
end
.query(sql, headers: {}, properties: {}, session_variables: {}) ⇒ Ksql::Connection::Request
Request /query-stream endpoint synchronously
83 84 85 86 87 |
# File 'lib/ksql/client.rb', line 83
#
# Requests the /query-stream endpoint through Connection::Client.call_sync
# and passes the result to Handlers::Collection.
#
# @param sql positional argument for Api::Query.build
#   (presumably the query text — confirm against Api::Query)
# @param headers [Hash] forwarded as `headers:`
# @param properties [Hash] forwarded as `properties:`
# @param session_variables [Hash] forwarded as `session_variables:`
# @return whatever Handlers::Collection.handle returns for the response
def query(sql, headers: {}, properties: {}, session_variables: {})
  query_request = Api::Query.build(
    sql,
    headers: headers,
    properties: properties,
    session_variables: session_variables
  )
  response = Connection::Client.call_sync(query_request)
  Handlers::Collection.handle(response)
end
.stream(sql, headers: {}, properties: {}, session_variables: {}) ⇒ Ksql::Stream
Request /query-stream endpoint asynchronously
99 100 101 102 103 |
# File 'lib/ksql/client.rb', line 99
#
# Requests the /query-stream endpoint through Connection::Client.call_async
# and hands the resulting pair to Handlers::Stream.
#
# @param sql positional argument for Api::Stream.build
#   (presumably the query text — confirm against Api::Stream)
# @param headers [Hash] forwarded as `headers:`
# @param properties [Hash] forwarded as `properties:`
# @param session_variables [Hash] forwarded as `session_variables:`
# @return whatever Handlers::Stream.handle returns
#   (documented on this page as Ksql::Stream)
def stream(sql, headers: {}, properties: {}, session_variables: {})
  stream_request = Api::Stream.build(
    sql,
    headers: headers,
    properties: properties,
    session_variables: session_variables
  )
  # call_async's return value is destructured into two values, both of which
  # are passed on to the stream handler.
  async_client, prepared = Connection::Client.call_async(stream_request)
  Handlers::Stream.handle(async_client, prepared)
end
.terminate(delete_topic_list: nil, headers: {}) ⇒ JSON
Request /terminate endpoint synchronously
112 113 114 115 |
# File 'lib/ksql/client.rb', line 112
#
# Requests the /terminate endpoint through Connection::Client.call_sync.
# The call's result is returned as-is; no handler is applied here.
#
# @param delete_topic_list passed positionally to Api::Terminate.build
#   (defaults to nil)
# @param headers [Hash] forwarded to Api::Terminate.build as `headers:`
# @return the value returned by Connection::Client.call_sync
#   (documented on this page as JSON)
def terminate(delete_topic_list: nil, headers: {})
  terminate_request = Api::Terminate.build(delete_topic_list, headers: headers)
  Connection::Client.call_sync(terminate_request)
end