Class: Flydata::QueryBasedSync::Response
- Inherits:
-
Object
- Object
- Flydata::QueryBasedSync::Response
- Defined in:
- lib/flydata/query_based_sync/response.rb
Direct Known Subclasses
Constant Summary collapse
- DEFAULT_EMIT_CHUNK_LIMIT =
64mb
64 * (1024**2)
Instance Attribute Summary collapse
-
#context ⇒ Object
readonly
Returns the value of attribute context.
-
#new_source_pos ⇒ Object
readonly
Stores the source position used for resuming. This needs to be saved to the per-table binlog.pos after the records are sent.
-
#query_cond ⇒ Object
readonly
Returns the value of attribute query_cond.
-
#records ⇒ Object
readonly
Records to send as flydata sync records. The format is an array of Hash objects keyed by 1-based column position, e.g. [{"1"=>1, "2"=>"Andy"}, ...].
-
#src_result ⇒ Object
readonly
Returns the value of attribute src_result.
-
#table_name ⇒ Object
readonly
Returns the value of attribute table_name.
Class Method Summary collapse
-
.convert_result_to_hash(result) ⇒ Object
Convert to sync record style, e.g. input: {"id"=>1, "name"=>"Andy"} output: {"1"=>1, "2"=>"Andy"}.
- .create_responses(context, table_name, raw_result, query_cond = {}) ⇒ Object
Instance Method Summary collapse
- #empty? ⇒ Boolean
-
#initialize(context, table_name, records, query_cond = {}) ⇒ Response
constructor
A new instance of Response.
- #record_count ⇒ Object
-
#set_new_source_pos(required_pk_values = false) ⇒ Object
Set per-table source position for resume.
Constructor Details
#initialize(context, table_name, records, query_cond = {}) ⇒ Response
Returns a new instance of Response.
10 11 12 13 14 15 16 17 |
# File 'lib/flydata/query_based_sync/response.rb', line 10
#
# Builds a response holding one batch of records for a single table.
#
# @param context [Object] sync context; must respond to +table_meta+ and
#   return something indexable by the table name as a Symbol
# @param table_name [String, Symbol] name of the table the records belong to
# @param records [Array<Hash>] records in sync-record style
# @param query_cond [Hash] query condition; +set_new_source_pos+ reads its
#   +:from_sid+ and +:to_sid+ entries
def initialize(context, table_name, records, query_cond = {})
  @table_name = table_name
  @records = records
  @context = context
  @query_cond = query_cond
  # NOTE(review): the extracted text read `context.[table_name.to_sym]`, which
  # is not valid Ruby. `table_meta` is reconstructed from the instance variable
  # name and its later use -- confirm against the original file.
  @table_meta = context.table_meta[table_name.to_sym]
end
Instance Attribute Details
#context ⇒ Object (readonly)
Returns the value of attribute context.
19 20 21 |
# File 'lib/flydata/query_based_sync/response.rb', line 19
#
# Reader for the +@context+ instance variable.
#
# @return [Object] the value currently held in +@context+
def context
  @context
end
#new_source_pos ⇒ Object (readonly)
Stores the source position used for resuming. This needs to be saved to the per-table binlog.pos after the records are sent.
28 29 30 |
# File 'lib/flydata/query_based_sync/response.rb', line 28
#
# Reader for the +@new_source_pos+ instance variable, i.e. the source position
# stored for resuming.
#
# @return [Object, nil] the value currently held in +@new_source_pos+
def new_source_pos
  @new_source_pos
end
#query_cond ⇒ Object (readonly)
Returns the value of attribute query_cond.
19 20 21 |
# File 'lib/flydata/query_based_sync/response.rb', line 19
#
# Reader for the +@query_cond+ instance variable.
#
# @return [Object] the value currently held in +@query_cond+
def query_cond
  @query_cond
end
#records ⇒ Object (readonly)
Records to send as flydata sync records. The format is an array of Hash objects keyed by 1-based column position, e.g. [{"1"=>1, "2"=>"Andy"}, ...].
24 25 26 |
# File 'lib/flydata/query_based_sync/response.rb', line 24
#
# Reader for the +@records+ instance variable, i.e. the records to send as
# flydata sync records.
#
# @return [Object] the value currently held in +@records+
def records
  @records
end
#src_result ⇒ Object (readonly)
Returns the value of attribute src_result.
19 20 21 |
# File 'lib/flydata/query_based_sync/response.rb', line 19
#
# Reader for the +@src_result+ instance variable.
#
# NOTE(review): no assignment to +@src_result+ is visible in this section, so
# this may only ever be set by code outside it -- confirm.
#
# @return [Object, nil] the value currently held in +@src_result+
def src_result
  @src_result
end
#table_name ⇒ Object (readonly)
Returns the value of attribute table_name.
19 20 21 |
# File 'lib/flydata/query_based_sync/response.rb', line 19
#
# Reader for the +@table_name+ instance variable.
#
# @return [Object] the value currently held in +@table_name+
def table_name
  @table_name
