Class: Rhoconnect::Handler::Query::Runner

Inherits:
Object
  • Object
show all
Defined in:
lib/rhoconnect/handler/query/runner.rb

Direct Known Subclasses

PassThroughRunner

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(model, client, route_handler, params = {}) ⇒ Runner

Returns a new instance of Runner.

Raises:

  • (ArgumentError)


8
9
10
11
12
13
14
15
16
17
# File 'lib/rhoconnect/handler/query/runner.rb', line 8

def initialize(model,client,route_handler, params = {})
  raise ArgumentError.new(UNKNOWN_CLIENT) unless client
  raise ArgumentError.new(UNKNOWN_SOURCE) unless (model and model.source)
  raise ArgumentError.new('Invalid app for source') unless model.source.app

  @source,@client,@p_size = model.source,client,params[:p_size] ? params[:p_size].to_i : 500
  @client.last_sync = Time.now if @client
  @params = params
  @engine = Rhoconnect::Handler::Query::Engine.new(model, route_handler, @params)
end

Instance Attribute Details

#clientObject

Returns the value of attribute client.



6
7
8
# File 'lib/rhoconnect/handler/query/runner.rb', line 6

def client
  @client
end

#engineObject

Returns the value of attribute engine.



6
7
8
# File 'lib/rhoconnect/handler/query/runner.rb', line 6

def engine
  @engine
end

#p_sizeObject

Returns the value of attribute p_size.



6
7
8
# File 'lib/rhoconnect/handler/query/runner.rb', line 6

def p_size
  @p_size
end

#paramsObject

Returns the value of attribute params.



6
7
8
# File 'lib/rhoconnect/handler/query/runner.rb', line 6

def params
  @params
end

#sourceObject

Returns the value of attribute source.



6
7
8
# File 'lib/rhoconnect/handler/query/runner.rb', line 6

def source
  @source
end

Instance Method Details

#ack_token(token) ⇒ Object



51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/rhoconnect/handler/query/runner.rb', line 51

def ack_token(token)
  stored_token = @client.get_value(:page_token)
  if stored_token 
    if token and stored_token == token
      @client.put_value(:page_token,nil)
      @client.flush_data(:schema_page)
      @client.flush_data(:metadata_page)
      @client.flush_data(:create_links_page)
      @client.flush_data(:page)
      @client.flush_data(:delete_page)
      _delete_errors_page
      return true
    else
        if token == nil
            # client lost state - may be previous connection was failed etc.
            # we ned reinit connection
            puts "$$$ ERROR: receive NIL token => reset saved session connection. client["+@client.to_s+"] stored token["+stored_token.to_s+"]"
            @client.put_value(:page_token,nil)
            _delete_errors_page
            return true
        end
    end
  else
    return true    
  end    
  false
end

#build_page {|res| ... } ⇒ Object

Yields:

  • (res)


91
92
93
94
95
96
97
# File 'lib/rhoconnect/handler/query/runner.rb', line 91

def build_page
  res = {}
  yield res
  res.reject! {|key,value| value.nil? or value.empty?}
  res.merge!(_send_errors)
  res
end

#compute_errors_pageObject

Computes errors for client and stores a copy as errors page



232
233
234
235
236
237
238
239
240
241
# File 'lib/rhoconnect/handler/query/runner.rb', line 232

def compute_errors_page
  ['create','update','delete'].each do |operation|
    @client.lock("#{operation}_errors") do |c| 
      c.rename("#{operation}_errors","#{operation}_errors_page")
    end
  end
  @client.lock("update_rollback") do |c|
    c.rename("update_rollback","update_rollback_page")
  end
end

Computes create links for a client and stores a copy as links page



244
245
246
247
248
249
# File 'lib/rhoconnect/handler/query/runner.rb', line 244

def compute_links_page
  @client.lock(:create_links) do |c| 
    c.rename(:create_links,:create_links_page)
    c.get_data(:create_links_page)
  end
end

#compute_metadataObject

Computes the metadata sha1 and returns metadata if client’s sha1 doesn’t match source’s sha1



171
172
173
174
175
176
177
178
179
# File 'lib/rhoconnect/handler/query/runner.rb', line 171

def 
  , = @source.lock(:metadata) do |s|
    [s.get_value(:metadata_sha1),s.get_value(:metadata)]
  end
  return if @client.get_value(:metadata_sha1) == 
  @client.put_value(:metadata_sha1,)
  @client.put_value(:metadata_page,)
  
end

#compute_pageObject

Computes diffs between master doc and client doc, trims it to page size, stores page, and returns page as hash



184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/rhoconnect/handler/query/runner.rb', line 184

def compute_page
  inserts_elements_map,deletes_elements_map,total_count = @source.lock(:md) do |s| 
    inserts_elements_map = @client.get_diff_data(:cd,s.docname(:md),@p_size)
    total_count = s.get_value(:md_size).to_i
    deletes_elements_map = s.get_diff_data(:md,@client.docname(:cd),@p_size)
    [inserts_elements_map,deletes_elements_map,total_count]
  end
  # until sync is not done - set cd_size to 0
  # once there are no changes, then, set cd_size to md_size
  cd_size = inserts_elements_map.size > 0 ? 0 : total_count
  @client.put_value(:cd_size, cd_size)
  
  # now, find the exact changes
  inserts,deletes = Store.get_inserts_deletes(inserts_elements_map,deletes_elements_map)
  
  @client.put_data(:page,inserts)
  @client.put_data(:delete_page,deletes,true)
  @client.put_value(:total_count_page,total_count)
  @client.update_elements(:cd,inserts_elements_map,deletes_elements_map)
  
  [total_count,inserts,deletes]
