Class: Patriot::JobStore::RDBJobStore

Inherits:
Base
  • Object
show all
Includes:
Util::DBClient, Util::Logger, Util::Retry
Defined in:
lib/patriot/job_store/rdb_job_store.rb

Overview

a JobStore implementation on RDB

Constant Summary collapse

DEFAULT_PRIORITY =

default priority

1
JOB_TABLE =

Tables job definition table

'jobs'
FLOW_TABLE =

dependency relation table

'flows'
PRODUCER_TABLE =

job and produced product table

'producers'
CONSUMER_TABLE =

job and required product table

'consumers'
HISTORY_TABLE =

table for execution history

'job_histories'
DATE_FORMAT =

date format of execution history

"%Y-%m-%d %H:%M:%S"
TICKET_COLUMNS =

attributes included in job_ticket

['job_id', 'update_id', 'node']
ALL_COLUMNS =

all columns of the job table

[:id,
:job_id,
:job_def_id,
:update_id,
:state,
:content,
:start_after,
:node,
:host,
:priority]
ATTR_TO_COLUMN =

mapping from command attributes to table columns

{Patriot::Command::STATE_ATTR          => :state,
Patriot::Command::PRIORITY_ATTR       => :priority,
Patriot::Command::START_DATETIME_ATTR => :start_after,
Patriot::Command::EXEC_NODE_ATTR      => :node,
Patriot::Command::EXEC_HOST_ATTR      => :host}

Constants included from Util::Config

Util::Config::ADMIN_USER_KEY, Util::Config::DEFAULT_CONFIG, Util::Config::DEFAULT_PLUGIN_DIR, Util::Config::INFO_SERVER_PORT_KEY, Util::Config::PASSWORD_KEY, Util::Config::PLUGIN_DIR_KEY, Util::Config::PLUGIN_INIT_SCRIPT, Util::Config::PLUGIN_KEY, Util::Config::PLUGIN_LIB_DIR, Util::Config::USERNAME_KEY, Util::Config::WORKER_HOST_KEY, Util::Config::WORKER_USER_KEY

Constants included from Util::DBClient

Util::DBClient::DB_CONFIG_KEYS

Instance Method Summary collapse

Methods included from Util::Retry

execute_with_retry

Methods included from Util::Logger

#create_logger

Methods included from Util::Config

#load_config, #load_plugins

Methods included from Util::DBClient

#connect, #read_dbconfig

Methods inherited from Base

#get, #process_subsequent

Constructor Details

#initialize(store_id, config) ⇒ RDBJobStore

Returns a new instance of RDBJobStore.

See Also:



51
52
53
54
55
56
57
# File 'lib/patriot/job_store/rdb_job_store.rb', line 51

def initialize(store_id, config)
  @config = config
  prefix = [Patriot::JobStore::CONFIG_PREFIX, store_id].join(".")
  @db_config = read_dbconfig(prefix, config)
  @logger = create_logger(config)
  @initiator_id = connect(@db_config){|c| c.select(JOB_TABLE, {:job_id => Patriot::JobStore::INITIATOR_JOB_ID})[0].to_hash[:id] }
end

Instance Method Details

#acceptable?(job) ⇒ Boolean

Returns:

  • (Boolean)

See Also:



135
136
137
138
139
140
141
142
143
# File 'lib/patriot/job_store/rdb_job_store.rb', line 135

def acceptable?(job)
  begin
    json = JSON.generate(job.attributes)
  rescue Exception => e
    @logger.warn e
    return false
  end
  return true
end

#delete_job(job_id) ⇒ Object

See Also:



435
436
437
438
439
440
441
442
443
444
445
446
447
# File 'lib/patriot/job_store/rdb_job_store.rb', line 435

def delete_job(job_id)
  connect(@db_config) do |c|
    record = c.select(JOB_TABLE, {:job_id => job_id})
    return if record.nil? || record.empty?
    raise "illegal state: more than one records for #{job_id}" if record.size > 1
    serial_id = record[0].to_hash[:id]
    c.delete(CONSUMER_TABLE, {:job_id => serial_id})
    c.delete(PRODUCER_TABLE, {:job_id => serial_id})
    c.delete(FLOW_TABLE, {:consumer_id => serial_id})
    c.delete(FLOW_TABLE, {:producer_id => serial_id})
    c.delete(JOB_TABLE, {:job_id => job_id})
  end
end

