Class: DatahubTopic
- Inherits:
-
Object
- Object
- DatahubTopic
- Defined in:
- lib/fluent/plugin/datahub/datahub-topic.rb
Instance Attribute Summary collapse
-
#comment ⇒ Object
Returns the value of attribute comment.
-
#create_time ⇒ Object
Returns the value of attribute create_time.
-
#last_modify_time ⇒ Object
Returns the value of attribute last_modify_time.
-
#lifecycle ⇒ Object
Returns the value of attribute lifecycle.
-
#record_schema ⇒ Object
Returns the value of attribute record_schema.
-
#record_type ⇒ Object
Returns the value of attribute record_type.
-
#shard_count ⇒ Object
Returns the value of attribute shard_count.
Instance Method Summary collapse
- #get_cursor(shard_id, offset = DateTime.now.strftime('%Q'), type = "OLDEST") ⇒ Object
-
#initialize(datahub_http_client, project_name, topic_name) ⇒ DatahubTopic
constructor
A new instance of DatahubTopic.
- #list_shards ⇒ Object
- #read_data(shard_id, cursor, count) ⇒ Object
- #write_data(record_entities) ⇒ Object
Constructor Details
#initialize(datahub_http_client, project_name, topic_name) ⇒ DatahubTopic
Returns a new instance of DatahubTopic.
15 16 17 18 19 |
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 15

# Builds a topic handle bound to one project/topic pair.
#
# The constructor only stores its three arguments; it performs no calls on
# the client.
#
# @param datahub_http_client [Object] client object the other methods of this
#   class delegate to (stored as @client)
# @param project_name [Object] stored as @project_name and passed through to
#   the client on every call
# @param topic_name [Object] stored as @topic_name and passed through to the
#   client on every call
def initialize(datahub_http_client, project_name, topic_name)
  @project_name = project_name
  @client = datahub_http_client
  @topic_name = topic_name
end
Instance Attribute Details
#comment ⇒ Object
Returns the value of attribute comment.
11 12 13 |
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 11

# Reader for the +comment+ attribute.
#
# @return [Object] whatever is currently stored in @comment
#   (nil if it has never been assigned)
def comment
  @comment
end
#create_time ⇒ Object
Returns the value of attribute create_time.
12 13 14 |
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 12

# Reader for the +create_time+ attribute.
#
# @return [Object] whatever is currently stored in @create_time
#   (nil if it has never been assigned)
def create_time
  @create_time
end
#last_modify_time ⇒ Object
Returns the value of attribute last_modify_time.
13 14 15 |
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 13

# Reader for the +last_modify_time+ attribute.
#
# @return [Object] whatever is currently stored in @last_modify_time
#   (nil if it has never been assigned)
def last_modify_time
  @last_modify_time
end
#lifecycle ⇒ Object
Returns the value of attribute lifecycle.
8 9 10 |
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 8

# Reader for the +lifecycle+ attribute.
#
# @return [Object] whatever is currently stored in @lifecycle
#   (nil if it has never been assigned)
def lifecycle
  @lifecycle
end
#record_schema ⇒ Object
Returns the value of attribute record_schema.
10 11 12 |
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 10

# Reader for the +record_schema+ attribute.
#
# @return [Object] whatever is currently stored in @record_schema
#   (nil if it has never been assigned)
def record_schema
  @record_schema
end
#record_type ⇒ Object
Returns the value of attribute record_type.
9 10 11 |
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 9

# Reader for the +record_type+ attribute.
#
# @return [Object] whatever is currently stored in @record_type
#   (nil if it has never been assigned)
def record_type
  @record_type
end
#shard_count ⇒ Object
Returns the value of attribute shard_count.
7 8 9 |
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 7

# Reader for the +shard_count+ attribute.
#
# @return [Object] whatever is currently stored in @shard_count
#   (nil if it has never been assigned)
def shard_count
  @shard_count
end
Instance Method Details
#get_cursor(shard_id, offset = DateTime.now.strftime('%Q'), type = "OLDEST") ⇒ Object
45 46 47 48 |
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 45

