Module: Flydata::PluginSupport::SyncRecordEmittable
- Defined in:
- lib/flydata/plugin_support/sync_record_emittable.rb
Constant Summary collapse
- TABLE_NAME =
A Flydata JSON tag to specify a table name
:table_name
- TYPE =
:type
- SEQ =
:seq
- RESPECT_ORDER =
:respect_order
- SRC_POS =
:src_pos
- TABLE_REV =
:table_rev
- V =
FlyData record format version
:v
Instance Attribute Summary collapse
-
#context ⇒ Object
required.
Instance Method Summary collapse
-
#emit_sync_records(records, options) ⇒ Object
Public Interface: Emit sync records to fluent engine.
Instance Attribute Details
#context ⇒ Object
required
14 15 16 |
# File 'lib/flydata/plugin_support/sync_record_emittable.rb', line 14 def context @context end |
Instance Method Details
#emit_sync_records(records, options) ⇒ Object
Public Interface: Emit sync records to fluent engine
"records" : A record or records for emitting
Each record needs to be Hash
"options"
type: : (required) type (insert, update, delete)
tag : (optional) tag (default: @context.tag)
timestamp : (optional) timestamp (default: current timestamp)
src_pos : (required) source position (used for sync:repair)
table : (optional) table name
increment_table_rev : (optional) set true when incrementing table revision
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
# File 'lib/flydata/plugin_support/sync_record_emittable.rb', line 27 def emit_sync_records(records, ) return if records.nil? || records.empty? # skip records = [records] unless records.kind_of?(Array) # Check options tag = [:tag] || @context.tag = [:timestamp] || Time.now.to_i type = [:type] raise "type option must be set" if type.to_s.empty? src_pos = [:src_pos] raise "src_pos option must be set" if src_pos.to_s.empty? seq = nil if table = [:table] table_rev = @context.table_revs[table] if [:increment_table_rev] table_rev = @context.sync_fm.increment_table_rev(table, table_rev) @context.table_revs[table] = table_rev end seq = @context.sync_fm.get_table_position(table) end # Add common information to each record array = records.collect do |r| r[TYPE] = type r[RESPECT_ORDER] = true r[SRC_POS] = src_pos r[V] = FlydataCore::Record::V2 if table seq = @context.sync_fm.increment_table_position(seq) r[SEQ] = seq r[TABLE_NAME] = table r[TABLE_REV] = table_rev end [, r] end Fluent::Engine.emit_array(tag, array) if table @context.sync_fm.save_table_position(table, seq) if [:set_infinity_to_table_binlog_pos] @context.set_infinity_to_table_binlog_pos(table) end end end |