end
Class Method Details
.convert_result_to_hash(result) ⇒ Object
Convert to sync record style ex)
input: {"id"=>1, "name"=>"Andy"}
output: {"1"=>1, "2"=>"Andy"}
109 110 111 112 113 114 115 116 117 118 |
# File 'lib/flydata/query_based_sync/response.rb', line 109
#
# Converts query result rows into sync-record style, where every value is
# keyed by its 1-based position within the row, as a String.
#
#   input:  [{"id"=>1, "name"=>"Andy"}]
#   output: [{"1"=>1, "2"=>"Andy"}]
#
# A result whose first row is already a Hash containing the key "1" is
# returned untouched. A nil result yields an empty Array instead of raising.
#
# @param result [Enumerable<Hash>, nil] rows returned by the source query
# @return [Enumerable<Hash>] rows in sync-record style
def self.convert_result_to_hash(result)
  return [] if result.nil?

  first_record = result.first
  # Already in sync-record style: pass the same object through.
  return result if first_record.is_a?(Hash) && first_record.key?('1')

  result.map do |row|
    row.values.each_with_index.each_with_object({}) do |(value, index), record|
      record[(index + 1).to_s] = value
    end
  end
end
.create_responses(context, table_name, raw_result, query_cond = {}) ⇒ Object
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# File 'lib/flydata/query_based_sync/response.rb', line 63
#
# Splits a raw query result into one or more response objects so that the
# estimated size of each response stays within the emit chunk limit, then
# sets the resume position on every response.
#
# When the result is nil or empty, a single response with no records is
# returned.
#
# @param context [Object] sync context; +params+ and +table_meta+ are read
# @param table_name [String, Symbol] table the result belongs to
# @param raw_result [Enumerable<Hash>, nil] rows returned by the source query
# @param query_cond [Hash] query condition handed to every response
# @return [Array] the created responses, in record order
def self.create_responses(context, table_name, raw_result, query_cond = {})
  emit_chunk_limit = context.params[:emit_chunk_limit] || DEFAULT_EMIT_CHUNK_LIMIT
  # NOTE(review): the bare identifier `table_meta` was missing from the
  # extracted text here and in the two uses below; it is reconstructed from
  # the surrounding code -- confirm against the original file.
  table_meta = context.table_meta[table_name.to_sym]
  record_size_estimator = RecordSizeEstimator.new(table_name, table_meta[:columns].count)
  # Guard nil before converting so the empty-result branch below is reachable.
  all_records = raw_result.nil? ? nil : convert_result_to_hash(raw_result)

  if all_records.nil? || all_records.empty?
    res = new(context, table_name, [], query_cond)
    res.set_new_source_pos
    return [res]
  end

  responses = []
  start_index = 0
  cur_chunk_size = 0

  # Split records and create response objects.
  all_records.each_with_index do |record, index|
    record_size = record_size_estimator.calc_record_size(record)
    # Close the current chunk when adding this record would exceed the limit.
    # A chunk always holds at least one record, even an oversized one.
    if cur_chunk_size > 0 && (cur_chunk_size + record_size) > emit_chunk_limit
      responses << new(context, table_name, all_records[start_index..index - 1], query_cond)
      start_index = index
      cur_chunk_size = record_size
    else
      cur_chunk_size += record_size
    end
  end

  # all_records is non-empty here and start_index never passes the last
  # index, so there is always a trailing chunk to emit. Emitting it
  # unconditionally keeps records whose estimated size is zero.
  responses << new(context, table_name, all_records[start_index..-1], query_cond)

  # Every response except the last is followed by another one, so it needs
  # primary-key values for resuming.
  responses[0..-2].each { |res| res.set_new_source_pos(true) }
  # The last response needs them only when the row count equals the
  # configured per-query maximum.
  more_response = (all_records.size == table_meta[:max_num_rows_per_query])
  responses.last.set_new_source_pos(more_response)
  responses
end
Instance Method Details
#empty? ⇒ Boolean
59 60 61 |
# File 'lib/flydata/query_based_sync/response.rb', line 59
#
# Tells whether this response carries no records.
#
# @return [Boolean] true when +record_count+ is zero
def empty?
  record_count.zero?
end
#record_count ⇒ Object
55 56 57 |
# File 'lib/flydata/query_based_sync/response.rb', line 55
#
# Number of records held by this response.
#
# @return [Integer] 0 when +records+ is nil, otherwise +records.count+
def record_count
  return 0 if records.nil?

  records.count
end
#set_new_source_pos(required_pk_values = false) ⇒ Object
Set per-table source position for resume
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/flydata/query_based_sync/response.rb', line 31
#
# Sets the per-table source position used for resuming and stores it in
# +@new_source_pos+.
#
# With +required_pk_values+ the position is built from +:from_sid+, +:to_sid+
# and the primary-key values of the last record; otherwise it is built from
# +:to_sid+ alone.
#
# @param required_pk_values [Boolean] whether primary-key values of the last
#   record must be part of the position
# @return [Object] the new instance of +@context.source_pos_class+
# @raise [NoMethodError] if primary-key values are required while +@records+
#   has no last record
def set_new_source_pos(required_pk_values = false)
  # Primary-key values are set when a following response exists or when the
  # returned row count equals the specified limit, so that syncing can resume
  # in the middle of the transaction.
  pk_values = nil
  if required_pk_values
    pk_positions = @table_meta[:pk_positions]
    pk_values = @table_meta[:primary_keys].each_with_index.map do |pk, index|
      # Records are keyed by column position as a String.
      { pk.to_s => @records.last[pk_positions[index].to_s] }
    end
  end

  args = if pk_values
           [@query_cond[:from_sid], @query_cond[:to_sid], pk_values]
         else
           [@query_cond[:to_sid]]
         end

  @new_source_pos = @context.source_pos_class.new(*args)
end