#find_jobs_by_state(state, opts = {}) ⇒ Object



401
402
403
404
405
406
407
408
409
410
411
412
413
414
# File 'lib/patriot/job_store/rdb_job_store.rb', line 401

def find_jobs_by_state(state, opts = {})
  raise "OFFSET is set WITHOUT LIMIT" if opts.has_key?(:offset) && !opts.has_key?(:limit)
  condition = ["state = #{state}", "id != #{@initiator_id}"]
  condition |= ["job_id LIKE '#{opts[:filter_exp]}%'"] if opts.has_key?(:filter_exp)
  query = "SELECT job_id FROM jobs WHERE #{condition.join(' AND ')}"
  query = "#{query} ORDER BY update_id DESC"
  if opts.has_key?(:limit)
    query = "#{query} LIMIT #{opts[:limit]}"
    query = "#{query} OFFSET #{opts[:offset]}" if opts.has_key?(:offset)
  end
  connect(@db_config) do |c|
    return c.execute_statement(query, :select).map{|r| r.job_id }
  end
end

#get_consumers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]}) ⇒ Object

See Also:



270
271
272
# File 'lib/patriot/job_store/rdb_job_store.rb', line 270

def get_consumers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]})
  connect(@db_config) {|c| return _get_jobs_for_products(CONSUMER_TABLE, products, opts, c)}
end

#get_execution_history(job_id, opts = {}) ⇒ Hash

Returns histories.

Parameters:

  • job_id (String)

    JOB ID

  • opts (Hash) (defaults to: {})

    options

Returns:

  • (Hash)

    histories



296
297
298
# File 'lib/patriot/job_store/rdb_job_store.rb', line 296

def get_execution_history(job_id, opts = {})
  connect(@db_config) {|c| return _get_execution_history(job_id, opts, c)}
end

#get_graph(job_id, opts = {}) ⇒ Array

get nodes and edges information to render graph

Parameters:

  • job_id (String)

    JOB ID

  • opts (Hash) (defaults to: {})

    options

Returns:

  • (Array)
    nodes, edges


316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
# File 'lib/patriot/job_store/rdb_job_store.rb', line 316

def get_graph(job_id, opts = {})
  connect(@db_config) do |db_conn|
    job = _get_job(job_id, db_conn)

    job[:consumers] = _get_jobs_for_products(CONSUMER_TABLE, job[Patriot::Command::PRODUCTS_ATTR], opts, db_conn) || []
    job[:producers] = _get_jobs_for_products(PRODUCER_TABLE, job[Patriot::Command::REQUISITES_ATTR], opts, db_conn) || []
    history = _get_execution_history(job_id, {}, db_conn)[0]

    hashed_job = {
      :job_id => job.job_id,
      :history => history,
      :depth => 0
    }.merge(job.attributes)

    # set self node
    nodes = {job_id => hashed_job}
    edges = []

    _set_dependency(
      db_conn,
      :producers,
      opts[:producer_depth],
      nodes,
      edges,
      hashed_job
    )

    _set_dependency(
      db_conn,
      :consumers,
      opts[:consumer_depth],
      nodes,
      edges,
      hashed_job
    )

    return {:nodes => nodes, :edges => edges}
  end
end

#get_job(job_id) ⇒ Patriot::JobStore::Job

Parameters:

  • job_id (String)

    JOB ID

Returns:

See Also:



244
245
246
# File 'lib/patriot/job_store/rdb_job_store.rb', line 244

def get_job(job_id)
  connect(@db_config) {|c| return _get_job(job_id, c)}
end

#get_job_size(opts = {}) ⇒ Object

See Also:



417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
# File 'lib/patriot/job_store/rdb_job_store.rb', line 417

def get_job_size(opts = {})
  opts  = {:ignore_states => []}.merge(opts)
  if opts[:ignore_states].empty?
    query = "SELECT state, count(1) size FROM jobs GROUP BY state"
  else
    query = "SELECT state, count(1) size FROM jobs WHERE #{opts[:ignore_states].map{|s| "state != #{s}" }.join(" AND ")} GROUP BY state"
  end
  sizes = {}
  connect(@db_config) do |c|
    c.execute_statement(query).each do |r|
      sizes[r.state] = r.size
      sizes[r.state] = sizes[r.state] - 1 if r.state == Patriot::JobStore::JobState::SUCCEEDED # ignore initiator
    end
  end
  return sizes
end