end

#compute_page_bruteforceObject

Computes diffs between master doc and client doc, trims it to page size, stores page, and returns page as hash



209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
# File 'lib/rhoconnect/handler/query/runner.rb', line 209

def compute_page_bruteforce
  inserts_elements_map,deletes_elements_map,total_count = @source.lock(:md) do |s| 
    inserts_elements_map,deletes_elements_map = @client.get_diff_data_bruteforce(:cd,s.docname(:md),@p_size)
    total_count = s.get_value(:md_size).to_i
    [inserts_elements_map,deletes_elements_map,total_count]
  end
  # until sync is not done - set cd_size to 0
  # once there are no changes, then, set cd_size to md_size
  cd_size = inserts_elements_map.size > 0 ? 0 : total_count
  @client.put_value(:cd_size, cd_size)
  
  # now, find the exact changes
  inserts,deletes = Store.get_inserts_deletes(inserts_elements_map,deletes_elements_map)
  
  @client.put_data(:page,inserts)
  @client.put_data(:delete_page,deletes,true)
  @client.put_value(:total_count_page,total_count)
  @client.update_elements(:cd,inserts_elements_map,deletes_elements_map)
  
  [total_count,inserts,deletes]
end

#format_result(token, progress_count, total_count, res) ⇒ Object



79
80
81
82
83
84
85
86
87
88
89
# File 'lib/rhoconnect/handler/query/runner.rb', line 79

def format_result(token,progress_count,total_count,res)
  count = 0
  count += res['insert'].length if res['insert']
  count += res['delete'].length if res['delete']
  [ {'version'=>Rhoconnect::SYNC_VERSION},
    {'token'=>(token ? token : '')},
    {'count'=>count},
    {'progress_count'=>progress_count},
    {'total_count'=>total_count},
    res ]
end

#resend_page(token = nil) ⇒ Object

Resend token for a client, also sends exceptions



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/rhoconnect/handler/query/runner.rb', line 32

def resend_page(token=nil)
  token,progress_count,total_count,res = '',0,0,{}
  schema_page = @client.get_value(:schema_page)
  if schema_page
    res = {'schema-changed' => 'true'}
  else  
    res = build_page do |r|
      r['insert'] = @client.get_data(:page)
      r['delete'] = @client.get_data(:delete_page)
      r['links'] = @client.get_data(:create_links_page)
      r['metadata'] = @client.get_value(:metadata_page)
      progress_count = 0
      total_count = @client.get_value(:total_count_page).to_i
    end
  end
  token = @client.get_value(:page_token)
  [token,progress_count,total_count,res]
end

#runObject



19
20
21
22
23
24
25
26
27
28
29
# File 'lib/rhoconnect/handler/query/runner.rb', line 19

def run
  res = []
  token = params[:token]
  if not ack_token(token)
    res = resend_page(token)
  else
    query_result = @engine.do_sync
    res = send_new_page
  end
  format_result(res[0],res[1],res[2],res[3])
end

#schema_changed?Boolean

Checks if schema changed

Returns:

  • (Boolean)


152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
# File 'lib/rhoconnect/handler/query/runner.rb', line 152

def schema_changed?
  if engine.model.respond_to?(:schema)
    schema_sha1 = @source.get_value(:schema_sha1)
    if @client.get_value(:schema_sha1).nil?
      @client.put_value(:schema_sha1,schema_sha1)
      return false
    elsif @client.get_value(:schema_sha1) == schema_sha1
      return false
    end
    @client.put_value(:schema_sha1,schema_sha1)
    @client.put_value(:schema_page,schema_sha1)
    return true
  else
    return false
  end
end

#send_new_pageObject



99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/rhoconnect/handler/query/runner.rb', line 99

def send_new_page
  token,progress_count,total_count,res = '',0,0,{}
  if schema_changed?
    _expire_bulk_data
    token = @client.compute_token(:page_token)
    res = {'schema-changed' => 'true'}
  else  
    compute_errors_page
    res = build_page do |r|
      total_count,r['insert'],r['delete'] = compute_page
      r['links'] = compute_links_page
      r['metadata'] = 
    end
    if res['insert'] or res['delete'] or res['links']
      token = @client.compute_token(:page_token)
    else
      _delete_errors_page 
    end
  end
  # TODO: progress count can not be computed properly
  # without comparing what has actually changes
  # so we need to obsolete it in the future versions
  progress_count = 0
  [token,progress_count,total_count,res]
end

#send_new_page_bruteforceObject



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
# File 'lib/rhoconnect/handler/query/runner.rb', line 125

def send_new_page_bruteforce
  token,progress_count,total_count,res = '',0,0,{}
  if schema_changed?
    _expire_bulk_data
    token = @client.compute_token(:page_token)
    res = {'schema-changed' => 'true'}
  else  
    compute_errors_page
    res = build_page do |r|
      total_count,r['insert'],r['delete'] = compute_page_bruteforce
      r['links'] = compute_links_page
      r['metadata'] = 
    end
    if res['insert'] or res['delete'] or res['links']
      token = @client.compute_token(:page_token)
    else
      _delete_errors_page 
    end
  end
  # TODO: progress count can not be computed properly
  # without comparing what has actually changes
  # so we need to obsolete it in the future versions
  progress_count = 0
  [token,progress_count,total_count,res]
end