Class: Patriot::JobStore::Base

Inherits:
Object
  • Object
show all
Includes:
Util::Logger
Defined in:
lib/patriot/job_store/base.rb

Overview

base class of JobStore

Direct Known Subclasses

InMemoryStore, RDBJobStore

Constant Summary

Constants included from Util::Config

Util::Config::ADMIN_USER_KEY, Util::Config::DEFAULT_CONFIG, Util::Config::DEFAULT_PLUGIN_DIR, Util::Config::INFO_SERVER_PORT_KEY, Util::Config::PASSWORD_KEY, Util::Config::PLUGIN_DIR_KEY, Util::Config::PLUGIN_INIT_SCRIPT, Util::Config::PLUGIN_KEY, Util::Config::PLUGIN_LIB_DIR, Util::Config::USERNAME_KEY, Util::Config::WORKER_HOST_KEY, Util::Config::WORKER_USER_KEY

Instance Method Summary collapse

Methods included from Util::Logger

#create_logger

Methods included from Util::Config

#load_config, #load_plugins

Constructor Details

#initialize(store_id, config) ⇒ Base

Returns a new instance of Base.

Parameters:

Raises:

  • (NotImplementedError)


11
12
13
# File 'lib/patriot/job_store/base.rb', line 11

def initialize(store_id, config)
  raise NotImplementedError
end

Instance Method Details

#acceptable?(command) ⇒ Boolean

check whether the command can be stored as a job to this job_store

Parameters:

Returns:

  • (Boolean)

    true if the command can be converted to a job

Raises:

  • (NotImplementedError)


25
26
27
# File 'lib/patriot/job_store/base.rb', line 25

def acceptable?(command)
  raise NotImplementedError
end

#delete_job(job_id) ⇒ Object

delete the job from this job_store

Parameters:

  • job_id (String)

Raises:

  • (NotImplementedError)


143
144
145
# File 'lib/patriot/job_store/base.rb', line 143

def delete_job(job_id)
  raise NotImplementedError
end

#find_jobs_by_state(state, opts = {}) ⇒ Array

Returns an array of job_id which is in the given state.

Parameters:

Returns:

  • (Array)

    an array of job_id which is in the given state

Raises:

  • (NotImplementedError)


130
131
132
# File 'lib/patriot/job_store/base.rb', line 130

def find_jobs_by_state(state, opts = {})
  raise NotImplementedError
end

#get(job_id, opts = {}) ⇒ Patrio::JobStore::Job

get a job jobs in dependency set to :consumers/:producers as a hash from job_id to state

Parameters:

  • job_id (String)

    job_id

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :include_dependency (String)

    include jobs with 1-hop dependency

Returns:

  • (Patrio::JobStore::Job)

    in case of include_dependency is true,



71
72
73
74
75
76
77
78
79
80
# File 'lib/patriot/job_store/base.rb', line 71

def get(job_id, opts={})
  return nil if job_id.nil?
  job = get_job(job_id)
  return if job.nil?
  if opts[:include_dependency] == true
    job[:consumers] = get_consumers(job[Patriot::Command::PRODUCTS_ATTR]) || []
    job[:producers] = get_producers(job[Patriot::Command::REQUISITES_ATTR]) || []
  end
  return job
end

#get_consumers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]}) ⇒ Hash

get consumers of products

Parameters:

  • products (Array)

    a list of product name

  • opts (Hash) (defaults to: {:include_attrs => [Patriot::Command::STATE_ATTR]})
  • opt (Hash)

    a customizable set of options

Returns:

  • (Hash)

    a hash from job_id to its state

Raises:

  • (NotImplementedError)


103
104
105
# File 'lib/patriot/job_store/base.rb', line 103

def get_consumers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]})
  raise NotImplementedError
end

#get_execution_history(job_id, opts = {}) ⇒ Object

get execution histories of the specified job

Parameters:

  • job_id (String)
  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :limit (Integer)

    a max number of history records (default 1)

  • :order (Symbol)

    order of record [:DESC or :ASC, default is :DESC]

Raises:

  • (NotImplementedError)


112
113
114
# File 'lib/patriot/job_store/base.rb', line 112

def get_execution_history(job_id, opts = {})
  raise NotImplementedError
end

#get_graph(job_id, opts = {}) ⇒ Array

get nodes and edges information to render graph

Parameters:

  • job_id (String)

    JOB ID

  • opts (Hash) (defaults to: {})

    options

Returns:

  • (Array)
    nodes, edges

Raises:

  • (NotImplementedError)


120
121
122
# File 'lib/patriot/job_store/base.rb', line 120

