Module: Rhoconnect::BulkDataJob

Defined in:
lib/rhoconnect/jobs/bulk_data_job.rb

Class Method Summary

  .after_perform_x(*args) ⇒ Object
  .compress(archive, file) ⇒ Object
  .create_sqlite_data_file(bulk_data, ts) ⇒ Object
  .do_bulk_job(params) ⇒ Object
  .get_file_args(bulk_data_name, ts) ⇒ Object
  .gzip_compress(archive, file) ⇒ Object
  .import_data_to_fixed_schema(db, source, is_selected = true) ⇒ Object
  .import_data_to_object_values(db, source, is_selected = true) ⇒ Object
  .perform(params) ⇒ Object
  .populate_sources_table(db, sources_refs) ⇒ Object
  .refs_to_s(refs) ⇒ Object

Class Method Details

.after_perform_x(*args) ⇒ Object



# File 'lib/rhoconnect/jobs/bulk_data_job.rb', line 21

def self.after_perform_x(*args)
  log "BulkDataJob.after_perform_x hook called ..."
  params = args[0] # first argument is the job's params hash (contains 'data_name')
  do_bulk_job(params) do |bulk_data|
    bulk_data.state = :completed
    bulk_data.refresh_time = Time.now.to_i + Rhoconnect.bulk_sync_poll_interval
  end
  log "BulkDataJob.after_perform_x hook set data state to complete."
end
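
This follows Resque's hook convention: any class method whose name begins with after_perform is invoked with the same arguments as perform, after perform completes without error. A minimal sketch of the call order, assuming the job runs through Resque (the params hash is hypothetical):

# On a successful run:
#   perform({'data_name' => 'application_user1'})
#   after_perform_x({'data_name' => 'application_user1'})
# A failed perform skips the after_perform hooks.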

.compress(archive, file) ⇒ Object



# File 'lib/rhoconnect/jobs/bulk_data_job.rb', line 197

def self.compress(archive,file)
  Zip::File.open(archive, 'w') do |zipfile|
    # NOTE: URI.escape was deprecated and removed in Ruby 3.0+;
    # modern code would escape the basename another way (e.g. CGI.escape).
    zipfile.add(URI.escape(File.basename(file)),file)
  end
end
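
A minimal usage sketch with hypothetical paths; the archive ends up containing the single file, stored under its escaped basename:

compress('/tmp/user1_1300000000.data.rzip', '/tmp/user1_1300000000.data')
# => zip archive with one entry: 'user1_1300000000.data'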

.create_sqlite_data_file(bulk_data, ts) ⇒ Object



# File 'lib/rhoconnect/jobs/bulk_data_job.rb', line 148

def self.create_sqlite_data_file(bulk_data,ts)
  sources_refs = {}
  schema,index,bulk_data.dbfile = get_file_args(bulk_data.name,ts)
  FileUtils.mkdir_p(File.dirname(bulk_data.dbfile))
  # TODO: remove old bulk files!
  # FileUtils.rm Dir.glob(File.join(Rhoconnect.data_directory, "#{bulk_data.name}*"))

  db = DBAdapter.instance.get_connection(bulk_data.dbfile)
  db.execute_batch(File.read(schema))

  src_counter = 1
  selected_sources = {}
  # sources and partition_sources are list attributes with Redis-style
  # ranges: [0, -1] returns every element (a plain Ruby Array would return nil here).
  bulk_data.sources[0, -1].each do |source|
    selected_sources[source] = true
  end
  bulk_data.partition_sources[0, -1].each do |source_name|
    timer = start_timer("start importing sqlite data for #{source_name}")
    source = Source.load(source_name,{:app_id => bulk_data.app_id,
      :user_id => bulk_data.user_id})
    source.source_id = src_counter
    src_counter += 1
    source_attrib_refs = nil
    is_selected_source = selected_sources.has_key?(source_name)
    if source.schema
      source_attrib_refs = import_data_to_fixed_schema(db,source,is_selected_source)
    else
      source_attrib_refs = import_data_to_object_values(db,source,is_selected_source)
    end
    sources_refs[source_name] =
      {:source => source, :refs => source_attrib_refs, :skip_source => !is_selected_source}
    lap_timer("finished importing sqlite data for #{source_name}",timer)
  end
  populate_sources_table(db,sources_refs)

  db.execute_batch(File.read(index))
  db.execute_batch("VACUUM;")
  db.close

  compress("#{bulk_data.dbfile}.rzip",bulk_data.dbfile)
  gzip_compress("#{bulk_data.dbfile}.gzip",bulk_data.dbfile)
end
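
For a hypothetical bulk data named 'application_user1' and timestamp '1300000000', the method leaves three artifacts under Rhoconnect.data_directory:

# application_user1_1300000000.data       - the SQLite database itself
# application_user1_1300000000.data.rzip  - zip-compressed copy (see compress)
# application_user1_1300000000.data.gzip  - gzip-compressed copy (see gzip_compress)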

.do_bulk_job(params) ⇒ Object



# File 'lib/rhoconnect/jobs/bulk_data_job.rb', line 31

def self.do_bulk_job(params)
  bulk_data = nil
  begin
    bulk_data = BulkData.load(params["data_name"]) if BulkData.is_exist?(params["data_name"])
    if bulk_data
      yield bulk_data
    else
      raise Exception.new("No bulk data found for #{params["data_name"]}")
    end
  rescue Exception => e
    bulk_data.delete if bulk_data
    log "Bulk data job raised: #{e.message}"
    log e.backtrace.join("\n")
    raise e
  end
end
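
A sketch of the wrapper pattern this provides: the block runs only if the named bulk data exists, and any exception deletes the partially built data before re-raising. The data name is hypothetical:

do_bulk_job('data_name' => 'application_user1') do |bulk_data|
  # work against the loaded BulkData instance
  bulk_data.process_sources
end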

.get_file_args(bulk_data_name, ts) ⇒ Object



# File 'lib/rhoconnect/jobs/bulk_data_job.rb', line 190

def self.get_file_args(bulk_data_name,ts)
  schema = BulkData.schema_file
  index = BulkData.index_file
  dbfile = File.join(Rhoconnect.data_directory,bulk_data_name+'_'+ts+'.data')
  [schema,index,dbfile]
end
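
A sketch of the returned triple, using a hypothetical name and timestamp:

schema, index, dbfile = get_file_args('application_user1', '1300000000')
# schema => BulkData.schema_file (SQL run before the data is imported)
# index  => BulkData.index_file  (SQL run after the data is imported)
# dbfile => File.join(Rhoconnect.data_directory, 'application_user1_1300000000.data')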

.gzip_compress(archive, file) ⇒ Object



# File 'lib/rhoconnect/jobs/bulk_data_job.rb', line 203

def self.gzip_compress(archive,file)
  data = File.new(file, "rb")
  File.open(archive, 'wb') do |f|
    gz = Zlib::GzipWriter.new(f)
    gz.write data.read
    gz.close
  end
  data.close
end
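
An equivalent, more compact sketch using the same stdlib; the block form of Zlib::GzipWriter.open closes the stream automatically:

require 'zlib'

def self.gzip_compress(archive, file)
  Zlib::GzipWriter.open(archive) { |gz| gz.write File.binread(file) }
end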

.import_data_to_fixed_schema(db, source, is_selected = true) ⇒ Object

Loads data into a fixed-schema table based on the source's settings.



# File 'lib/rhoconnect/jobs/bulk_data_job.rb', line 67

def self.import_data_to_fixed_schema(db,source,is_selected=true)
  data = {}
  data = source.get_data(:md) if is_selected
  # counter = {}
  columns,qm = [],[]
  create_table = ["\"object\" varchar(255) PRIMARY KEY"]
  schema = JSON.parse(source.schema)

  db.transaction do |database|
    # Create a table with columns specified by 'property' array in settings
    schema['property'].each do |key,value|
      create_table << "\"#{key}\" varchar default NULL"
      columns << key
      qm << '?'
    end
    database.execute("CREATE TABLE #{source.name}(#{create_table.join(",")} );")

    database.prepare("insert into #{source.name} (object,#{columns.join(',')}) values (?,#{qm.join(',')})") do |stmt|
      data.each do |obj,row|
        args = [obj]
        columns.each { |col| args << row[col] }
        # The splat (*) expands the array into individual arguments for 'execute'.
        # JRuby (1.6.0) fails without it; other Rubies accept either form.
        stmt.execute(*args)
      end
    end

    # Create indexes for specified columns in settings 'index'
    schema['index'].each do |key,value|
      val2 = ""
      value.split(',').each do |col|
        val2 += ',' if val2.length > 0
        val2 += "\"#{col}\""
      end

      database.execute("CREATE INDEX #{key} on #{source.name} (#{val2});")
    end if schema['index']

    # Create unique indexes for specified columns in settings 'unique_index'
    schema['unique_index'].each do |key,value|
      val2 = ""
      value.split(',').each do |col|
        val2 += ',' if val2.length > 0
        val2 += "\"#{col}\""
      end

      database.execute("CREATE UNIQUE INDEX #{key} on #{source.name} (#{val2});")
    end if schema['unique_index']
  end

  # Fixed-schema sources report no per-attribute counters, so return an empty refs hash.
  return {}
end
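
A hypothetical source schema illustrating the three settings this method reads ('property' drives the columns, 'index' and 'unique_index' the indexes):

# JSON.parse(source.schema) might look like:
#   { "property"     => { "name" => "string", "brand" => "string" },
#     "index"        => { "product_by_name" => "name" },
#     "unique_index" => { "product_by_name_brand" => "name,brand" } }
# which, for a source named Product, produces:
#   CREATE TABLE Product("object" varchar(255) PRIMARY KEY,
#                        "name" varchar default NULL,"brand" varchar default NULL );
#   CREATE INDEX product_by_name on Product ("name");
#   CREATE UNIQUE INDEX product_by_name_brand on Product ("name","brand");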

.import_data_to_object_values(db, source, is_selected = true) ⇒ Object



# File 'lib/rhoconnect/jobs/bulk_data_job.rb', line 48

def self.import_data_to_object_values(db,source,is_selected=true)
  data = {}
  data = source.get_data(:md) if is_selected
  counter = {}
  db.transaction do |database|
    database.prepare("insert into object_values
      (source_id,attrib,object,value) values (?,?,?,?)") do |stmt|
      data.each do |object_id,object|
        object.each do |attrib,value|
          counter[attrib] = counter[attrib] ? counter[attrib] + 1 : 1
          stmt.execute(source.source_id.to_i,attrib,object_id,value)
        end
      end
    end
  end
  counter
end
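
The return value counts inserted rows per attribute. A sketch with hypothetical :md data:

# data == { '1' => {'name' => 'iPhone', 'brand' => 'Apple'},
#           '2' => {'name' => 'G2',     'brand' => 'HTC'} }
# inserts four object_values rows and returns:
#   { 'name' => 2, 'brand' => 2 }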

.perform(params) ⇒ Object



# File 'lib/rhoconnect/jobs/bulk_data_job.rb', line 8

def self.perform(params)
  do_bulk_job(params) do |bulk_data|
    timer = start_timer('starting bulk data process')
    bulk_data.process_sources
    timer = lap_timer('process_sources',timer)
    ts = Time.now.to_i.to_s
    create_sqlite_data_file(bulk_data,ts)
    timer = lap_timer('create_sqlite_data_file',timer)
    log "bulk_data.dbfile:  #{bulk_data.dbfile}"
    log "finished bulk data process"
  end
end
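
A sketch of kicking the job off, assuming it is enqueued through Resque with the params hash that do_bulk_job expects (the data name is hypothetical):

Resque.enqueue(Rhoconnect::BulkDataJob, 'data_name' => 'application_user1')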

.populate_sources_table(db, sources_refs) ⇒ Object

#2354: Bulk sync not updating sources table (last_inserted_size, last_deleted_size, backend_refresh_time).



# File 'lib/rhoconnect/jobs/bulk_data_job.rb', line 132

def self.populate_sources_table(db,sources_refs)
  db.transaction do |database|
    database.prepare("insert into sources
      (source_id,name,sync_priority,partition,sync_type,source_attribs,metadata,schema,blob_attribs,associations,last_inserted_size,backend_refresh_time)
      values (?,?,?,?,?,?,?,?,?,?,?,?)") do |stmt|
      sources_refs.each do |source_name,ref|
        s = ref[:source]
        # skipped sources should be marked with sync type :none
        sync_type = ref[:skip_source] ? 'none' : s.sync_type.to_s
        stmt.execute(s.source_id,s.name,s.priority,s.partition_type.to_s,
          sync_type,refs_to_s(ref[:refs]),s.get_value(:metadata),s.schema,s.blob_attribs,s.has_many,s.get_value(:md_size).to_i,s.read_state.refresh_time)
      end
    end
  end
end
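
A sketch of the sources_refs hash this expects, as built by create_sqlite_data_file (values hypothetical); note that a skipped source is written with sync_type 'none':

# sources_refs == {
#   'Product' => { :source      => product_source,  # a loaded Source
#                  :refs        => { 'name' => 2, 'brand' => 2 },
#                  :skip_source => false }
# }
# => one row in the sources table, with source_attribs "brand,2,name,2"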

.refs_to_s(refs) ⇒ Object



# File 'lib/rhoconnect/jobs/bulk_data_job.rb', line 120

def self.refs_to_s(refs)
  str = ''
  refs.sort.each do |name,value|
    str << "#{name},#{value},"
  end
  str[0..-2]
end
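
A worked example: pairs are sorted by attribute name, flattened into a comma-separated string, and the trailing comma is stripped.

refs_to_s('name' => 250, 'brand' => 250)
# => "brand,250,name,250"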