# Asks the client for a cursor on one shard of this topic and returns the
# "Cursor" entry of the response.
#
# @param shard_id [Object] shard identifier, forwarded unchanged to the client
# @param offset [Object] forwarded unchanged to the client; defaults to the
#   current time formatted with DateTime#strftime('%Q') (a String of
#   milliseconds since the Unix epoch)
# @param type [String] forwarded unchanged to the client; defaults to "OLDEST"
# @return [Object] the value stored under "Cursor" in the client's response
def get_cursor(shard_id, offset = DateTime.now.strftime('%Q'), type = "OLDEST")
  cursor_response = @client.get_shard_cursor(@project_name, @topic_name, shard_id, offset, type)
  cursor_response["Cursor"]
end
#list_shards ⇒ Object
21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 21

# Fetches the shard list for this topic from the client and converts every
# entry of the response's "Shards" array into a DatahubShard.
#
# Each DatahubShard is populated from these response keys: "BeginKey",
# "EndKey", "LeftShardId", "ParentShardIds", "RightShardId", "ShardId" and
# "State". Values are copied as-is; keys missing from an entry yield nil.
#
# @return [Array<DatahubShard>] one object per entry, in response order;
#   an empty array when the response has no "Shards" entry
def list_shards
  result_map = @client.list_shards(@project_name, @topic_name)

  # A response without "Shards" is treated as "no shards" instead of failing
  # with NoMethodError on nil.
  (result_map["Shards"] || []).map do |shard_map|
    shard = DatahubShard.new
    shard.begin_key = shard_map["BeginKey"]
    shard.end_key = shard_map["EndKey"]
    shard.left_shard_id = shard_map["LeftShardId"]
    shard.parent_shard_ids = shard_map["ParentShardIds"]
    shard.right_shard_id = shard_map["RightShardId"]
    shard.shard_id = shard_map["ShardId"]
    shard.state = shard_map["State"]
    shard
  end
end
#read_data(shard_id, cursor, count) ⇒ Object
69 70 71 |
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 69

# Delegates to the client's +read_data_from_shard_with_cursor+ for this
# topic and returns its result unchanged.
#
# @param shard_id [Object] forwarded unchanged to the client
# @param cursor [Object] forwarded unchanged to the client
# @param count [Object] forwarded unchanged to the client
# @return [Object] whatever the client call returns
def read_data(shard_id, cursor, count)
  project = @project_name
  topic = @topic_name
  @client.read_data_from_shard_with_cursor(project, topic, shard_id, cursor, count)
end
#write_data(record_entities) ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 50

# Sends +record_entities+ to this topic through the client and summarizes
# the failures reported in the response as a PutRecordResult.
#
# When the response's "FailedRecordCount" is greater than zero, the result's
# +failed_record_count+ is set to that value and, for every entry of
# "FailedRecords", the result collects:
# * its "Index" in +failed_record_index+,
# * a hash with "error_code" / "error_message" (taken from "ErrorCode" /
#   "ErrorMessage") in +failed_record_error+,
# * the input record at that index in +failed_record_list+.
#
# A response that omits "FailedRecordCount" is treated as having no
# failures, and a missing "FailedRecords" list is treated as empty, instead
# of raising NoMethodError on nil.
#
# @param record_entities [Array] records to write; indexed by the "Index"
#   values the response reports
# @return [PutRecordResult] a freshly created result, left untouched when
#   no failures were reported
def write_data(record_entities)
  put_record_result = PutRecordResult.new
  result_map = @client.write_data_to_topic(@project_name, @topic_name, record_entities)

  failed_count = result_map["FailedRecordCount"]
  # nil.to_i is 0, so an absent counter means "nothing failed".
  return put_record_result unless failed_count.to_i > 0

  put_record_result.failed_record_count = failed_count
  (result_map["FailedRecords"] || []).each do |result_error|
    index = result_error["Index"]
    put_record_result.failed_record_index.push(index)
    put_record_result.failed_record_error.push(
      {
        "error_code" => result_error["ErrorCode"],
        "error_message" => result_error["ErrorMessage"]
      }
    )
    put_record_result.failed_record_list.push(record_entities[index])
  end

  put_record_result
end