def get_graph(job_id, opts = {})
  raise NotImplementedError
end

#get_job(job_id) ⇒ Patriot::JobStore::Job

get a job data

Parameters:

  • job_id (String)

Returns:

Raises:

  • (NotImplementedError)


85
86
87
# File 'lib/patriot/job_store/base.rb', line 85

def get_job(job_id)
  raise NotImplementedError
end

#get_job_size(opts = {}) ⇒ Hash<Patriot::JobStore::JobState, Integer>

Returns a hash from job state to the number of jobs in the state.

Parameters:

  • opts (Hash) (defaults to: {})
  • [Array<Patriot::JobStore::JobState>] (Hash)

    a customizable set of options

Returns:

Raises:

  • (NotImplementedError)


137
138
139
# File 'lib/patriot/job_store/base.rb', line 137

def get_job_size(opts = {})
  raise NotImplementedError
end

#get_job_tickets(host, nodes, opts = {}) ⇒ Object

get job tickets for jobs which are ready to execute

Parameters:

  • host (String)

    the host name of the client

  • nodes (Array)

    array of nodes on the client

  • opts (Hash) (defaults to: {})

Options Hash (opts):

  • :fetch_limit (Integer)

    the max number of tickets

Raises:

  • (NotImplementedError)


34
35
36
# File 'lib/patriot/job_store/base.rb', line 34

def get_job_tickets(host, nodes, opts = {})
  raise NotImplementedError
end

#get_producers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]}) ⇒ Hash

get producers of products

Parameters:

  • products (Array)

    a list of product name

  • opts (Hash) (defaults to: {:include_attrs => [Patriot::Command::STATE_ATTR]})
  • opt (Hash)

    a customizable set of options

Returns:

  • (Hash)

    a hash from job_id to its state

Raises:

  • (NotImplementedError)


94
95
96
# File 'lib/patriot/job_store/base.rb', line 94

def get_producers(products, opts = {:include_attrs => [Patriot::Command::STATE_ATTR]})
  raise NotImplementedError
end

#offer_to_execute(job_ticket) ⇒ Hash

offer to execute a job specified with a job_ticket If the job is ready to execute, the state of job is set to JobState::RUNNING. A response of this method is a Hash including

  • :execution_id the identifier of the execution (used to identify history record)

  • :command an instance of command for the offered job

Parameters:

Returns:

  • (Hash)

    response for the offer

Raises:

  • (NotImplementedError)


45
46
47
# File 'lib/patriot/job_store/base.rb', line 45

def offer_to_execute(job_ticket)
  raise NotImplementedError
end

#process_subsequent(job_ids) {|job_store, jobs| ... } ⇒ Object

Process subsequent jobs with a given block. The block is called for each dependency depth.

Parameters:

  • job_ids (Array<String>)

Yield Parameters:



152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/patriot/job_store/base.rb', line 152

def process_subsequent(job_ids, &blk)
  products = job_ids.map{|jid|
    job = get_job(jid)
    job.nil? ? nil : job[Patriot::Command::PRODUCTS_ATTR]
  }.compact.flatten
  consumers = get_consumers(products)
  while !consumers.empty?
    jobs = consumers.map{|job| get_job(job[:job_id])}.compact
    yield self, jobs
    products = jobs.map{|j| j[Patriot::Command::PRODUCTS_ATTR]}.compact.flatten
    consumers = get_consumers(products)
  end
end

#register(update_id, jobs) ⇒ Object

register the given jobs with the given update_id

Parameters:

  • update_id (Integer)
  • jobs (Array)

    a list of jobs to be registered

Raises:

  • (NotImplementedError)


18
19
20
# File 'lib/patriot/job_store/base.rb', line 18

def register(update_id, jobs)
  raise NotImplementedError
end

#report_completion_status(job_ticket) ⇒ Boolean

report completion status a job specified with a job_ticket The state of the job should be changed according to the completion status.

Parameters:

Returns:

  • (Boolean)

    true if the job exists and the state is updated, otherwise false

Raises:

  • (NotImplementedError)


53
54
55
# File 'lib/patriot/job_store/base.rb', line 53

def report_completion_status(job_ticket)
  raise NotImplementedError
end

#set_state(update_id, job_ids, new_state) ⇒ Object

set jobs state

Parameters:

  • update_id (Integer)
  • job_ids (Array)

    list of job_ids

  • new_state (Integer)

    new state of the job.

Raises:

  • (NotImplementedError)


61
62
63
# File 'lib/patriot/job_store/base.rb', line 61

def set_state(update_id, job_ids, new_state)
  raise NotImplementedError
end