Class: Fluent::BigObjectOutput::TableElement

Inherits:
Object
  • Object
show all
Includes:
Configurable
Defined in:
lib/fluent/plugin/out_bigobject.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(log, bo_hostname, bo_port, tag_format) ⇒ TableElement

Returns a new instance of TableElement.



34
35
36
37
38
39
40
41
# File 'lib/fluent/plugin/out_bigobject.rb', line 34

def initialize(log, bo_hostname, bo_port, tag_format)
  super()
  @log = log
  @bo_hostname = bo_hostname
  @bo_port = bo_port
  @bo_url="http://#{@bo_hostname}:#{@bo_port}/cmd"
  @tag_format = tag_format
end

Instance Attribute Details

#mpatternObject (readonly)

Returns the value of attribute mpattern.



32
33
34
# File 'lib/fluent/plugin/out_bigobject.rb', line 32

def mpattern
  @mpattern
end

Instance Method Details

#configure(conf) ⇒ Object



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/fluent/plugin/out_bigobject.rb', line 43

def configure(conf)
  super

  @mpattern = Fluent::MatchPattern.create(pattern)
  @mapping = (@column_mapping==nil)? nil:parse_column_mapping(@column_mapping)
  @log.info("column mapping for #{table} - #{@mapping}")
  @format_proc = Proc.new { |record|
    if (@mapping==nil)
      record
    else
      new_record = {}
      @mapping.each { |k, c|
        new_record[c] = record[k]
        }
      new_record
    end
  }
end

#getPkeyValue(value) ⇒ Object



62
63
64
65
66
67
68
# File 'lib/fluent/plugin/out_bigobject.rb', line 62

def getPkeyValue(value)
  if (@bo_primary_key_is_int)
       return value
  else
       return"\"#{value}\""
  end
end

#send(chunk) ⇒ Object

Send Data to Bigobject using Restful API



71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/fluent/plugin/out_bigobject.rb', line 71

def send(chunk)
  insertStmts = Array.new
  deleteStmts = Array.new
  
  columns = nil
  chunk.msgpack_each { |tag, time, data|
     tag_parts = tag.match(@tag_format)
     target_event = tag_parts['event']
     id_key = tag_parts['primary_key']
     
     keys = Array.new
     values = Array.new
     data = @format_proc.call(data)
     data.keys.sort.each do |key|
        keys << key
        values << data[key].to_json
     end
      
     if (target_event=='insert')
        if columns.to_s.empty?
          columns = "(#{keys.join(",")})"
        end
        insertStmts.push("(#{values.join(",")})")
     elsif (target_event=='update')
       pkey=""
       updates = Array.new
       keys.zip(values) { |key, value|
           if (key==id_key)
             pkey = getPkeyValue(value)
           else
             updates.push("#{key}=#{value}")
           end 
       }
       sendStmt = "UPDATE #{table} SET #{updates.join(",")} WHERE #{id_key}=#{pkey}"
       sendBO(@bo_url, sendStmt)   
     elsif (target_event=='delete')
       keys.zip(values) { |key, value|
            if (key==id_key)
              pkey = getPkeyValue(value)
            end
            deleteStmts.push("#{id_key}=#{pkey}")
        }
     end
  }
  
  if insertStmts.length>0
    sendStmt = "INSERT INTO #{@table}  #{columns} VALUES " + insertStmts.join(",")
    sendBO(@bo_url, sendStmt)
    @log.debug("sending #{insertStmts.length} rows to bigobject for insert via Restful API")
  end 
  
  if deleteStmts.length>0
    sendStmt = "DELETE FROM #{@table} WHERE " + deleteStmts.join(" or ")
    sendBO(@bo_url, sendStmt)
    @log.debug("sending #{deleteStmts.length} rows to bigobject for delete via Restful API")
  end
end

#to_sObject



129
130
131
# File 'lib/fluent/plugin/out_bigobject.rb', line 129

def to_s
  "table:#{table}, column_mapping:#{column_mapping}, pattern:#{pattern}"
end