Class: TaxGenerator::Processor

Inherits:
Object
  • Object
show all
Includes:
Celluloid, Celluloid::Logger, Celluloid::Notifications, ApplicationHelper
Defined in:
lib/tax_generator/classes/processor.rb

Overview

Processes the taxonomy and destinations XML files and generates the HTML output files.

Instance Attribute Summary

Instance Method Summary

Methods included from ApplicationHelper

create_directories, elements_with_content, erb_template, execute_with_rescue, format_error, log_error, log_message, nokogiri_xml, rescue_interrupt, root, set_celluloid_exception_handling

Constructor Details

#initialize(options = {}) ⇒ void

Receives a hash of options that determine the input files and the input and output folders.

Parameters:

  • options (Hash) (defaults to: {})

    the options that can determine the input and output files and folders

Options Hash (options):

  • :input_dir (String)

    The input directory

  • :output_dir (String)

    The output directory

  • :taxonomy_file_name (String)

    The taxonomy file name

  • :destinations_file_name (String)

    The destinations file name

See Also: #work



43
44
45
46
47
48
49
50
51
52
# File 'lib/tax_generator/classes/processor.rb', line 43

# Boots Celluloid, normalises the options and starts a supervised pool of
# 50 TaxGenerator::FileCreator workers linked to the current actor.
def initialize(options = {})
  Celluloid.boot
  # anything other than a Hash is discarded so every option falls back to its default
  @options =
    if Hash === options
      options.symbolize_keys
    else
      {}
    end
  @worker_supervisor = Celluloid::SupervisionGroup.run!
  @workers = @worker_supervisor.pool(TaxGenerator::FileCreator, as: :workers, size: 50)
  Actor.current.link(@workers)
  # bookkeeping: atlas_id => job, atlas_id => worker, worker mailbox address => job
  @jobs = {}
  @job_to_worker = {}
  @worker_to_job = {}
end

Instance Attribute Details

#job_to_worker ⇒ Hash

Returns a hash whose keys are job ids and whose values are the workers handling those jobs.

Returns:

  • (Hash)

    a hash mapping each job id to the worker that handles the job



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/tax_generator/classes/processor.rb', line 20

class Processor
  include Celluloid
  include Celluloid::Logger
  include Celluloid::Notifications
  include TaxGenerator::ApplicationHelper

  attr_reader :options, :worker_supervisor, :workers, :taxonomy, :jobs, :job_to_worker, :worker_to_job

  # exits of linked actors (the worker pool and every registered worker) are routed to #worker_died
  trap_exit :worker_died

  #  receives a hash of options that are used to determine the input files and the input and output folders
  #
  # @param  [Hash]  options the options that can determine the input and output files and folders;
  #   anything that is not a Hash is ignored and every option falls back to its default
  # @option options [String] :input_dir The input directory
  # @option options [String] :output_dir The output directory
  # @option options [String] :taxonomy_file_name The taxonomy file name
  #   (the legacy key :taxonomy_filename is also accepted)
  # @option options [String] :destinations_file_name The destinations file name
  #   (the legacy key :destinations_filename is also accepted)
  #
  # @see #work
  #
  # @return [void]
  #
  # @api public
  def initialize(options = {})
    Celluloid.boot
    @options = options.is_a?(Hash) ? options.symbolize_keys : {}
    @worker_supervisor = Celluloid::SupervisionGroup.run!
    @workers = @worker_supervisor.pool(TaxGenerator::FileCreator, as: :workers, size: 50)
    Actor.current.link @workers
    # atlas_id => job hash
    @jobs = {}
    # atlas_id => worker that picked the job up
    @job_to_worker = {}
    # worker mailbox address => job hash (lets #worker_died find the job of a dead worker)
    @worker_to_job = {}
  end

  #  returns the input folder from the options list
  # otherwise the default path
  #
  # @return [String]
  #
  # @api public
  def input_folder
    @options.fetch(:input_dir, "#{root}/data/input")
  end

  #  returns the taxonomy filename from the option list
  # otherwise the default filename
  #
  # The documented key :taxonomy_file_name takes precedence; the legacy key
  # :taxonomy_filename is still honoured so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_name
    @options.fetch(:taxonomy_file_name) { @options.fetch(:taxonomy_filename, 'taxonomy.xml') }
  end

  #  returns the destinations filename from the option list
  # otherwise the default filename
  #
  # The documented key :destinations_file_name takes precedence; the legacy key
  # :destinations_filename is still honoured so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def destinations_file_name
    @options.fetch(:destinations_file_name) { @options.fetch(:destinations_filename, 'destinations.xml') }
  end

  #  returns the output folder path from the option list
  # otherwise the default path
  #
  # @return [String]
  #
  # @api public
  def output_folder
    @options.fetch(:output_dir, "#{root}/data/output")
  end

  #  returns the full path to the taxonomy file
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_path
    File.join(input_folder, taxonomy_file_name)
  end

  #  returns the full path to the destinations file
  #
  # @return [String]
  #
  # @api public
  def destinations_file_path
    File.join(input_folder, destinations_file_name)
  end

  #  returns the full path to the static folder inside the output folder
  #
  # @return [String]
  #
  # @api public
  def static_output_dir
    File.join(output_folder, 'static')
  end

  # cleans the output folder, re-creates it and the static folder,
  # then copies the contents of templates/static (under root) into the static folder
  #
  # Note: the `**/*` glob does not match dotfiles, so hidden entries in the output folder are kept.
  #
  # @return [void]
  #
  # @api public
  def prepare_output_dirs
    FileUtils.rm_rf Dir["#{output_folder}/**/*"]
    create_directories(output_folder, static_output_dir)
    FileUtils.cp_r(Dir["#{File.join(root, 'templates', 'static')}/*"], static_output_dir)
  end

  #  checks whether every registered job has the status 'finished'
  # (returns true when no jobs are registered)
  #
  # @return [Boolean]
  #
  # @api public
  def all_workers_finished?
    @jobs.all? { |_job_id, job| job['status'] == 'finished' }
  end

  #  registers all the jobs so that the manager can access them at any time;
  # each job's keys are stringified and the job is stored under its 'atlas_id'
  #
  # @param  [Array] jobs the jobs that will be registered
  #
  # @return [void]
  #
  # @api public
  def register_jobs(*jobs)
    # sequential on purpose: writing to the shared @jobs hash from concurrently
    # running blocks (as pmap does) is not thread-safe
    jobs.each do |job|
      normalized = job.stringify_keys
      @jobs[normalized['atlas_id']] = normalized
    end
  end

  #  registers all the jobs, and then delegates them to workers
  # @see #register_jobs
  # @see TaxGenerator::FileCreator#work
  #
  # @param  [Array] jobs the jobs that will be delegated to the workers
  #
  # @return [void]
  #
  # @api public
  def delegate_job(*jobs)
    # jobs need to be added into the manager before starting task to avoid adding new key while iterating
    register_jobs(*jobs)
    current_actor = Actor.current
    # async calls only enqueue a message and return immediately, so a plain loop suffices
    @jobs.each_value do |job|
      @workers.async.work(job, current_actor) if @workers.alive?
    end
  end

  #  parses the destinations xml document and returns the list of jobs: first the root job
  # (atlas_id 0, no destination), then one job per <destination> element in document order.
  # Destinations without an atlas_id attribute are skipped (and logged) because no job id
  # can be derived for them.
  # @see #nokogiri_xml
  #
  # @return [Array<Hash>]
  #
  # @api public
  def fetch_file_jobs
    root_job = { atlas_id: 0, taxonomy: @taxonomy, destination: nil, output_folder: output_folder }
    # a sequential map keeps document order and avoids appending to a shared array from several threads
    destination_jobs = nokogiri_xml(destinations_file_path).xpath('//destination').map do |destination|
      atlas_id = destination['atlas_id']
      if atlas_id.nil?
        log_message('skipping destination without atlas_id')
        next
      end
      { atlas_id: atlas_id, taxonomy: @taxonomy, destination: destination, output_folder: output_folder }
    end
    [root_job] + destination_jobs.compact
  end

  #  fetches the jobs for file generation, then delegates the jobs to workers and waits until workers finish
  # @see #fetch_file_jobs
  # @see #delegate_job
  # @see #wait_jobs_termination
  #
  # @return [void]
  #
  # @api public
  def generate_files
    jobs = fetch_file_jobs
    delegate_job(*jobs)
    wait_jobs_termination
  end

  #  polls every 0.1 seconds until every registered job has the status 'finished',
  # then terminates this actor
  # @see #all_workers_finished?
  #
  # NOTE(review): #worker_died does not update a job's status, so if a worker crashes before
  # its job is marked 'finished' this loop may never exit — confirm how FileCreator/the pool recover.
  #
  # @return [void]
  #
  # @api public
  def wait_jobs_termination
    sleep(0.1) until all_workers_finished?
    terminate
  end

  #  records which worker handles the job (by atlas_id and by the worker's mailbox address),
  # links the worker to this actor so its exit reaches #worker_died, and starts the worker
  # @see TaxGenerator::FileCreator#start_work
  #
  # @param  [Hash]  job the job that the worker will work
  # @param  [TaxGenerator::FileCreator]  worker the worker that will create the file
  #
  # @return [void]
  #
  # @api public
  def register_worker_for_job(job, worker)
    @job_to_worker[job['atlas_id']] = worker
    @worker_to_job[worker.mailbox.address] = job
    log_message("worker #{worker.job_id} registed into manager")
    Actor.current.link worker
    worker.async.start_work
  end

  # prepares the output folders; if the input folder and both input files exist,
  # builds the taxonomy tree, prints it and generates the files, otherwise logs a fatal message
  # @see TaxGenerator::TaxonomyTree#new
  # @see Tree::TreeNode#print_tree
  # @see #generate_files
  #
  # @return [void]
  #
  # @api public
  def work
    prepare_output_dirs
    if File.directory?(input_folder) && File.file?(taxonomy_file_path) && File.file?(destinations_file_path)
      @taxonomy = TaxGenerator::TaxonomyTree.new(taxonomy_file_path)
      @taxonomy.print_tree
      generate_files
    else
      log_message('Please provide valid options', log_method: 'fatal')
    end
  end

  # logs a fatal message about a worker being dead if a registered worker exits with a reason;
  # exits with a blank reason, or of actors that have no registered job, are ignored
  # @param  [TaxGenerator::FileCreator]  worker the worker that died
  # @param  [String]  reason the reason for which the worker died
  #
  # @return [void]
  #
  # @api public
  def worker_died(worker, reason)
    mailbox_address = worker.mailbox.address
    job = @worker_to_job.delete(mailbox_address)
    return if reason.blank? || job.blank?
    log_message("worker job #{job['atlas_id']} with mailbox #{mailbox_address.inspect} died  for reason:  #{log_error(reason)}", log_method: 'fatal')
  end
end

#jobs ⇒ Hash

Returns a hash whose keys are job ids and whose values are the jobs themselves.

Returns:

  • (Hash)

    a hash mapping each job id to the job itself



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/tax_generator/classes/processor.rb', line 20

class Processor
  include Celluloid
  include Celluloid::Logger
  include Celluloid::Notifications
  include TaxGenerator::ApplicationHelper

  attr_reader :options, :worker_supervisor, :workers, :taxonomy, :jobs, :job_to_worker, :worker_to_job

  # exits of linked actors (the worker pool and every registered worker) are routed to #worker_died
  trap_exit :worker_died

  #  receives a hash of options that are used to determine the input files and the input and output folders
  #
  # @param  [Hash]  options the options that can determine the input and output files and folders;
  #   anything that is not a Hash is ignored and every option falls back to its default
  # @option options [String] :input_dir The input directory
  # @option options [String] :output_dir The output directory
  # @option options [String] :taxonomy_file_name The taxonomy file name
  #   (the legacy key :taxonomy_filename is also accepted)
  # @option options [String] :destinations_file_name The destinations file name
  #   (the legacy key :destinations_filename is also accepted)
  #
  # @see #work
  #
  # @return [void]
  #
  # @api public
  def initialize(options = {})
    Celluloid.boot
    @options = options.is_a?(Hash) ? options.symbolize_keys : {}
    @worker_supervisor = Celluloid::SupervisionGroup.run!
    @workers = @worker_supervisor.pool(TaxGenerator::FileCreator, as: :workers, size: 50)
    Actor.current.link @workers
    # atlas_id => job hash
    @jobs = {}
    # atlas_id => worker that picked the job up
    @job_to_worker = {}
    # worker mailbox address => job hash (lets #worker_died find the job of a dead worker)
    @worker_to_job = {}
  end

  #  returns the input folder from the options list
  # otherwise the default path
  #
  # @return [String]
  #
  # @api public
  def input_folder
    @options.fetch(:input_dir, "#{root}/data/input")
  end

  #  returns the taxonomy filename from the option list
  # otherwise the default filename
  #
  # The documented key :taxonomy_file_name takes precedence; the legacy key
  # :taxonomy_filename is still honoured so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_name
    @options.fetch(:taxonomy_file_name) { @options.fetch(:taxonomy_filename, 'taxonomy.xml') }
  end

  #  returns the destinations filename from the option list
  # otherwise the default filename
  #
  # The documented key :destinations_file_name takes precedence; the legacy key
  # :destinations_filename is still honoured so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def destinations_file_name
    @options.fetch(:destinations_file_name) { @options.fetch(:destinations_filename, 'destinations.xml') }
  end

  #  returns the output folder path from the option list
  # otherwise the default path
  #
  # @return [String]
  #
  # @api public
  def output_folder
    @options.fetch(:output_dir, "#{root}/data/output")
  end

  #  returns the full path to the taxonomy file
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_path
    File.join(input_folder, taxonomy_file_name)
  end

  #  returns the full path to the destinations file
  #
  # @return [String]
  #
  # @api public
  def destinations_file_path
    File.join(input_folder, destinations_file_name)
  end

  #  returns the full path to the static folder inside the output folder
  #
  # @return [String]
  #
  # @api public
  def static_output_dir
    File.join(output_folder, 'static')
  end

  # cleans the output folder, re-creates it and the static folder,
  # then copies the contents of templates/static (under root) into the static folder
  #
  # Note: the `**/*` glob does not match dotfiles, so hidden entries in the output folder are kept.
  #
  # @return [void]
  #
  # @api public
  def prepare_output_dirs
    FileUtils.rm_rf Dir["#{output_folder}/**/*"]
    create_directories(output_folder, static_output_dir)
    FileUtils.cp_r(Dir["#{File.join(root, 'templates', 'static')}/*"], static_output_dir)
  end

  #  checks whether every registered job has the status 'finished'
  # (returns true when no jobs are registered)
  #
  # @return [Boolean]
  #
  # @api public
  def all_workers_finished?
    @jobs.all? { |_job_id, job| job['status'] == 'finished' }
  end

  #  registers all the jobs so that the manager can access them at any time;
  # each job's keys are stringified and the job is stored under its 'atlas_id'
  #
  # @param  [Array] jobs the jobs that will be registered
  #
  # @return [void]
  #
  # @api public
  def register_jobs(*jobs)
    # sequential on purpose: writing to the shared @jobs hash from concurrently
    # running blocks (as pmap does) is not thread-safe
    jobs.each do |job|
      normalized = job.stringify_keys
      @jobs[normalized['atlas_id']] = normalized
    end
  end

  #  registers all the jobs, and then delegates them to workers
  # @see #register_jobs
  # @see TaxGenerator::FileCreator#work
  #
  # @param  [Array] jobs the jobs that will be delegated to the workers
  #
  # @return [void]
  #
  # @api public
  def delegate_job(*jobs)
    # jobs need to be added into the manager before starting task to avoid adding new key while iterating
    register_jobs(*jobs)
    current_actor = Actor.current
    # async calls only enqueue a message and return immediately, so a plain loop suffices
    @jobs.each_value do |job|
      @workers.async.work(job, current_actor) if @workers.alive?
    end
  end

  #  parses the destinations xml document and returns the list of jobs: first the root job
  # (atlas_id 0, no destination), then one job per <destination> element in document order.
  # Destinations without an atlas_id attribute are skipped (and logged) because no job id
  # can be derived for them.
  # @see #nokogiri_xml
  #
  # @return [Array<Hash>]
  #
  # @api public
  def fetch_file_jobs
    root_job = { atlas_id: 0, taxonomy: @taxonomy, destination: nil, output_folder: output_folder }
    # a sequential map keeps document order and avoids appending to a shared array from several threads
    destination_jobs = nokogiri_xml(destinations_file_path).xpath('//destination').map do |destination|
      atlas_id = destination['atlas_id']
      if atlas_id.nil?
        log_message('skipping destination without atlas_id')
        next
      end
      { atlas_id: atlas_id, taxonomy: @taxonomy, destination: destination, output_folder: output_folder }
    end
    [root_job] + destination_jobs.compact
  end

  #  fetches the jobs for file generation, then delegates the jobs to workers and waits until workers finish
  # @see #fetch_file_jobs
  # @see #delegate_job
  # @see #wait_jobs_termination
  #
  # @return [void]
  #
  # @api public
  def generate_files
    jobs = fetch_file_jobs
    delegate_job(*jobs)
    wait_jobs_termination
  end

  #  polls every 0.1 seconds until every registered job has the status 'finished',
  # then terminates this actor
  # @see #all_workers_finished?
  #
  # NOTE(review): #worker_died does not update a job's status, so if a worker crashes before
  # its job is marked 'finished' this loop may never exit — confirm how FileCreator/the pool recover.
  #
  # @return [void]
  #
  # @api public
  def wait_jobs_termination
    sleep(0.1) until all_workers_finished?
    terminate
  end

  #  records which worker handles the job (by atlas_id and by the worker's mailbox address),
  # links the worker to this actor so its exit reaches #worker_died, and starts the worker
  # @see TaxGenerator::FileCreator#start_work
  #
  # @param  [Hash]  job the job that the worker will work
  # @param  [TaxGenerator::FileCreator]  worker the worker that will create the file
  #
  # @return [void]
  #
  # @api public
  def register_worker_for_job(job, worker)
    @job_to_worker[job['atlas_id']] = worker
    @worker_to_job[worker.mailbox.address] = job
    log_message("worker #{worker.job_id} registed into manager")
    Actor.current.link worker
    worker.async.start_work
  end

  # prepares the output folders; if the input folder and both input files exist,
  # builds the taxonomy tree, prints it and generates the files, otherwise logs a fatal message
  # @see TaxGenerator::TaxonomyTree#new
  # @see Tree::TreeNode#print_tree
  # @see #generate_files
  #
  # @return [void]
  #
  # @api public
  def work
    prepare_output_dirs
    if File.directory?(input_folder) && File.file?(taxonomy_file_path) && File.file?(destinations_file_path)
      @taxonomy = TaxGenerator::TaxonomyTree.new(taxonomy_file_path)
      @taxonomy.print_tree
      generate_files
    else
      log_message('Please provide valid options', log_method: 'fatal')
    end
  end

  # logs a fatal message about a worker being dead if a registered worker exits with a reason;
  # exits with a blank reason, or of actors that have no registered job, are ignored
  # @param  [TaxGenerator::FileCreator]  worker the worker that died
  # @param  [String]  reason the reason for which the worker died
  #
  # @return [void]
  #
  # @api public
  def worker_died(worker, reason)
    mailbox_address = worker.mailbox.address
    job = @worker_to_job.delete(mailbox_address)
    return if reason.blank? || job.blank?
    log_message("worker job #{job['atlas_id']} with mailbox #{mailbox_address.inspect} died  for reason:  #{log_error(reason)}", log_method: 'fatal')
  end
end

#options ⇒ Hash

Returns the options that can determine the input and output files and folders.

Returns:

  • (Hash)

    the options that can determine the input and output files and folders



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/tax_generator/classes/processor.rb', line 20

class Processor
  include Celluloid
  include Celluloid::Logger
  include Celluloid::Notifications
  include TaxGenerator::ApplicationHelper

  attr_reader :options, :worker_supervisor, :workers, :taxonomy, :jobs, :job_to_worker, :worker_to_job

  # exits of linked actors (the worker pool and every registered worker) are routed to #worker_died
  trap_exit :worker_died

  #  receives a hash of options that are used to determine the input files and the input and output folders
  #
  # @param  [Hash]  options the options that can determine the input and output files and folders;
  #   anything that is not a Hash is ignored and every option falls back to its default
  # @option options [String] :input_dir The input directory
  # @option options [String] :output_dir The output directory
  # @option options [String] :taxonomy_file_name The taxonomy file name
  #   (the legacy key :taxonomy_filename is also accepted)
  # @option options [String] :destinations_file_name The destinations file name
  #   (the legacy key :destinations_filename is also accepted)
  #
  # @see #work
  #
  # @return [void]
  #
  # @api public
  def initialize(options = {})
    Celluloid.boot
    @options = options.is_a?(Hash) ? options.symbolize_keys : {}
    @worker_supervisor = Celluloid::SupervisionGroup.run!
    @workers = @worker_supervisor.pool(TaxGenerator::FileCreator, as: :workers, size: 50)
    Actor.current.link @workers
    # atlas_id => job hash
    @jobs = {}
    # atlas_id => worker that picked the job up
    @job_to_worker = {}
    # worker mailbox address => job hash (lets #worker_died find the job of a dead worker)
    @worker_to_job = {}
  end

  #  returns the input folder from the options list
  # otherwise the default path
  #
  # @return [String]
  #
  # @api public
  def input_folder
    @options.fetch(:input_dir, "#{root}/data/input")
  end

  #  returns the taxonomy filename from the option list
  # otherwise the default filename
  #
  # The documented key :taxonomy_file_name takes precedence; the legacy key
  # :taxonomy_filename is still honoured so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_name
    @options.fetch(:taxonomy_file_name) { @options.fetch(:taxonomy_filename, 'taxonomy.xml') }
  end

  #  returns the destinations filename from the option list
  # otherwise the default filename
  #
  # The documented key :destinations_file_name takes precedence; the legacy key
  # :destinations_filename is still honoured so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def destinations_file_name
    @options.fetch(:destinations_file_name) { @options.fetch(:destinations_filename, 'destinations.xml') }
  end

  #  returns the output folder path from the option list
  # otherwise the default path
  #
  # @return [String]
  #
  # @api public
  def output_folder
    @options.fetch(:output_dir, "#{root}/data/output")
  end

  #  returns the full path to the taxonomy file
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_path
    File.join(input_folder, taxonomy_file_name)
  end

  #  returns the full path to the destinations file
  #
  # @return [String]
  #
  # @api public
  def destinations_file_path
    File.join(input_folder, destinations_file_name)
  end

  #  returns the full path to the static folder inside the output folder
  #
  # @return [String]
  #
  # @api public
  def static_output_dir
    File.join(output_folder, 'static')
  end

  # cleans the output folder, re-creates it and the static folder,
  # then copies the contents of templates/static (under root) into the static folder
  #
  # Note: the `**/*` glob does not match dotfiles, so hidden entries in the output folder are kept.
  #
  # @return [void]
  #
  # @api public
  def prepare_output_dirs
    FileUtils.rm_rf Dir["#{output_folder}/**/*"]
    create_directories(output_folder, static_output_dir)
    FileUtils.cp_r(Dir["#{File.join(root, 'templates', 'static')}/*"], static_output_dir)
  end

  #  checks whether every registered job has the status 'finished'
  # (returns true when no jobs are registered)
  #
  # @return [Boolean]
  #
  # @api public
  def all_workers_finished?
    @jobs.all? { |_job_id, job| job['status'] == 'finished' }
  end

  #  registers all the jobs so that the manager can access them at any time;
  # each job's keys are stringified and the job is stored under its 'atlas_id'
  #
  # @param  [Array] jobs the jobs that will be registered
  #
  # @return [void]
  #
  # @api public
  def register_jobs(*jobs)
    # sequential on purpose: writing to the shared @jobs hash from concurrently
    # running blocks (as pmap does) is not thread-safe
    jobs.each do |job|
      normalized = job.stringify_keys
      @jobs[normalized['atlas_id']] = normalized
    end
  end

  #  registers all the jobs, and then delegates them to workers
  # @see #register_jobs
  # @see TaxGenerator::FileCreator#work
  #
  # @param  [Array] jobs the jobs that will be delegated to the workers
  #
  # @return [void]
  #
  # @api public
  def delegate_job(*jobs)
    # jobs need to be added into the manager before starting task to avoid adding new key while iterating
    register_jobs(*jobs)
    current_actor = Actor.current
    # async calls only enqueue a message and return immediately, so a plain loop suffices
    @jobs.each_value do |job|
      @workers.async.work(job, current_actor) if @workers.alive?
    end
  end

  #  parses the destinations xml document and returns the list of jobs: first the root job
  # (atlas_id 0, no destination), then one job per <destination> element in document order.
  # Destinations without an atlas_id attribute are skipped (and logged) because no job id
  # can be derived for them.
  # @see #nokogiri_xml
  #
  # @return [Array<Hash>]
  #
  # @api public
  def fetch_file_jobs
    root_job = { atlas_id: 0, taxonomy: @taxonomy, destination: nil, output_folder: output_folder }
    # a sequential map keeps document order and avoids appending to a shared array from several threads
    destination_jobs = nokogiri_xml(destinations_file_path).xpath('//destination').map do |destination|
      atlas_id = destination['atlas_id']
      if atlas_id.nil?
        log_message('skipping destination without atlas_id')
        next
      end
      { atlas_id: atlas_id, taxonomy: @taxonomy, destination: destination, output_folder: output_folder }
    end
    [root_job] + destination_jobs.compact
  end

  #  fetches the jobs for file generation, then delegates the jobs to workers and waits until workers finish
  # @see #fetch_file_jobs
  # @see #delegate_job
  # @see #wait_jobs_termination
  #
  # @return [void]
  #
  # @api public
  def generate_files
    jobs = fetch_file_jobs
    delegate_job(*jobs)
    wait_jobs_termination
  end

  #  polls every 0.1 seconds until every registered job has the status 'finished',
  # then terminates this actor
  # @see #all_workers_finished?
  #
  # NOTE(review): #worker_died does not update a job's status, so if a worker crashes before
  # its job is marked 'finished' this loop may never exit — confirm how FileCreator/the pool recover.
  #
  # @return [void]
  #
  # @api public
  def wait_jobs_termination
    sleep(0.1) until all_workers_finished?
    terminate
  end

  #  records which worker handles the job (by atlas_id and by the worker's mailbox address),
  # links the worker to this actor so its exit reaches #worker_died, and starts the worker
  # @see TaxGenerator::FileCreator#start_work
  #
  # @param  [Hash]  job the job that the worker will work
  # @param  [TaxGenerator::FileCreator]  worker the worker that will create the file
  #
  # @return [void]
  #
  # @api public
  def register_worker_for_job(job, worker)
    @job_to_worker[job['atlas_id']] = worker
    @worker_to_job[worker.mailbox.address] = job
    log_message("worker #{worker.job_id} registed into manager")
    Actor.current.link worker
    worker.async.start_work
  end

  # prepares the output folders; if the input folder and both input files exist,
  # builds the taxonomy tree, prints it and generates the files, otherwise logs a fatal message
  # @see TaxGenerator::TaxonomyTree#new
  # @see Tree::TreeNode#print_tree
  # @see #generate_files
  #
  # @return [void]
  #
  # @api public
  def work
    prepare_output_dirs
    if File.directory?(input_folder) && File.file?(taxonomy_file_path) && File.file?(destinations_file_path)
      @taxonomy = TaxGenerator::TaxonomyTree.new(taxonomy_file_path)
      @taxonomy.print_tree
      generate_files
    else
      log_message('Please provide valid options', log_method: 'fatal')
    end
  end

  # logs a fatal message about a worker being dead if a registered worker exits with a reason;
  # exits with a blank reason, or of actors that have no registered job, are ignored
  # @param  [TaxGenerator::FileCreator]  worker the worker that died
  # @param  [String]  reason the reason for which the worker died
  #
  # @return [void]
  #
  # @api public
  def worker_died(worker, reason)
    mailbox_address = worker.mailbox.address
    job = @worker_to_job.delete(mailbox_address)
    return if reason.blank? || job.blank?
    log_message("worker job #{job['atlas_id']} with mailbox #{mailbox_address.inspect} died  for reason:  #{log_error(reason)}", log_method: 'fatal')
  end
end

#taxonomy ⇒ TaxGenerator::TaxonomyTree

Returns the taxonomy tree that holds the nodes from the taxonomy xml document.

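Returns:

  • (TaxGenerator::TaxonomyTree)

    the taxonomy tree that holds the nodes from the taxonomy xml document
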


20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/tax_generator/classes/processor.rb', line 20

class Processor
  include Celluloid
  include Celluloid::Logger
  include Celluloid::Notifications
  include TaxGenerator::ApplicationHelper

  attr_reader :options, :worker_supervisor, :workers, :taxonomy, :jobs, :job_to_worker, :worker_to_job

  # exits of linked actors (the worker pool and every registered worker) are routed to #worker_died
  trap_exit :worker_died

  #  receives a hash of options that are used to determine the input files and the input and output folders
  #
  # @param  [Hash]  options the options that can determine the input and output files and folders;
  #   anything that is not a Hash is ignored and every option falls back to its default
  # @option options [String] :input_dir The input directory
  # @option options [String] :output_dir The output directory
  # @option options [String] :taxonomy_file_name The taxonomy file name
  #   (the legacy key :taxonomy_filename is also accepted)
  # @option options [String] :destinations_file_name The destinations file name
  #   (the legacy key :destinations_filename is also accepted)
  #
  # @see #work
  #
  # @return [void]
  #
  # @api public
  def initialize(options = {})
    Celluloid.boot
    @options = options.is_a?(Hash) ? options.symbolize_keys : {}
    @worker_supervisor = Celluloid::SupervisionGroup.run!
    @workers = @worker_supervisor.pool(TaxGenerator::FileCreator, as: :workers, size: 50)
    Actor.current.link @workers
    # atlas_id => job hash
    @jobs = {}
    # atlas_id => worker that picked the job up
    @job_to_worker = {}
    # worker mailbox address => job hash (lets #worker_died find the job of a dead worker)
    @worker_to_job = {}
  end

  #  returns the input folder from the options list
  # otherwise the default path
  #
  # @return [String]
  #
  # @api public
  def input_folder
    @options.fetch(:input_dir, "#{root}/data/input")
  end

  #  returns the taxonomy filename from the option list
  # otherwise the default filename
  #
  # The documented key :taxonomy_file_name takes precedence; the legacy key
  # :taxonomy_filename is still honoured so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_name
    @options.fetch(:taxonomy_file_name) { @options.fetch(:taxonomy_filename, 'taxonomy.xml') }
  end

  #  returns the destinations filename from the option list
  # otherwise the default filename
  #
  # The documented key :destinations_file_name takes precedence; the legacy key
  # :destinations_filename is still honoured so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def destinations_file_name
    @options.fetch(:destinations_file_name) { @options.fetch(:destinations_filename, 'destinations.xml') }
  end

  #  returns the output folder path from the option list
  # otherwise the default path
  #
  # @return [String]
  #
  # @api public
  def output_folder
    @options.fetch(:output_dir, "#{root}/data/output")
  end

  #  returns the full path to the taxonomy file
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_path
    File.join(input_folder, taxonomy_file_name)
  end

  #  returns the full path to the destinations file
  #
  # @return [String]
  #
  # @api public
  def destinations_file_path
    File.join(input_folder, destinations_file_name)
  end

  #  returns the full path to the static folder inside the output folder
  #
  # @return [String]
  #
  # @api public
  def static_output_dir
    File.join(output_folder, 'static')
  end

  # cleans the output folder, re-creates it and the static folder,
  # then copies the contents of templates/static (under root) into the static folder
  #
  # Note: the `**/*` glob does not match dotfiles, so hidden entries in the output folder are kept.
  #
  # @return [void]
  #
  # @api public
  def prepare_output_dirs
    FileUtils.rm_rf Dir["#{output_folder}/**/*"]
    create_directories(output_folder, static_output_dir)
    FileUtils.cp_r(Dir["#{File.join(root, 'templates', 'static')}/*"], static_output_dir)
  end

  #  checks whether every registered job has the status 'finished'
  # (returns true when no jobs are registered)
  #
  # @return [Boolean]
  #
  # @api public
  def all_workers_finished?
    @jobs.all? { |_job_id, job| job['status'] == 'finished' }
  end

  #  registers all the jobs so that the manager can access them at any time;
  # each job's keys are stringified and the job is stored under its 'atlas_id'
  #
  # @param  [Array] jobs the jobs that will be registered
  #
  # @return [void]
  #
  # @api public
  def register_jobs(*jobs)
    # sequential on purpose: writing to the shared @jobs hash from concurrently
    # running blocks (as pmap does) is not thread-safe
    jobs.each do |job|
      normalized = job.stringify_keys
      @jobs[normalized['atlas_id']] = normalized
    end
  end

  #  registers all the jobs, and then delegates them to workers
  # @see #register_jobs
  # @see TaxGenerator::FileCreator#work
  #
  # @param  [Array] jobs the jobs that will be delegated to the workers
  #
  # @return [void]
  #
  # @api public
  def delegate_job(*jobs)
    # jobs need to be added into the manager before starting task to avoid adding new key while iterating
    register_jobs(*jobs)
    current_actor = Actor.current
    # async calls only enqueue a message and return immediately, so a plain loop suffices
    @jobs.each_value do |job|
      @workers.async.work(job, current_actor) if @workers.alive?
    end
  end

  #  parses the destinations xml document and returns the list of jobs: first the root job
  # (atlas_id 0, no destination), then one job per <destination> element in document order.
  # Destinations without an atlas_id attribute are skipped (and logged) because no job id
  # can be derived for them.
  # @see #nokogiri_xml
  #
  # @return [Array<Hash>]
  #
  # @api public
  def fetch_file_jobs
    root_job = { atlas_id: 0, taxonomy: @taxonomy, destination: nil, output_folder: output_folder }
    # a sequential map keeps document order and avoids appending to a shared array from several threads
    destination_jobs = nokogiri_xml(destinations_file_path).xpath('//destination').map do |destination|
      atlas_id = destination['atlas_id']
      if atlas_id.nil?
        log_message('skipping destination without atlas_id')
        next
      end
      { atlas_id: atlas_id, taxonomy: @taxonomy, destination: destination, output_folder: output_folder }
    end
    [root_job] + destination_jobs.compact
  end

  #  fetches the jobs for file generation, then delegates the jobs to workers and waits until workers finish
  # @see #fetch_file_jobs
  # @see #delegate_job
  # @see #wait_jobs_termination
  #
  # @return [void]
  #
  # @api public
  def generate_files
    jobs = fetch_file_jobs
    delegate_job(*jobs)
    wait_jobs_termination
  end

  #  polls every 0.1 seconds until every registered job has the status 'finished',
  # then terminates this actor
  # @see #all_workers_finished?
  #
  # NOTE(review): #worker_died does not update a job's status, so if a worker crashes before
  # its job is marked 'finished' this loop may never exit — confirm how FileCreator/the pool recover.
  #
  # @return [void]
  #
  # @api public
  def wait_jobs_termination
    sleep(0.1) until all_workers_finished?
    terminate
  end

  #  records which worker handles the job (by atlas_id and by the worker's mailbox address),
  # links the worker to this actor so its exit reaches #worker_died, and starts the worker
  # @see TaxGenerator::FileCreator#start_work
  #
  # @param  [Hash]  job the job that the worker will work
  # @param  [TaxGenerator::FileCreator]  worker the worker that will create the file
  #
  # @return [void]
  #
  # @api public
  def register_worker_for_job(job, worker)
    @job_to_worker[job['atlas_id']] = worker
    @worker_to_job[worker.mailbox.address] = job
    log_message("worker #{worker.job_id} registed into manager")
    Actor.current.link worker
    worker.async.start_work
  end

  # prepares the output folders; if the input folder and both input files exist,
  # builds the taxonomy tree, prints it and generates the files, otherwise logs a fatal message
  # @see TaxGenerator::TaxonomyTree#new
  # @see Tree::TreeNode#print_tree
  # @see #generate_files
  #
  # @return [void]
  #
  # @api public
  def work
    prepare_output_dirs
    if File.directory?(input_folder) && File.file?(taxonomy_file_path) && File.file?(destinations_file_path)
      @taxonomy = TaxGenerator::TaxonomyTree.new(taxonomy_file_path)
      @taxonomy.print_tree
      generate_files
    else
      log_message('Please provide valid options', log_method: 'fatal')
    end
  end

  # logs a fatal message about a worker being dead if a registered worker exits with a reason;
  # exits with a blank reason, or of actors that have no registered job, are ignored
  # @param  [TaxGenerator::FileCreator]  worker the worker that died
  # @param  [String]  reason the reason for which the worker died
  #
  # @return [void]
  #
  # @api public
  def worker_died(worker, reason)
    mailbox_address = worker.mailbox.address
    job = @worker_to_job.delete(mailbox_address)
    return if reason.blank? || job.blank?
    log_message("worker job #{job['atlas_id']} with mailbox #{mailbox_address.inspect} died  for reason:  #{log_error(reason)}", log_method: 'fatal')
  end
end

#worker_supervisor ⇒ Celluloid::SupervisionGroup

Returns the supervision group that supervises workers.

Returns:

  • (Celluloid::SupervisionGroup)

    the supervision group that supervises workers



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/tax_generator/classes/processor.rb', line 20

class Processor
  include Celluloid
  include Celluloid::Logger
  include Celluloid::Notifications
  include TaxGenerator::ApplicationHelper

  attr_reader :options, :worker_supervisor, :workers, :taxonomy, :jobs, :job_to_worker, :worker_to_job

  # exit events of linked actors (the worker pool linked in #initialize and every
  # worker linked in #register_worker_for_job) are handled by #worker_died
  trap_exit :worker_died

  #  receives a list of options that are used to determine the input files and output and input folders,
  #  and starts a supervised pool of file creators that will process the jobs
  #
  # @param  [Hash]  options the options that can determine the input and output files and folders
  # @option options [String] :input_dir The input directory
  # @option options [String] :output_dir The output directory
  # @option options [String] :taxonomy_file_name The taxonomy file name (legacy key :taxonomy_filename is also accepted)
  # @option options [String] :destinations_file_name The destinations file name (legacy key :destinations_filename is also accepted)
  #
  # @see #work
  #
  # @return [void]
  #
  # @api public
  def initialize(options = {})
    Celluloid.boot
    # anything other than a Hash is ignored, so every option falls back to its default
    @options = options.is_a?(Hash) ? options.symbolize_keys : {}
    @worker_supervisor = Celluloid::SupervisionGroup.run!
    @workers = @worker_supervisor.pool(TaxGenerator::FileCreator, as: :workers, size: 50)
    Actor.current.link @workers
    @jobs = {}
    @job_to_worker = {}
    @worker_to_job = {}
  end

  #  returns the input folder from the options list,
  #  otherwise the default path
  #
  # @return [String]
  #
  # @api public
  def input_folder
    @options.fetch(:input_dir, "#{root}/data/input")
  end

  #  returns the taxonomy filename from the options list, otherwise the default filename
  #
  #  The documented key is :taxonomy_file_name; the older :taxonomy_filename key is
  #  still read as a fallback so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_name
    @options.fetch(:taxonomy_file_name) { @options.fetch(:taxonomy_filename, 'taxonomy.xml') }
  end

  #  returns the destinations filename from the options list, otherwise the default filename
  #
  #  The documented key is :destinations_file_name; the older :destinations_filename key is
  #  still read as a fallback so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def destinations_file_name
    @options.fetch(:destinations_file_name) { @options.fetch(:destinations_filename, 'destinations.xml') }
  end

  #  returns the output folder path from the options list,
  #  otherwise the default path
  #
  # @return [String]
  #
  # @api public
  def output_folder
    @options.fetch(:output_dir, "#{root}/data/output")
  end

  #  returns the full path to the taxonomy file
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_path
    File.join(input_folder, taxonomy_file_name)
  end

  #  returns the full path to the destinations file
  #
  # @return [String]
  #
  # @api public
  def destinations_file_path
    File.join(input_folder, destinations_file_name)
  end

  #  returns the full path to the static folder
  #
  # @return [String]
  #
  # @api public
  def static_output_dir
    File.join(output_folder, 'static')
  end

  # cleans the output folder (including dot-files), re-creates it and the static folder,
  # and copies the static template assets into the static folder
  #
  # @return [void]
  #
  # @api public
  def prepare_output_dirs
    folder = output_folder
    if File.directory?(folder)
      # rm_rf is recursive, so only the top-level entries need listing; Dir.entries (unlike
      # a "**/*" glob) also returns dot-files and is unaffected by glob metacharacters in the path
      stale_entries = Dir.entries(folder) - %w(. ..)
      FileUtils.rm_rf(stale_entries.map { |entry| File.join(folder, entry) })
    end
    create_directories(folder, static_output_dir)
    FileUtils.cp_r(Dir["#{File.join(root, 'templates', 'static')}/*"], static_output_dir)
  end

  #  checks if every registered job has the 'finished' status (true when no job is registered)
  #
  # @return [Boolean]
  #
  # @api public
  def all_workers_finished?
    @jobs.all? { |_job_id, job| job['status'] == 'finished' }
  end

  #  registers all the jobs (keyed by their 'atlas_id', with stringified keys)
  #  so that the manager can have access to them at any time
  #
  # @param  [Array] jobs the jobs that will be registered
  #
  # @return [Array<Hash>] the registered jobs, in the given order
  #
  # @api public
  def register_jobs(*jobs)
    # registered one after the other: pmap evaluated this block in parallel and
    # therefore wrote into the shared @jobs hash from several threads at once
    jobs.map do |job|
      job = job.stringify_keys
      @jobs[job['atlas_id']] = job
    end
  end

  #  registers all the jobs, and then delegates every registered job to the worker pool
  # @see #register_jobs
  # @see TaxGenerator::FileCreator#work
  #
  # @param  [Array] jobs the jobs that will be delegated to the workers
  #
  # @return [void]
  #
  # @api public
  def delegate_job(*jobs)
    # jobs need to be added into the manager before starting task to avoid adding new key while iterating
    register_jobs(*jobs)
    current_actor = Actor.current
    # an async call only enqueues a message, so a plain sequential loop is enough
    @jobs.each_value do |job|
      @workers.async.work(job, current_actor) if @workers.alive?
    end
  end

  #  parses the destinations xml document and returns one job per destination,
  #  in document order, preceded by a job (atlas_id 0) that has no destination
  # @see #nokogiri_xml
  #
  # @return [Array<Hash>]
  #
  # @api public
  def fetch_file_jobs
    root_job = { atlas_id: 0, taxonomy: @taxonomy, destination: nil, output_folder: output_folder }
    destinations = nokogiri_xml(destinations_file_path).xpath('//destination')
    # a sequential map keeps document order and avoids appending to a shared
    # array from the parallel blocks that pmap used to run
    destination_jobs = destinations.map do |destination|
      atlas_id = destination.attributes['atlas_id']
      { atlas_id: atlas_id.value, taxonomy: @taxonomy, destination: destination, output_folder: output_folder }
    end
    [root_job] + destination_jobs
  end

  #  fetches the jobs for file generation, then delegates the jobs to workers and waits until workers finish
  # @see #fetch_file_jobs
  # @see #delegate_job
  # @see #wait_jobs_termination
  #
  # @return [void]
  #
  # @api public
  def generate_files
    jobs = fetch_file_jobs
    delegate_job(*jobs)
    wait_jobs_termination
  end

  #  polls every 0.1 seconds until all registered jobs are finished, then terminates this actor
  # @see #all_workers_finished?
  #
  # @return [void]
  #
  # @api public
  def wait_jobs_termination
    sleep(0.1) until all_workers_finished?
    terminate
  end

  #  registers the worker so that the current actor has access to it at any given time,
  #  links it (so its exit reaches #worker_died) and starts the worker
  # @see TaxGenerator::FileCreator#start_work
  #
  # @param  [Hash]  job the job that the worker will work
  # @param  [TaxGenerator::FileCreator]  worker the worker that will create the file
  #
  # @return [void]
  #
  # @api public
  def register_worker_for_job(job, worker)
    @job_to_worker[job['atlas_id']] = worker
    @worker_to_job[worker.mailbox.address] = job
    log_message("worker #{worker.job_id} registed into manager")
    Actor.current.link worker
    worker.async.start_work
  end

  # generates the taxonomy tree, prints it and generates the files;
  # the output folder is only cleaned once the input folder and files are found,
  # so a run with bad options does not destroy a previous run's output
  # @see TaxGenerator::TaxonomyTree#new
  # @see Tree::TreeNode#print_tree
  # @see #generate_files
  #
  # @return [void]
  #
  # @api public
  def work
    if File.directory?(input_folder) && File.file?(taxonomy_file_path) && File.file?(destinations_file_path)
      prepare_output_dirs
      @taxonomy = TaxGenerator::TaxonomyTree.new(taxonomy_file_path)
      @taxonomy.print_tree
      generate_files
    else
      log_message('Please provide valid options', log_method: 'fatal')
    end
  end

  # logs a message about a worker being dead if a worker crashes
  # @param  [TaxGenerator::FileCreator]  worker the worker that died
  # @param  [String]  reason the reason for which the worker died (nothing is logged when blank)
  #
  # @return [void]
  #
  # @api public
  def worker_died(worker, reason)
    mailbox_address = worker.mailbox.address
    job = @worker_to_job.delete(mailbox_address)
    return if reason.blank? || job.blank?
    log_message("worker job #{job['atlas_id']} with mailbox #{mailbox_address.inspect} died  for reason:  #{log_error(reason)}", log_method: 'fatal')
  end
end

#worker_to_job ⇒ Hash

Returns a hash in which each key is a worker's mailbox address and each value is the job being handled by that worker.

Returns:

  • (Hash)

    a hash in which each key is a worker's mailbox address and each value is the job being handled by that worker



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/tax_generator/classes/processor.rb', line 20

class Processor
  include Celluloid
  include Celluloid::Logger
  include Celluloid::Notifications
  include TaxGenerator::ApplicationHelper

  attr_reader :options, :worker_supervisor, :workers, :taxonomy, :jobs, :job_to_worker, :worker_to_job

  # exit events of linked actors (the worker pool linked in #initialize and every
  # worker linked in #register_worker_for_job) are handled by #worker_died
  trap_exit :worker_died

  #  receives a list of options that are used to determine the input files and output and input folders,
  #  and starts a supervised pool of file creators that will process the jobs
  #
  # @param  [Hash]  options the options that can determine the input and output files and folders
  # @option options [String] :input_dir The input directory
  # @option options [String] :output_dir The output directory
  # @option options [String] :taxonomy_file_name The taxonomy file name (legacy key :taxonomy_filename is also accepted)
  # @option options [String] :destinations_file_name The destinations file name (legacy key :destinations_filename is also accepted)
  #
  # @see #work
  #
  # @return [void]
  #
  # @api public
  def initialize(options = {})
    Celluloid.boot
    # anything other than a Hash is ignored, so every option falls back to its default
    @options = options.is_a?(Hash) ? options.symbolize_keys : {}
    @worker_supervisor = Celluloid::SupervisionGroup.run!
    @workers = @worker_supervisor.pool(TaxGenerator::FileCreator, as: :workers, size: 50)
    Actor.current.link @workers
    @jobs = {}
    @job_to_worker = {}
    @worker_to_job = {}
  end

  #  returns the input folder from the options list,
  #  otherwise the default path
  #
  # @return [String]
  #
  # @api public
  def input_folder
    @options.fetch(:input_dir, "#{root}/data/input")
  end

  #  returns the taxonomy filename from the options list, otherwise the default filename
  #
  #  The documented key is :taxonomy_file_name; the older :taxonomy_filename key is
  #  still read as a fallback so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_name
    @options.fetch(:taxonomy_file_name) { @options.fetch(:taxonomy_filename, 'taxonomy.xml') }
  end

  #  returns the destinations filename from the options list, otherwise the default filename
  #
  #  The documented key is :destinations_file_name; the older :destinations_filename key is
  #  still read as a fallback so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def destinations_file_name
    @options.fetch(:destinations_file_name) { @options.fetch(:destinations_filename, 'destinations.xml') }
  end

  #  returns the output folder path from the options list,
  #  otherwise the default path
  #
  # @return [String]
  #
  # @api public
  def output_folder
    @options.fetch(:output_dir, "#{root}/data/output")
  end

  #  returns the full path to the taxonomy file
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_path
    File.join(input_folder, taxonomy_file_name)
  end

  #  returns the full path to the destinations file
  #
  # @return [String]
  #
  # @api public
  def destinations_file_path
    File.join(input_folder, destinations_file_name)
  end

  #  returns the full path to the static folder
  #
  # @return [String]
  #
  # @api public
  def static_output_dir
    File.join(output_folder, 'static')
  end

  # cleans the output folder (including dot-files), re-creates it and the static folder,
  # and copies the static template assets into the static folder
  #
  # @return [void]
  #
  # @api public
  def prepare_output_dirs
    folder = output_folder
    if File.directory?(folder)
      # rm_rf is recursive, so only the top-level entries need listing; Dir.entries (unlike
      # a "**/*" glob) also returns dot-files and is unaffected by glob metacharacters in the path
      stale_entries = Dir.entries(folder) - %w(. ..)
      FileUtils.rm_rf(stale_entries.map { |entry| File.join(folder, entry) })
    end
    create_directories(folder, static_output_dir)
    FileUtils.cp_r(Dir["#{File.join(root, 'templates', 'static')}/*"], static_output_dir)
  end

  #  checks if every registered job has the 'finished' status (true when no job is registered)
  #
  # @return [Boolean]
  #
  # @api public
  def all_workers_finished?
    @jobs.all? { |_job_id, job| job['status'] == 'finished' }
  end

  #  registers all the jobs (keyed by their 'atlas_id', with stringified keys)
  #  so that the manager can have access to them at any time
  #
  # @param  [Array] jobs the jobs that will be registered
  #
  # @return [Array<Hash>] the registered jobs, in the given order
  #
  # @api public
  def register_jobs(*jobs)
    # registered one after the other: pmap evaluated this block in parallel and
    # therefore wrote into the shared @jobs hash from several threads at once
    jobs.map do |job|
      job = job.stringify_keys
      @jobs[job['atlas_id']] = job
    end
  end

  #  registers all the jobs, and then delegates every registered job to the worker pool
  # @see #register_jobs
  # @see TaxGenerator::FileCreator#work
  #
  # @param  [Array] jobs the jobs that will be delegated to the workers
  #
  # @return [void]
  #
  # @api public
  def delegate_job(*jobs)
    # jobs need to be added into the manager before starting task to avoid adding new key while iterating
    register_jobs(*jobs)
    current_actor = Actor.current
    # an async call only enqueues a message, so a plain sequential loop is enough
    @jobs.each_value do |job|
      @workers.async.work(job, current_actor) if @workers.alive?
    end
  end

  #  parses the destinations xml document and returns one job per destination,
  #  in document order, preceded by a job (atlas_id 0) that has no destination
  # @see #nokogiri_xml
  #
  # @return [Array<Hash>]
  #
  # @api public
  def fetch_file_jobs
    root_job = { atlas_id: 0, taxonomy: @taxonomy, destination: nil, output_folder: output_folder }
    destinations = nokogiri_xml(destinations_file_path).xpath('//destination')
    # a sequential map keeps document order and avoids appending to a shared
    # array from the parallel blocks that pmap used to run
    destination_jobs = destinations.map do |destination|
      atlas_id = destination.attributes['atlas_id']
      { atlas_id: atlas_id.value, taxonomy: @taxonomy, destination: destination, output_folder: output_folder }
    end
    [root_job] + destination_jobs
  end

  #  fetches the jobs for file generation, then delegates the jobs to workers and waits until workers finish
  # @see #fetch_file_jobs
  # @see #delegate_job
  # @see #wait_jobs_termination
  #
  # @return [void]
  #
  # @api public
  def generate_files
    jobs = fetch_file_jobs
    delegate_job(*jobs)
    wait_jobs_termination
  end

  #  polls every 0.1 seconds until all registered jobs are finished, then terminates this actor
  # @see #all_workers_finished?
  #
  # @return [void]
  #
  # @api public
  def wait_jobs_termination
    sleep(0.1) until all_workers_finished?
    terminate
  end

  #  registers the worker so that the current actor has access to it at any given time,
  #  links it (so its exit reaches #worker_died) and starts the worker
  # @see TaxGenerator::FileCreator#start_work
  #
  # @param  [Hash]  job the job that the worker will work
  # @param  [TaxGenerator::FileCreator]  worker the worker that will create the file
  #
  # @return [void]
  #
  # @api public
  def register_worker_for_job(job, worker)
    @job_to_worker[job['atlas_id']] = worker
    @worker_to_job[worker.mailbox.address] = job
    log_message("worker #{worker.job_id} registed into manager")
    Actor.current.link worker
    worker.async.start_work
  end

  # generates the taxonomy tree, prints it and generates the files;
  # the output folder is only cleaned once the input folder and files are found,
  # so a run with bad options does not destroy a previous run's output
  # @see TaxGenerator::TaxonomyTree#new
  # @see Tree::TreeNode#print_tree
  # @see #generate_files
  #
  # @return [void]
  #
  # @api public
  def work
    if File.directory?(input_folder) && File.file?(taxonomy_file_path) && File.file?(destinations_file_path)
      prepare_output_dirs
      @taxonomy = TaxGenerator::TaxonomyTree.new(taxonomy_file_path)
      @taxonomy.print_tree
      generate_files
    else
      log_message('Please provide valid options', log_method: 'fatal')
    end
  end

  # logs a message about a worker being dead if a worker crashes
  # @param  [TaxGenerator::FileCreator]  worker the worker that died
  # @param  [String]  reason the reason for which the worker died (nothing is logged when blank)
  #
  # @return [void]
  #
  # @api public
  def worker_died(worker, reason)
    mailbox_address = worker.mailbox.address
    job = @worker_to_job.delete(mailbox_address)
    return if reason.blank? || job.blank?
    log_message("worker job #{job['atlas_id']} with mailbox #{mailbox_address.inspect} died  for reason:  #{log_error(reason)}", log_method: 'fatal')
  end
end

#workers ⇒ Celluloid::Actor

Returns the actors that will work on the jobs.

Returns:

  • (Celluloid::Actor)

    the actors that will work on the jobs



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/tax_generator/classes/processor.rb', line 20

class Processor
  include Celluloid
  include Celluloid::Logger
  include Celluloid::Notifications
  include TaxGenerator::ApplicationHelper

  attr_reader :options, :worker_supervisor, :workers, :taxonomy, :jobs, :job_to_worker, :worker_to_job

  # exit events of linked actors (the worker pool linked in #initialize and every
  # worker linked in #register_worker_for_job) are handled by #worker_died
  trap_exit :worker_died

  #  receives a list of options that are used to determine the input files and output and input folders,
  #  and starts a supervised pool of file creators that will process the jobs
  #
  # @param  [Hash]  options the options that can determine the input and output files and folders
  # @option options [String] :input_dir The input directory
  # @option options [String] :output_dir The output directory
  # @option options [String] :taxonomy_file_name The taxonomy file name (legacy key :taxonomy_filename is also accepted)
  # @option options [String] :destinations_file_name The destinations file name (legacy key :destinations_filename is also accepted)
  #
  # @see #work
  #
  # @return [void]
  #
  # @api public
  def initialize(options = {})
    Celluloid.boot
    # anything other than a Hash is ignored, so every option falls back to its default
    @options = options.is_a?(Hash) ? options.symbolize_keys : {}
    @worker_supervisor = Celluloid::SupervisionGroup.run!
    @workers = @worker_supervisor.pool(TaxGenerator::FileCreator, as: :workers, size: 50)
    Actor.current.link @workers
    @jobs = {}
    @job_to_worker = {}
    @worker_to_job = {}
  end

  #  returns the input folder from the options list,
  #  otherwise the default path
  #
  # @return [String]
  #
  # @api public
  def input_folder
    @options.fetch(:input_dir, "#{root}/data/input")
  end

  #  returns the taxonomy filename from the options list, otherwise the default filename
  #
  #  The documented key is :taxonomy_file_name; the older :taxonomy_filename key is
  #  still read as a fallback so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_name
    @options.fetch(:taxonomy_file_name) { @options.fetch(:taxonomy_filename, 'taxonomy.xml') }
  end

  #  returns the destinations filename from the options list, otherwise the default filename
  #
  #  The documented key is :destinations_file_name; the older :destinations_filename key is
  #  still read as a fallback so existing callers keep working.
  #
  # @return [String]
  #
  # @api public
  def destinations_file_name
    @options.fetch(:destinations_file_name) { @options.fetch(:destinations_filename, 'destinations.xml') }
  end

  #  returns the output folder path from the options list,
  #  otherwise the default path
  #
  # @return [String]
  #
  # @api public
  def output_folder
    @options.fetch(:output_dir, "#{root}/data/output")
  end

  #  returns the full path to the taxonomy file
  #
  # @return [String]
  #
  # @api public
  def taxonomy_file_path
    File.join(input_folder, taxonomy_file_name)
  end

  #  returns the full path to the destinations file
  #
  # @return [String]
  #
  # @api public
  def destinations_file_path
    File.join(input_folder, destinations_file_name)
  end

  #  returns the full path to the static folder
  #
  # @return [String]
  #
  # @api public
  def static_output_dir
    File.join(output_folder, 'static')
  end

  # cleans the output folder (including dot-files), re-creates it and the static folder,
  # and copies the static template assets into the static folder
  #
  # @return [void]
  #
  # @api public
  def prepare_output_dirs
    folder = output_folder
    if File.directory?(folder)
      # rm_rf is recursive, so only the top-level entries need listing; Dir.entries (unlike
      # a "**/*" glob) also returns dot-files and is unaffected by glob metacharacters in the path
      stale_entries = Dir.entries(folder) - %w(. ..)
      FileUtils.rm_rf(stale_entries.map { |entry| File.join(folder, entry) })
    end
    create_directories(folder, static_output_dir)
    FileUtils.cp_r(Dir["#{File.join(root, 'templates', 'static')}/*"], static_output_dir)
  end

  #  checks if every registered job has the 'finished' status (true when no job is registered)
  #
  # @return [Boolean]
  #
  # @api public
  def all_workers_finished?
    @jobs.all? { |_job_id, job| job['status'] == 'finished' }
  end

  #  registers all the jobs (keyed by their 'atlas_id', with stringified keys)
  #  so that the manager can have access to them at any time
  #
  # @param  [Array] jobs the jobs that will be registered
  #
  # @return [Array<Hash>] the registered jobs, in the given order
  #
  # @api public
  def register_jobs(*jobs)
    # registered one after the other: pmap evaluated this block in parallel and
    # therefore wrote into the shared @jobs hash from several threads at once
    jobs.map do |job|
      job = job.stringify_keys
      @jobs[job['atlas_id']] = job
    end
  end

  #  registers all the jobs, and then delegates every registered job to the worker pool
  # @see #register_jobs
  # @see TaxGenerator::FileCreator#work
  #
  # @param  [Array] jobs the jobs that will be delegated to the workers
  #
  # @return [void]
  #
  # @api public
  def delegate_job(*jobs)
    # jobs need to be added into the manager before starting task to avoid adding new key while iterating
    register_jobs(*jobs)
    current_actor = Actor.current
    # an async call only enqueues a message, so a plain sequential loop is enough
    @jobs.each_value do |job|
      @workers.async.work(job, current_actor) if @workers.alive?
    end
  end

  #  parses the destinations xml document and returns one job per destination,
  #  in document order, preceded by a job (atlas_id 0) that has no destination
  # @see #nokogiri_xml
  #
  # @return [Array<Hash>]
  #
  # @api public
  def fetch_file_jobs
    root_job = { atlas_id: 0, taxonomy: @taxonomy, destination: nil, output_folder: output_folder }
    destinations = nokogiri_xml(destinations_file_path).xpath('//destination')
    # a sequential map keeps document order and avoids appending to a shared
    # array from the parallel blocks that pmap used to run
    destination_jobs = destinations.map do |destination|
      atlas_id = destination.attributes['atlas_id']
      { atlas_id: atlas_id.value, taxonomy: @taxonomy, destination: destination, output_folder: output_folder }
    end
    [root_job] + destination_jobs
  end

  #  fetches the jobs for file generation, then delegates the jobs to workers and waits until workers finish
  # @see #fetch_file_jobs
  # @see #delegate_job
  # @see #wait_jobs_termination
  #
  # @return [void]
  #
  # @api public
  def generate_files
    jobs = fetch_file_jobs
    delegate_job(*jobs)
    wait_jobs_termination
  end

  #  polls every 0.1 seconds until all registered jobs are finished, then terminates this actor
  # @see #all_workers_finished?
  #
  # @return [void]
  #
  # @api public
  def wait_jobs_termination
    sleep(0.1) until all_workers_finished?
    terminate
  end

  #  registers the worker so that the current actor has access to it at any given time,
  #  links it (so its exit reaches #worker_died) and starts the worker
  # @see TaxGenerator::FileCreator#start_work
  #
  # @param  [Hash]  job the job that the worker will work
  # @param  [TaxGenerator::FileCreator]  worker the worker that will create the file
  #
  # @return [void]
  #
  # @api public
  def register_worker_for_job(job, worker)
    @job_to_worker[job['atlas_id']] = worker
    @worker_to_job[worker.mailbox.address] = job
    log_message("worker #{worker.job_id} registed into manager")
    Actor.current.link worker
    worker.async.start_work
  end

  # generates the taxonomy tree, prints it and generates the files;
  # the output folder is only cleaned once the input folder and files are found,
  # so a run with bad options does not destroy a previous run's output
  # @see TaxGenerator::TaxonomyTree#new
  # @see Tree::TreeNode#print_tree
  # @see #generate_files
  #
  # @return [void]
  #
  # @api public
  def work
    if File.directory?(input_folder) && File.file?(taxonomy_file_path) && File.file?(destinations_file_path)
      prepare_output_dirs
      @taxonomy = TaxGenerator::TaxonomyTree.new(taxonomy_file_path)
      @taxonomy.print_tree
      generate_files
    else
      log_message('Please provide valid options', log_method: 'fatal')
    end
  end

  # logs a message about a worker being dead if a worker crashes
  # @param  [TaxGenerator::FileCreator]  worker the worker that died
  # @param  [String]  reason the reason for which the worker died (nothing is logged when blank)
  #
  # @return [void]
  #
  # @api public
  def worker_died(worker, reason)
    mailbox_address = worker.mailbox.address
    job = @worker_to_job.delete(mailbox_address)
    return if reason.blank? || job.blank?
    log_message("worker job #{job['atlas_id']} with mailbox #{mailbox_address.inspect} died  for reason:  #{log_error(reason)}", log_method: 'fatal')
  end
end

Instance Method Details

#all_workers_finished? ⇒ Boolean

checks if all workers finished and returns true or false

Returns:

  • (Boolean)


137
138
139
# File 'lib/tax_generator/classes/processor.rb', line 137

# Tells whether every registered job has reached the 'finished' status.
# With no registered job at all this is trivially true.
#
# @return [Boolean]
#
# @api public
def all_workers_finished?
  @jobs.each_value.all? { |registered_job| registered_job['status'] == 'finished' }
end

#delegate_job(*jobs) ⇒ void

This method returns an undefined value.

registers all the jobs, and then delegates them to workers

Parameters:

  • jobs (Array)

    the jobs that will be delegated to the workers

See Also:



164
165
166
167
168
169
170
171
# File 'lib/tax_generator/classes/processor.rb', line 164

# Registers all the given jobs, then sends every registered job to the worker
# pool, passing this actor so that the workers can report back to it.
# @see #register_jobs
# @see TaxGenerator::FileCreator#work
#
# @param [Array] jobs the jobs that will be delegated to the workers
#
# @return [void]
#
# @api public
def delegate_job(*jobs)
  # jobs need to be added into the manager before starting task to avoid adding new key while iterating
  register_jobs(*jobs)
  current_actor = Actor.current
  # an async call only enqueues a message, so a plain sequential loop is enough;
  # the parallelism pmap added here bought nothing
  @jobs.each_value do |job|
    @workers.async.work(job, current_actor) if @workers.alive?
  end
end

#destinations_file_name ⇒ String

returns the destinations filename from the option list, otherwise the default filename

Returns:

  • (String)


80
81
82
# File 'lib/tax_generator/classes/processor.rb', line 80

# Returns the destinations file name from the options, otherwise the default
# 'destinations.xml'.
#
# The documented option key is :destinations_file_name; the older
# :destinations_filename key is still read as a fallback so that existing
# callers keep working.
#
# @return [String]
#
# @api public
def destinations_file_name
  @options.fetch(:destinations_file_name) { @options.fetch(:destinations_filename, 'destinations.xml') }
end

#destinations_file_path ⇒ String

returns the full path to the destinations file

Returns:

  • (String)


108
109
110
# File 'lib/tax_generator/classes/processor.rb', line 108

# Full path of the destinations XML file: the input folder joined with the
# destinations file name.
#
# @return [String]
#
# @api public
def destinations_file_path
  folder = input_folder
  name = destinations_file_name
  File.join(folder, name)
end

#fetch_file_jobs ⇒ Array<Hash>

parses the destinations xml document, gets each destination and adds a new job for that destination in the job list and then returns it

Returns:

  • (Array<Hash>)

See Also:

  • ApplicationHelper#nokogiri_xml


180
181
182
183
184
185
186
187
# File 'lib/tax_generator/classes/processor.rb', line 180

# Parses the destinations XML document and builds one job per <destination>
# node, in document order. The list is preceded by a job with atlas_id 0 that
# carries no destination.
# @see ApplicationHelper#nokogiri_xml
#
# @return [Array<Hash>]
#
# @api public
def fetch_file_jobs
  root_job = { atlas_id: 0, taxonomy: @taxonomy, destination: nil, output_folder: output_folder }
  destinations = nokogiri_xml(destinations_file_path).xpath('//destination')
  # A sequential map keeps document order. It also avoids what the previous
  # pmap did: appending to one shared array from blocks running in parallel.
  destination_jobs = destinations.map do |destination|
    atlas_id = destination.attributes['atlas_id']
    { atlas_id: atlas_id.value, taxonomy: @taxonomy, destination: destination, output_folder: output_folder }
  end
  [root_job] + destination_jobs
end

#generate_files ⇒ void

This method returns an undefined value.

fetches the jobs for file generation, then delegates the jobs to workers and waits until workers finish



197
198
199
200
201
# File 'lib/tax_generator/classes/processor.rb', line 197

# Builds the file-generation jobs, hands them to the workers and blocks until
# all of them are done.
# @see #fetch_file_jobs
# @see #delegate_job
# @see #wait_jobs_termination
#
# @return [void]
#
# @api public
def generate_files
  delegate_job(*fetch_file_jobs)
  wait_jobs_termination
end

#input_folder ⇒ String

returns the input folder from the options list, otherwise the default path

Returns:

  • (String)


60
61
62
# File 'lib/tax_generator/classes/processor.rb', line 60

# Input directory taken from the :input_dir option, falling back to
# "<root>/data/input".
#
# @return [String]
#
# @api public
def input_folder
  fallback = "#{root}/data/input"
  @options.fetch(:input_dir, fallback)
end

#output_folder ⇒ String

returns the output folder path from the option list, otherwise the default path

Returns:

  • (String)


90
91
92
# File 'lib/tax_generator/classes/processor.rb', line 90

# Output directory taken from the :output_dir option, falling back to
# "<root>/data/output".
#
# @return [String]
#
# @api public
def output_folder
  fallback = "#{root}/data/output"
  @options.fetch(:output_dir, fallback)
end

#prepare_output_dirs ⇒ void

This method returns an undefined value.

cleans the output folder and re-creates it and the static folder



126
127
128
129
130
# File 'lib/tax_generator/classes/processor.rb', line 126

# Empties the output folder (dot-files included), re-creates it together with
# the static folder, and copies the static template assets into the static folder.
#
# @return [void]
#
# @api public
def prepare_output_dirs
  folder = output_folder
  if File.directory?(folder)
    # rm_rf is recursive, so only the top-level entries need listing. Unlike a
    # "**/*" glob, Dir.entries also returns dot-files and is unaffected by glob
    # metacharacters in the folder path.
    stale_entries = Dir.entries(folder) - %w(. ..)
    FileUtils.rm_rf(stale_entries.map { |entry| File.join(folder, entry) })
  end
  create_directories(folder, static_output_dir)
  FileUtils.cp_r(Dir["#{File.join(root, 'templates', 'static')}/*"], static_output_dir)
end

#register_jobs(*jobs) ⇒ void

This method returns an undefined value.

registers all the jobs so that the manager can have access to them at any time

Parameters:

  • jobs (Array)

    the jobs that will be registered



148
149
150
151
152
153
# File 'lib/tax_generator/classes/processor.rb', line 148

# Stores every job, with its keys stringified, in @jobs under its 'atlas_id'
# so that the manager can reach it at any time.
#
# @param [Array] jobs the jobs that will be registered
#
# @return [Array<Hash>] the registered (stringified) jobs, in the given order
#
# @api public
def register_jobs(*jobs)
  # Registered one after the other. The previous pmap ran this block in
  # parallel and so wrote into the shared @jobs hash from several threads at once.
  jobs.map do |job|
    normalized = job.stringify_keys
    @jobs[normalized['atlas_id']] = normalized
  end
end

#register_worker_for_job(job, worker) ⇒ void

This method returns an undefined value.

registers the worker so that the current actor has access to it at any given time and starts the worker

Parameters:

  • job (Hash)

    the job that the worker will work

  • worker (TaxGenerator::FileCreator)

    the worker that will create the file

See Also:



223
224
225
226
227
228
229
# File 'lib/tax_generator/classes/processor.rb', line 223

# Records which worker handles which job (in both directions), links the
# worker to this actor so its exit is reported through #worker_died, and
# asks the worker to start asynchronously.
# @see TaxGenerator::FileCreator#start_work
#
# @param [Hash] job the job that the worker will work
# @param [TaxGenerator::FileCreator] worker the worker that will create the file
#
# @return [void]
#
# @api public
def register_worker_for_job(job, worker)
  atlas_id = job['atlas_id']
  @job_to_worker[atlas_id] = worker
  address = worker.mailbox.address
  @worker_to_job[address] = job
  log_message("worker #{worker.job_id} registed into manager")
  manager = Actor.current
  manager.link(worker)
  worker.async.start_work
end

#static_output_dir ⇒ String

returns the full path to the static folder

Returns:

  • (String)


117
118
119
# File 'lib/tax_generator/classes/processor.rb', line 117

# Path of the 'static' sub-folder inside the output folder.
#
# @return [String]
#
# @api public
def static_output_dir
  base = output_folder
  File.join(base, 'static')
end

#taxonomy_file_name ⇒ String

returns the taxonomy filename from the option list, otherwise the default filename

Returns:

  • (String)


70
71
72
# File 'lib/tax_generator/classes/processor.rb', line 70

# Returns the taxonomy file name from the options, otherwise the default
# 'taxonomy.xml'.
#
# The documented option key is :taxonomy_file_name; the older
# :taxonomy_filename key is still read as a fallback so that existing callers
# keep working.
#
# @return [String]
#
# @api public
def taxonomy_file_name
  @options.fetch(:taxonomy_file_name) { @options.fetch(:taxonomy_filename, 'taxonomy.xml') }
end

#taxonomy_file_path ⇒ String

returns the full path to the taxonomy file

Returns:

  • (String)


99
100
101
# File 'lib/tax_generator/classes/processor.rb', line 99

# Full path of the taxonomy XML file: the input folder joined with the
# taxonomy file name.
#
# @return [String]
#
# @api public
def taxonomy_file_path
  folder = input_folder
  name = taxonomy_file_name
  File.join(folder, name)
end

#wait_jobs_termination ⇒ void

This method returns an undefined value.

waits (polling every 0.1 seconds) until every registered job has the 'finished' status, then terminates the processor

See Also:

  • #create_file


209
210
211
212
# File 'lib/tax_generator/classes/processor.rb', line 209

# Blocks the processor until every registered job reports the 'finished'
# status, polling every 0.1 seconds, and then terminates the processor actor.
# @see #all_workers_finished?
#
# @return [void]
#
# @api public
def wait_jobs_termination
  until all_workers_finished?
    sleep(0.1)
  end
  terminate
end

#work ⇒ void

This method returns an undefined value.

generates the taxonomy tree , prints it and generates the files

See Also:



239
240
241
242
243
244
245
246
247
248
# File 'lib/tax_generator/classes/processor.rb', line 239

# Generates the taxonomy tree, prints it and generates the files.
#
# The input folder and both input files are checked first. The output folder
# is only cleaned (see #prepare_output_dirs) once the inputs are known to
# exist, so a run with bad options leaves the previous output untouched.
# @see TaxGenerator::TaxonomyTree#new
# @see Tree::TreeNode#print_tree
# @see #generate_files
#
# @return [void]
#
# @api public
def work
  if File.directory?(input_folder) && File.file?(taxonomy_file_path) && File.file?(destinations_file_path)
    prepare_output_dirs
    @taxonomy = TaxGenerator::TaxonomyTree.new(taxonomy_file_path)
    @taxonomy.print_tree
    generate_files
  else
    log_message('Please provide valid options', log_method: 'fatal')
  end
end

#worker_died(worker, reason) ⇒ void

This method returns an undefined value.

logs a message about a worker being dead if a worker crashes

Parameters:



257
258
259
260
261
262
# File 'lib/tax_generator/classes/processor.rb', line 257

# Exit handler for linked workers (registered via trap_exit): forgets the
# mailbox-to-job mapping of the dead worker and, when both a reason and a
# tracked job are present, logs a fatal message about the crash.
#
# @param [TaxGenerator::FileCreator] worker the worker that died
# @param [String] reason the reason for which the worker died (nothing is logged when blank)
#
# @return [void]
#
# @api public
def worker_died(worker, reason)
  address = worker.mailbox.address
  # the mapping is removed even when nothing ends up being logged
  dead_job = @worker_to_job.delete(address)
  unless reason.blank? || dead_job.blank?
    log_message("worker job #{dead_job['atlas_id']} with mailbox #{address.inspect} died  for reason:  #{log_error(reason)}", log_method: 'fatal')
  end
end