Class: Patriot::JobStore::RDBJobStore
- Includes:
- Util::DBClient, Util::Logger, Util::Retry
- Defined in:
- lib/patriot/job_store/rdb_job_store.rb
Overview
a JobStore implementation on RDB
Constant Summary collapse
- DEFAULT_PRIORITY =
default priority
1
- JOB_TABLE =
Tables job definition table
'jobs'
- FLOW_TABLE =
dependency relation table
'flows'
- PRODUCER_TABLE =
job and produced product table
'producers'
- CONSUMER_TABLE =
job and required product table
'consumers'
- HISTORY_TABLE =
table for execution history
'job_histories'
- DATE_FORMAT =
date format of execution history
"%Y-%m-%d %H:%M:%S"
- TICKET_COLUMNS =
attributes included in job_ticket
['job_id', 'update_id', 'node']
- ALL_COLUMNS =
all columns of the job table
[:id, :job_id, :job_def_id, :update_id, :state, :content, :start_after, :node, :host, :priority]
- ATTR_TO_COLUMN =
mapping from command attributes to table columns
{Patriot::Command::STATE_ATTR => :state, Patriot::Command::PRIORITY_ATTR => :priority, Patriot::Command::START_DATETIME_ATTR => :start_after, Patriot::Command::EXEC_NODE_ATTR => :node, Patriot::Command::EXEC_HOST_ATTR => :host}
Constants included from Util::Config
Util::Config::ADMIN_USER_KEY, Util::Config::DEFAULT_CONFIG, Util::Config::DEFAULT_PLUGIN_DIR, Util::Config::INFO_SERVER_PORT_KEY, Util::Config::PASSWORD_KEY, Util::Config::PLUGIN_DIR_KEY, Util::Config::PLUGIN_INIT_SCRIPT, Util::Config::PLUGIN_KEY, Util::Config::PLUGIN_LIB_DIR, Util::Config::USERNAME_KEY, Util::Config::WORKER_HOST_KEY, Util::Config::WORKER_USER_KEY
Constants included from Util::DBClient
Util::DBClient::DB_CONFIG_KEYS
Instance Method Summary collapse
- #acceptable?(job) ⇒ Boolean
- #delete_job(job_id) ⇒ Object
- #find_jobs_by_state(state, opts = {}) ⇒ Object
- #get_consumers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]}) ⇒ Object
-
#get_execution_history(job_id, opts = {}) ⇒ Hash
Histories.
-
#get_graph(job_id, opts = {}) ⇒ Array
get nodes and edges information to render graph.
- #get_job(job_id) ⇒ Patriot::JobStore::Job
- #get_job_size(opts = {}) ⇒ Object
- #get_job_tickets(host, nodes, options = {}) ⇒ Object
- #get_producers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]}) ⇒ Object
-
#initialize(store_id, config) ⇒ RDBJobStore
constructor
A new instance of RDBJobStore.
- #offer_to_execute(job_ticket) ⇒ Object
- #register(update_id, jobs) ⇒ Object
- #report_completion_status(job_ticket) ⇒ Object
- #set_state(update_id, job_ids, new_state) ⇒ Object
Methods included from Util::Retry
Methods included from Util::Logger
Methods included from Util::Config
Methods included from Util::DBClient
Methods inherited from Base
Constructor Details
#initialize(store_id, config) ⇒ RDBJobStore
Returns a new instance of RDBJobStore.
51 52 53 54 55 56 57 |
# File 'lib/patriot/job_store/rdb_job_store.rb', line 51 def initialize(store_id, config) @config = config prefix = [Patriot::JobStore::CONFIG_PREFIX, store_id].join(".") @db_config = read_dbconfig(prefix, config) @logger = create_logger(config) @initiator_id = connect(@db_config){|c| c.select(JOB_TABLE, {:job_id => Patriot::JobStore::INITIATOR_JOB_ID})[0].to_hash[:id] } end |
Instance Method Details
#acceptable?(job) ⇒ Boolean
135 136 137 138 139 140 141 142 143 |
# File 'lib/patriot/job_store/rdb_job_store.rb', line 135 def acceptable?(job) begin json = JSON.generate(job.attributes) rescue Exception => e @logger.warn e return false end return true end |
#delete_job(job_id) ⇒ Object
435 436 437 438 439 440 441 442 443 444 445 446 447 |
# File 'lib/patriot/job_store/rdb_job_store.rb', line 435 def delete_job(job_id) connect(@db_config) do |c| record = c.select(JOB_TABLE, {:job_id => job_id}) return if record.nil? || record.empty? raise "illegal state: more than one records for #{job_id}" if record.size > 1 serial_id = record[0].to_hash[:id] c.delete(CONSUMER_TABLE, {:job_id => serial_id}) c.delete(PRODUCER_TABLE, {:job_id => serial_id}) c.delete(FLOW_TABLE, {:consumer_id => serial_id}) c.delete(FLOW_TABLE, {:producer_id => serial_id}) c.delete(JOB_TABLE, {:job_id => job_id}) end end |
#find_jobs_by_state(state, opts = {}) ⇒ Object
401 402 403 404 405 406 407 408 409 410 411 412 413 414 |
# File 'lib/patriot/job_store/rdb_job_store.rb', line 401 def find_jobs_by_state(state, opts = {}) raise "OFFSET is set WITHOUT LIMIT" if opts.has_key?(:offset) && !opts.has_key?(:limit) condition = ["state = #{state}", "id != #{@initiator_id}"] condition |= ["job_id LIKE '#{opts[:filter_exp]}%'"] if opts.has_key?(:filter_exp) query = "SELECT job_id FROM jobs WHERE #{condition.join(' AND ')}" query = "#{query} ORDER BY update_id DESC" if opts.has_key?(:limit) query = "#{query} LIMIT #{opts[:limit]}" query = "#{query} OFFSET #{opts[:offset]}" if opts.has_key?(:offset) end connect(@db_config) do |c| return c.execute_statement(query, :select).map{|r| r.job_id } end end |
#get_consumers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]}) ⇒ Object
270 271 272 |
# File 'lib/patriot/job_store/rdb_job_store.rb', line 270 def get_consumers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]}) connect(@db_config) {|c| return _get_jobs_for_products(CONSUMER_TABLE, products, opts, c)} end |
#get_execution_history(job_id, opts = {}) ⇒ Hash
Returns histories.
296 297 298 |
# File 'lib/patriot/job_store/rdb_job_store.rb', line 296 def get_execution_history(job_id, opts = {}) connect(@db_config) {|c| return _get_execution_history(job_id, opts, c)} end |
#get_graph(job_id, opts = {}) ⇒ Array
get nodes and edges information to render graph
316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 |
# File 'lib/patriot/job_store/rdb_job_store.rb', line 316 def get_graph(job_id, opts = {}) connect(@db_config) do |db_conn| job = _get_job(job_id, db_conn) job[:consumers] = _get_jobs_for_products(CONSUMER_TABLE, job[Patriot::Command::PRODUCTS_ATTR], opts, db_conn) || [] job[:producers] = _get_jobs_for_products(PRODUCER_TABLE, job[Patriot::Command::REQUISITES_ATTR], opts, db_conn) || [] history = _get_execution_history(job_id, {}, db_conn)[0] hashed_job = { :job_id => job.job_id, :history => history, :depth => 0 }.merge(job.attributes) # set self node nodes = {job_id => hashed_job} edges = [] _set_dependency( db_conn, :producers, opts[:producer_depth], nodes, edges, hashed_job ) _set_dependency( db_conn, :consumers, opts[:consumer_depth], nodes, edges, hashed_job ) return {:nodes => nodes, :edges => edges} end end |
#get_job(job_id) ⇒ Patriot::JobStore::Job
244 245 246 |
# File 'lib/patriot/job_store/rdb_job_store.rb', line 244 def get_job(job_id) connect(@db_config) {|c| return _get_job(job_id, c)} end |
#get_job_size(opts = {}) ⇒ Object
417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 |
# File 'lib/patriot/job_store/rdb_job_store.rb', line 417 def get_job_size(opts = {}) opts = {:ignore_states => []}.merge(opts) if opts[:ignore_states].empty? query = "SELECT state, count(1) size FROM jobs GROUP BY state" else query = "SELECT state, count(1) size FROM jobs WHERE #{opts[:ignore_states].map{|s| "state != #{s}" }.join(" AND ")} GROUP BY state" end sizes = {} connect(@db_config) do |c| c.execute_statement(query).each do |r| sizes[r.state] = r.size sizes[r.state] = sizes[r.state] - 1 if r.state == Patriot::JobStore::JobState::SUCCEEDED # ignore initiator end end return sizes end |
#get_job_tickets(host, nodes, options = {}) ⇒ Object
146 147 148 149 150 151 152 153 154 155 156 157 158 |
# File 'lib/patriot/job_store/rdb_job_store.rb', line 146 def get_job_tickets(host, nodes, = {}) nodes = [nodes] unless nodes.is_a?(Array) begin query = _generate_fetching_job_sql(host, nodes,) @logger.debug "fetchings job by #{query}" connect(@db_config) do |c| return c.execute_statement(query).map{|r| Patriot::JobStore::JobTicket.new(r.job_id, r.update_id, r.node) } end rescue => e @logger.error e raise e end end |
#get_producers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]}) ⇒ Object
265 266 267 |
# File 'lib/patriot/job_store/rdb_job_store.rb', line 265 def get_producers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]}) connect(@db_config) {|c| return _get_jobs_for_products(PRODUCER_TABLE, products, opts, c)} end |
#offer_to_execute(job_ticket) ⇒ Object
181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 |
# File 'lib/patriot/job_store/rdb_job_store.rb', line 181 def offer_to_execute(job_ticket) connect(@db_config) do |c| unless _check_and_set_state(job_ticket, Patriot::JobStore::JobState::WAIT, Patriot::JobStore::JobState::RUNNING, c) @logger.debug("execution of job: #{job_ticket.job_id} is skipped") return nil end execution_id = c.insert(HISTORY_TABLE, {:job_id => job_ticket.job_id, :node => job_ticket.exec_node, :host => job_ticket.exec_host, :thread => job_ticket.exec_thread, :begin_at => Time.now.strftime(DATE_FORMAT)}) record = c.select(JOB_TABLE, {:job_id => job_ticket.job_id}) raise "duplicated entry found for #{job_ticket}" if record.size > 1 raise "no entry found for #{job_ticket}" if record.empty? job = _record_to_job(record[0]) begin return {:execution_id => execution_id, :command => job.to_command(@config)} rescue Exception => e marked = _check_and_set_state(job_ticket, Patriot::JobStore::JobState::RUNNING, Patriot::JobStore::JobState::FAILED, c) @logger.error "failed to create a command for #{job_ticket.job_id} (set to error? #{marked})" raise e end end end |
#register(update_id, jobs) ⇒ Object
60 61 62 63 64 65 66 67 68 69 70 71 72 |
# File 'lib/patriot/job_store/rdb_job_store.rb', line 60 def register(update_id, jobs) jobs = [jobs] unless jobs.is_a? Array jobs.each{|job| raise "#{job.job_id} is not acceptable" unless acceptable?(job) } @logger.info "start to register jobs" connect(@db_config) do |c| jobs.each{|job| _upsert_job(update_id, job, c)} c.update(JOB_TABLE, {:state => Patriot::JobStore::JobState::WAIT}, {:state => Patriot::JobStore::JobState::INIT, :update_id => update_id} ) end @logger.info "job registration finished" end |
#report_completion_status(job_ticket) ⇒ Object
208 209 210 211 212 213 214 215 216 217 218 |
# File 'lib/patriot/job_store/rdb_job_store.rb', line 208 def report_completion_status(job_ticket) exit_code = job_ticket.exit_code post_state = Patriot::JobStore::EXIT_CODE_TO_STATE[exit_code] raise "illegal exit_code #{exit_code}" if post_state.nil? connect(@db_config) do |c| if c.update(HISTORY_TABLE, {:end_at => Time.now.strftime(DATE_FORMAT), :exit_code => exit_code, :description => job_ticket.description}, {:id => job_ticket.execution_id}) != 1 @logger.warn "illegal state of history for #{job_ticket.job_id}" end return _check_and_set_state(job_ticket, Patriot::JobStore::JobState::RUNNING, post_state, c) end end |
#set_state(update_id, job_ids, new_state) ⇒ Object
235 236 237 238 239 |
# File 'lib/patriot/job_store/rdb_job_store.rb', line 235 def set_state(update_id, job_ids, new_state) raise "jobs are not selected" if job_ids.nil? || job_ids.empty? stmt = "UPDATE jobs SET state = #{new_state} WHERE #{job_ids.map{|jid| "job_id = '#{jid}'"}.join(" OR ")}" connect(@db_config){|c| c.execute_statement(stmt, :update)} end |