Class: DatahubTopic

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/datahub/datahub-topic.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(datahub_http_client, project_name, topic_name) ⇒ DatahubTopic

Returns a new instance of DatahubTopic.



15
16
17
18
19
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 15

# Builds a topic handle bound to one project/topic pair.
#
# The three arguments are stored as-is in instance variables; nothing is
# validated and the client is not called here.
#
# @param datahub_http_client [Object] client that the other methods call through @client
# @param project_name [Object] project identifier passed to every client call
# @param topic_name [Object] topic identifier passed to every client call
def initialize(datahub_http_client, project_name, topic_name)
    @client, @project_name, @topic_name = datahub_http_client, project_name, topic_name
end

Instance Attribute Details

#comment ⇒ Object

Returns the value of attribute comment.



11
12
13
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 11

# Reader for the +comment+ attribute.
#
# @return [Object] the current value of @comment (nil if it was never assigned)
def comment
  @comment
end

#create_time ⇒ Object

Returns the value of attribute create_time.



12
13
14
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 12

# Reader for the +create_time+ attribute.
#
# @return [Object] the current value of @create_time (nil if it was never assigned)
def create_time
  @create_time
end

#last_modify_time ⇒ Object

Returns the value of attribute last_modify_time.



13
14
15
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 13

# Reader for the +last_modify_time+ attribute.
#
# @return [Object] the current value of @last_modify_time (nil if it was never assigned)
def last_modify_time
  @last_modify_time
end

#lifecycle ⇒ Object

Returns the value of attribute lifecycle.



8
9
10
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 8

# Reader for the +lifecycle+ attribute.
#
# @return [Object] the current value of @lifecycle (nil if it was never assigned)
def lifecycle
  @lifecycle
end

#record_schema ⇒ Object

Returns the value of attribute record_schema.



10
11
12
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 10

# Reader for the +record_schema+ attribute.
#
# @return [Object] the current value of @record_schema (nil if it was never assigned)
def record_schema
  @record_schema
end

#record_type ⇒ Object

Returns the value of attribute record_type.



9
10
11
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 9

# Reader for the +record_type+ attribute.
#
# @return [Object] the current value of @record_type (nil if it was never assigned)
def record_type
  @record_type
end

#shard_count ⇒ Object

Returns the value of attribute shard_count.



7
8
9
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 7

# Reader for the +shard_count+ attribute.
#
# @return [Object] the current value of @shard_count (nil if it was never assigned)
def shard_count
  @shard_count
end

Instance Method Details

#get_cursor(shard_id, offset = DateTime.now.strftime('%Q'), type = "OLDEST") ⇒ Object



45
46
47
48
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 45

# Asks the client for a shard cursor and returns the "Cursor" entry of its reply.
#
# @param shard_id [Object] shard identifier forwarded to the client
# @param offset [Object] forwarded to the client; when omitted it is the current
#   time formatted with DateTime's '%Q' directive (milliseconds since the Unix
#   epoch, as a String), evaluated at call time
# @param type [String] forwarded to the client; "OLDEST" when omitted
# @return [Object] the value stored under "Cursor" in the client's response
def get_cursor(shard_id, offset=DateTime.now.strftime('%Q'), type="OLDEST")
    @client.get_shard_cursor(@project_name, @topic_name, shard_id, offset, type)["Cursor"]
end

#list_shards ⇒ Object



21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 21

# Fetches the shard list for this topic and wraps each entry in a DatahubShard.
#
# Calls +list_shards+ on the client with the stored project and topic names,
# then copies the fields of every element of the response's "Shards" array
# onto a freshly created DatahubShard.
#
# @return [Array<DatahubShard>] one shard object per response entry, in
#   response order (empty when the "Shards" array is empty)
def list_shards
    result_map = @client.list_shards(@project_name, @topic_name)

    # map replaces the index-based for loop and manual push accumulation.
    result_map["Shards"].map do |shard_map|
        shard = DatahubShard.new
        shard.begin_key = shard_map["BeginKey"]
        shard.end_key = shard_map["EndKey"]
        shard.left_shard_id = shard_map["LeftShardId"]
        shard.parent_shard_ids = shard_map["ParentShardIds"]
        shard.right_shard_id = shard_map["RightShardId"]
        shard.shard_id = shard_map["ShardId"]
        shard.state = shard_map["State"]
        shard
    end
end

#read_data(shard_id, cursor, count) ⇒ Object



69
70
71
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 69

# Reads from one shard of this topic by delegating to the client.
#
# @param shard_id [Object] shard identifier forwarded to the client
# @param cursor [Object] cursor forwarded to the client
# @param count [Object] count forwarded to the client
# @return [Object] whatever the client's read_data_from_shard_with_cursor returns
def read_data(shard_id, cursor, count)
    request_args = [@project_name, @topic_name, shard_id, cursor, count]
    @client.read_data_from_shard_with_cursor(*request_args)
end

#write_data(record_entities) ⇒ Object



50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/fluent/plugin/datahub/datahub-topic.rb', line 50

# Writes records to this topic through the client and summarises any failures.
#
# The client's response is read for "FailedRecordCount" and "FailedRecords".
# When the count is greater than zero, each failed entry's "Index",
# "ErrorCode" and "ErrorMessage" are copied into the result, together with
# the matching element of +record_entities+.
#
# @param record_entities [Array] records to send; indexed by each failure's "Index"
# @return [PutRecordResult] a fresh result object; it is returned untouched
#   when the response reports no failed records
def write_data(record_entities)
    put_record_result = PutRecordResult.new
    result_map = @client.write_data_to_topic(@project_name, @topic_name, record_entities)

    failed_count = result_map["FailedRecordCount"]
    # Deliberately unguarded: a response without a numeric count should raise
    # rather than be reported as a fully successful write.
    return put_record_result unless failed_count > 0

    put_record_result.failed_record_count = failed_count
    result_map["FailedRecords"].each do |failure|
        failed_index = failure["Index"]
        error_entity = {
            "error_code" => failure["ErrorCode"],
            "error_message" => failure["ErrorMessage"]
        }

        put_record_result.failed_record_index.push(failed_index)
        put_record_result.failed_record_error.push(error_entity)
        put_record_result.failed_record_list.push(record_entities[failed_index])
    end

    put_record_result
end