Class: Logical::Naf::JobCreator
- Inherits:
-
Object
- Object
- Logical::Naf::JobCreator
- Defined in:
- app/models/logical/naf/job_creator.rb
Class Method Summary collapse
Instance Method Summary collapse
- #create_queue_job(historical_job) ⇒ Object
- #queue_application(application, application_run_group_restriction, application_run_group_name, application_run_group_limit = 1, priority = 0, affinities = [], prerequisites = [], enqueue = false) ⇒ Object
- #queue_application_schedule(application_schedule, schedules_queued_already = []) ⇒ Object
-
#queue_rails_job(command, application_run_group_restriction = ::Naf::ApplicationRunGroupRestriction.limited_per_all_machines, application_run_group_name = :command, application_run_group_limit = 1, priority = 0, affinities = [], prerequisites = []) ⇒ Object
This method act similar to queue_application but is used for testing purpose.
- #queue_test ⇒ Object
Class Method Details
.test(*foo) ⇒ Object
145 146 147 148 149 150 |
# File 'app/models/logical/naf/job_creator.rb', line 145 def self.test(*foo) seconds = rand 120 + 15 puts "TEST CALLED: #{Time.zone.now}: #{foo.inspect}: sleeping for #{seconds} seconds" sleep(seconds) puts "TEST DONE: #{Time.zone.now}: #{foo.inspect}" end |
Instance Method Details
#create_queue_job(historical_job) ⇒ Object
129 130 131 132 133 134 135 136 137 138 139 |
# File 'app/models/logical/naf/job_creator.rb', line 129 def create_queue_job(historical_job) queued_job = ::Naf::QueuedJob.new(application_id: historical_job.application_id, application_type_id: historical_job.application_type_id, command: historical_job.command, application_run_group_restriction_id: historical_job.application_run_group_restriction_id, application_run_group_name: historical_job.application_run_group_name, application_run_group_limit: historical_job.application_run_group_limit, priority: historical_job.priority) queued_job.id = historical_job.id queued_job.save! end |
#queue_application(application, application_run_group_restriction, application_run_group_name, application_run_group_limit = 1, priority = 0, affinities = [], prerequisites = [], enqueue = false) ⇒ Object
4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'app/models/logical/naf/job_creator.rb', line 4 def queue_application(application, application_run_group_restriction, application_run_group_name, application_run_group_limit = 1, priority = 0, affinities = [], prerequisites = [], enqueue = false) # Before adding a job to the queue, check whether the number of # jobs (running/queued) is equal to or greater than the application # run group limit, or if enqueue_backlogs is set to false. If so, # do not add the job to the queue running_jobs = ::Naf::RunningJob. select('application_run_group_limit, MAX(created_at) AS created_at, count(*)'). where('command = ? AND application_run_group_name = ?', application.command, application_run_group_name). group('application_run_group_name, application_run_group_limit').first queued_jobs = ::Naf::QueuedJob. select('application_run_group_limit, MAX(created_at) AS created_at, count(*)'). where('command = ? AND application_run_group_name = ?', application.command, application_run_group_name). group('application_run_group_name, application_run_group_limit').first if enqueue == false && (running_jobs.present? || queued_jobs.present?) group_limit = running_jobs.try(:application_run_group_limit).to_i + queued_jobs.try(:application_run_group_limit).to_i total_jobs = running_jobs.try(:count).to_i + queued_jobs.try(:count).to_i return if group_limit <= total_jobs end ::Naf::HistoricalJob.transaction do historical_job = ::Naf::HistoricalJob.create!(application_id: application.id, application_type_id: application.application_type_id, command: application.command, application_run_group_restriction_id: application_run_group_restriction.id, application_run_group_name: application_run_group_name, application_run_group_limit: application_run_group_limit, priority: priority, log_level: application.log_level) historical_job.([::Naf::HistoricalJob::SYSTEM_TAGS[:pre_work]]) # Create historical job affinity tabs for each affinity associated with the historical job affinities.each do |affinity| affinity_parameter = ::Naf::ApplicationScheduleAffinityTab. where(affinity_id: affinity.id, application_schedule_id: application.application_schedule.try(:id)). first.try(:affinity_parameter) ::Naf::HistoricalJobAffinityTab.create(historical_job_id: historical_job.id, affinity_id: affinity.id, affinity_parameter: affinity_parameter) end historical_job.verify_prerequisites(prerequisites) # Create historical job prerequisites for each prerequisite associated with the historical job prerequisites.each do |prerequisite| ::Naf::HistoricalJobPrerequisite.create(historical_job_id: historical_job.id, prerequisite_historical_job_id: prerequisite.id) end create_queue_job(historical_job) return historical_job end end |
#queue_application_schedule(application_schedule, schedules_queued_already = []) ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'app/models/logical/naf/job_creator.rb', line 71 def queue_application_schedule(application_schedule, schedules_queued_already = []) prerequisite_jobs = [] # Check if schedule has been queued if schedules_queued_already.include? application_schedule.id raise ::Naf::HistoricalJob::JobPrerequisiteLoop.new(application_schedule) end # Keep track of queued schedules schedules_queued_already << application_schedule.id # Queue application schedule prerequisites application_schedule.prerequisites.each do |application_schedule_prerequisite| prerequisite_jobs << queue_application_schedule(application_schedule_prerequisite, schedules_queued_already) end # Queue the application return queue_application(application_schedule.application, application_schedule.application_run_group_restriction, application_schedule.application_run_group_name, application_schedule.application_run_group_limit, application_schedule.priority, application_schedule.affinities, prerequisite_jobs, application_schedule.enqueue_backlogs) end |
#queue_rails_job(command, application_run_group_restriction = ::Naf::ApplicationRunGroupRestriction.limited_per_all_machines, application_run_group_name = :command, application_run_group_limit = 1, priority = 0, affinities = [], prerequisites = []) ⇒ Object
This method act similar to queue_application but is used for testing purpose
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# File 'app/models/logical/naf/job_creator.rb', line 98 def queue_rails_job(command, application_run_group_restriction = ::Naf::ApplicationRunGroupRestriction.limited_per_all_machines, application_run_group_name = :command, application_run_group_limit = 1, priority = 0, affinities = [], prerequisites = []) application_run_group_name = command if application_run_group_name == :command ::Naf::HistoricalJob.transaction do historical_job = ::Naf::HistoricalJob.create!(application_type_id: 1, command: command, application_run_group_restriction_id: application_run_group_restriction.id, application_run_group_name: application_run_group_name, application_run_group_limit: application_run_group_limit, priority: priority) affinities.each do |affinity| ::Naf::HistoricalJobAffinityTab.create(historical_job_id: historical_job.id, affinity_id: affinity.id) end historical_job.verify_prerequisites(prerequisites) prerequisites.each do |prerequisite| ::Naf::HistoricalJobPrerequisite.create(historical_job_id: historical_job.id, prerequisite_historical_job_id: prerequisite.id) end create_queue_job(historical_job) return historical_job end end |
#queue_test ⇒ Object
141 142 143 |
# File 'app/models/logical/naf/job_creator.rb', line 141 def queue_test queue_rails_job("#{self.class.name}.test") end |