#get_job_tickets(host, nodes, options = {}) ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
# File 'lib/patriot/job_store/rdb_job_store.rb', line 146

def get_job_tickets(host, nodes, options = {})
  nodes = [nodes] unless nodes.is_a?(Array)
  begin
    query = _generate_fetching_job_sql(host, nodes,options)
    @logger.debug "fetchings job by #{query}"
    connect(@db_config) do |c|
      return c.execute_statement(query).map{|r| Patriot::JobStore::JobTicket.new(r.job_id, r.update_id, r.node) }
    end
  rescue => e
    @logger.error e
    raise e
  end
end

#get_producers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]}) ⇒ Object

See Also:



265
266
267
# File 'lib/patriot/job_store/rdb_job_store.rb', line 265

def get_producers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]})
  connect(@db_config) {|c| return _get_jobs_for_products(PRODUCER_TABLE, products, opts, c)}
end

#offer_to_execute(job_ticket) ⇒ Object



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/patriot/job_store/rdb_job_store.rb', line 181

def offer_to_execute(job_ticket)
  connect(@db_config) do |c|
    unless _check_and_set_state(job_ticket, Patriot::JobStore::JobState::WAIT, Patriot::JobStore::JobState::RUNNING, c)
      @logger.debug("execution of job: #{job_ticket.job_id} is skipped")
      return nil
    end
    execution_id = c.insert(HISTORY_TABLE,
                           {:job_id     => job_ticket.job_id,
                            :node       => job_ticket.exec_node,
                            :host       => job_ticket.exec_host,
                            :thread     => job_ticket.exec_thread,
                            :begin_at   => Time.now.strftime(DATE_FORMAT)})
    record = c.select(JOB_TABLE, {:job_id => job_ticket.job_id})
    raise "duplicated entry found for #{job_ticket}" if record.size > 1
    raise "no entry found for #{job_ticket}" if record.empty?
    job = _record_to_job(record[0])
    begin
      return {:execution_id => execution_id, :command => job.to_command(@config)}
    rescue Exception => e
      marked = _check_and_set_state(job_ticket, Patriot::JobStore::JobState::RUNNING, Patriot::JobStore::JobState::FAILED, c)
      @logger.error "failed to create a command for #{job_ticket.job_id} (set to error? #{marked})"
      raise e
    end
  end
end

#register(update_id, jobs) ⇒ Object

See Also:



60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/patriot/job_store/rdb_job_store.rb', line 60

def register(update_id, jobs)
  jobs = [jobs] unless jobs.is_a? Array
  jobs.each{|job| raise "#{job.job_id} is not acceptable" unless acceptable?(job) }
  @logger.info "start to register jobs"
  connect(@db_config) do |c|
    jobs.each{|job| _upsert_job(update_id, job, c)}
    c.update(JOB_TABLE,
             {:state => Patriot::JobStore::JobState::WAIT},
             {:state => Patriot::JobStore::JobState::INIT, :update_id => update_id}
            )
  end
  @logger.info "job registration finished"
end

#report_completion_status(job_ticket) ⇒ Object



208
209
210
211
212
213
214
215
216
217
218
# File 'lib/patriot/job_store/rdb_job_store.rb', line 208

def report_completion_status(job_ticket)
  exit_code  = job_ticket.exit_code
  post_state = Patriot::JobStore::EXIT_CODE_TO_STATE[exit_code]
  raise "illegal exit_code #{exit_code}" if post_state.nil?
  connect(@db_config) do |c|
    if c.update(HISTORY_TABLE, {:end_at => Time.now.strftime(DATE_FORMAT), :exit_code => exit_code, :description => job_ticket.description}, {:id => job_ticket.execution_id}) != 1
      @logger.warn "illegal state of history for #{job_ticket.job_id}"
    end
    return _check_and_set_state(job_ticket, Patriot::JobStore::JobState::RUNNING, post_state, c)
  end
end

#set_state(update_id, job_ids, new_state) ⇒ Object

See Also:



235
236
237
238
239
# File 'lib/patriot/job_store/rdb_job_store.rb', line 235

def set_state(update_id, job_ids, new_state)
  raise "jobs are not selected" if job_ids.nil? || job_ids.empty?
  stmt = "UPDATE jobs SET state = #{new_state} WHERE #{job_ids.map{|jid| "job_id = '#{jid}'"}.join(" OR ")}"
  connect(@db_config){|c| c.execute_statement(stmt, :update)}
end