Class: Pho::Jobs

Inherits:
Object
Defined in:
lib/pho/job.rb

Overview

TODO job deletion

Constant Summary

RESET =
"http://schemas.talis.com/2006/bigfoot/configuration#ResetDataJob".freeze
SNAPSHOT =
"http://schemas.talis.com/2006/bigfoot/configuration#SnapshotJob".freeze
REINDEX =
"http://schemas.talis.com/2006/bigfoot/configuration#ReindexJob".freeze
RESTORE =
"http://schemas.talis.com/2006/bigfoot/configuration#RestoreJob".freeze

Class Method Details

.build_job_request(type, label, t = Time.now, snapshot_uri = nil) ⇒ Object

Construct an RDF/XML document containing a job request for submitting to the Platform.

t

a Time object, specifying the time at which the request should be carried out



# File 'lib/pho/job.rb', line 79

def Jobs.build_job_request(type, label, t=Time.now, snapshot_uri=nil)
  
  time = t.getutc.strftime("%Y-%m-%dT%H:%M:%SZ")
  data = "<rdf:RDF xmlns:rdf=\"http://www.w3.org/1999/02/22-rdf-syntax-ns#\" "
  data << " xmlns:rdfs=\"http://www.w3.org/2000/01/rdf-schema#\" " 
  data << " xmlns:bf=\"http://schemas.talis.com/2006/bigfoot/configuration#\"> " 
  data << " <bf:JobRequest>"
  data << "   <rdfs:label>#{label}</rdfs:label>"    
  data << "   <bf:jobType rdf:resource=\"#{type}\"/>"
  data << "   <bf:startTime>#{time}</bf:startTime>"
  
  if (snapshot_uri != nil)
    data << "   <bf:snapshotUri rdf:resource=\"#{snapshot_uri}\"/>"        
  end
  
  data << " </bf:JobRequest>"
  data << "</rdf:RDF>"
  return data      
end
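
The shape of the generated request can be checked by parsing it back. The following is a minimal sketch, not the library's own code: sketch_job_request is a hypothetical stand-in that mirrors the method above so the example runs without the pho gem, and the label and timestamp are made-up values.

```ruby
require 'rexml/document'

RDF_NS   = "http://www.w3.org/1999/02/22-rdf-syntax-ns#"
RDFS_NS  = "http://www.w3.org/2000/01/rdf-schema#"
BF_NS    = "http://schemas.talis.com/2006/bigfoot/configuration#"
SNAPSHOT = "#{BF_NS}SnapshotJob"

# Hypothetical stand-in mirroring Jobs.build_job_request (assumption:
# same element names and the same UTC timestamp format as shown above).
def sketch_job_request(type, label, t = Time.now, snapshot_uri = nil)
  time = t.getutc.strftime("%Y-%m-%dT%H:%M:%SZ")
  data = "<rdf:RDF xmlns:rdf=\"#{RDF_NS}\" xmlns:rdfs=\"#{RDFS_NS}\" xmlns:bf=\"#{BF_NS}\">"
  data << "<bf:JobRequest>"
  data << "<rdfs:label>#{label}</rdfs:label>"
  data << "<bf:jobType rdf:resource=\"#{type}\"/>"
  data << "<bf:startTime>#{time}</bf:startTime>"
  data << "<bf:snapshotUri rdf:resource=\"#{snapshot_uri}\"/>" unless snapshot_uri.nil?
  data << "</bf:JobRequest></rdf:RDF>"
  data
end

# Build a snapshot request for a fixed time and read two fields back.
xml = sketch_job_request(SNAPSHOT, "Nightly snapshot", Time.utc(2010, 3, 4, 5, 6, 7))
doc = REXML::Document.new(xml)
ns  = { "bf" => BF_NS }

start_time = REXML::XPath.first(doc, "//bf:startTime", ns).text
job_type   = REXML::XPath.first(doc, "//bf:jobType", ns).attributes["rdf:resource"]

puts start_time
puts job_type
```

Note that the label is interpolated as-is, so a label containing characters such as & or < would need escaping by the caller before it is passed in.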

.read_from_store(store) ⇒ Object

Reads the current list of scheduled jobs from the provided store. Returns an array of job URIs, taken from the rdf:resource attribute of each bf:job element in the response.

store

store from which to read the scheduled job list



# File 'lib/pho/job.rb', line 16

def Jobs.read_from_store(store)
  resp = store.get_jobs()
  if resp.status != 200
    raise "Unable to read jobs from store. Status was #{resp.status}"
  end
  jobs = Array.new
  
  doc = REXML::Document.new(resp.content)
  REXML::XPath.each(doc.root, "//bf:job", Pho::Namespaces::MAPPING) do |el|
    jobs << el.attributes["rdf:resource"]
  end
  return jobs
  
end
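
The extraction step can be tried in isolation against a canned response body. This is a sketch under stated assumptions: the sample document, its example.org URIs, and the NAMESPACES hash are hypothetical (the hash stands in for Pho::Namespaces::MAPPING, which is assumed to bind the bf and rdf prefixes), and job_uris is an illustrative helper rather than part of the library.

```ruby
require 'rexml/document'

# Assumed prefix bindings, standing in for Pho::Namespaces::MAPPING.
NAMESPACES = {
  "rdf" => "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
  "bf"  => "http://schemas.talis.com/2006/bigfoot/configuration#"
}

# Hypothetical job list document; real responses may carry more properties.
SAMPLE = <<~XML
  <rdf:RDF xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
           xmlns:bf="http://schemas.talis.com/2006/bigfoot/configuration#">
    <rdf:Description rdf:about="http://api.example.org/stores/demo/jobs">
      <bf:job rdf:resource="http://api.example.org/stores/demo/jobs/1"/>
      <bf:job rdf:resource="http://api.example.org/stores/demo/jobs/2"/>
    </rdf:Description>
  </rdf:RDF>
XML

# Collect the rdf:resource value of every bf:job element, in document order.
def job_uris(xml)
  doc = REXML::Document.new(xml)
  uris = []
  REXML::XPath.each(doc.root, "//bf:job", NAMESPACES) do |el|
    uris << el.attributes["rdf:resource"]
  end
  uris
end

job_uris(SAMPLE).each { |uri| puts uri }
```

A document with no bf:job elements yields an empty array rather than an error.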

.submit_job(store, jobtype, label, t = Time.now, snapshot_uri = nil) ⇒ Object

Generic job submission method. Builds a job request with build_job_request and submits it to the store.



# File 'lib/pho/job.rb', line 72

def Jobs.submit_job(store, jobtype, label, t=Time.now, snapshot_uri=nil)
  store.submit_job( build_job_request(jobtype, label, t, snapshot_uri) )
end

.submit_reindex(store, label = "Reindex my store", t = Time.now) ⇒ Object

Submit a reindex job to the Platform

This method submits the job and returns an HTTP::Message containing the response from the Platform. Client code should check this for success. As job processing may not be immediate, clients should determine the URI of the newly created job and then monitor the job's status if they need to wait for the job to finish.



# File 'lib/pho/job.rb', line 47

def Jobs.submit_reindex(store, label="Reindex my store", t=Time.now)
  return submit_job(store, Pho::Jobs::REINDEX, label, t)      
end
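
Checking the response and finding the new job's URI might look like the sketch below. It follows the same test that wait_for_submitted applies (status 201, first Location header value). FakeResponse and created_job_uri are hypothetical names used so the example runs without a store, and the example.org URI is made up.

```ruby
# Hypothetical stand-in for the response object returned by the submit_* methods.
FakeResponse = Struct.new(:status, :header)

# Return the URI of the newly created job, or raise if the job was not created.
def created_job_uri(resp)
  unless resp.status == 201
    raise "Job was not created. Status was #{resp.status}"
  end
  resp.header["Location"].first
end

ok = FakeResponse.new(201, { "Location" => ["http://api.example.org/stores/demo/jobs/42"] })
puts created_job_uri(ok)

# A non-201 response is reported as an error instead of being polled.
failed = FakeResponse.new(500, {})
begin
  created_job_uri(failed)
rescue RuntimeError => e
  puts e.message
end
```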

.submit_reset(store, label = "Reset my store", t = Time.now) ⇒ Object

Submit a reset job to the Platform

This method submits the job and returns an HTTP::Message containing the response from the Platform. Client code should check this for success. As job processing may not be immediate, clients should determine the URI of the newly created job and then monitor the job's status if they need to wait for the job to finish.



# File 'lib/pho/job.rb', line 37

def Jobs.submit_reset(store, label="Reset my store", t=Time.now)
  return submit_job(store, Pho::Jobs::RESET, label, t)
end

.submit_restore(store, snapshot_uri, label = "Restore my snapshot", t = Time.now) ⇒ Object

Submit a restore job to the Platform

This method submits the job and returns an HTTP::Message containing the response from the Platform. Client code should check this for success. As job processing may not be immediate, clients should determine the URI of the newly created job and then monitor the job's status if they need to wait for the job to finish.



# File 'lib/pho/job.rb', line 67

def Jobs.submit_restore(store, snapshot_uri, label="Restore my snapshot", t=Time.now)
  return submit_job(store, Pho::Jobs::RESTORE, label, t, snapshot_uri)
end

.submit_snapshot(store, label = "Snapshot my store", t = Time.now) ⇒ Object

Submit a snapshot job to the Platform

This method submits the job and returns an HTTP::Message containing the response from the Platform. Client code should check this for success. As job processing may not be immediate, clients should determine the URI of the newly created job and then monitor the job's status if they need to wait for the job to finish.



# File 'lib/pho/job.rb', line 57

def Jobs.submit_snapshot(store, label="Snapshot my store", t=Time.now)
  return submit_job(store, Pho::Jobs::SNAPSHOT, label, t)
end

.wait_for(uri, store, interval = 1, &block) ⇒ Object

Wait for the specified job to finish

The method will repeatedly contact the Platform to determine whether the job has finished executing. The requests are made at configurable intervals (once a minute by default). If a block is supplied, then it is passed a reference to the Job (containing current progress updates) after each request. The Job object is returned once completed.

uri

URI of the job to wait for

store

the store on which the job is running

interval

the interval at which checks will be made, in minutes. Default is 1



# File 'lib/pho/job.rb', line 118

def Jobs.wait_for(uri, store, interval=1, &block)
    updates = 0
    job = Job.read_from_store(uri, store)
    updates = yield_job_update(job, updates, &block)
    while !job.completed?
      sleep interval*60
      job = Job.read_from_store(uri, store)
      updates = yield_job_update(job, updates, &block)
    end
    return job
end
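
The polling pattern can be illustrated without a store. In this sketch FakeJob and poll_until_complete are hypothetical: refresh stands in for re-reading the job with Job.read_from_store, and the block is called after every check, which may differ from how yield_job_update decides when to report progress. An interval of 0 is used only so the example finishes immediately.

```ruby
# Hypothetical job stand-in that reports completion after a fixed number of checks.
class FakeJob
  attr_reader :polls

  def initialize(polls_needed)
    @polls_needed = polls_needed
    @polls = 0
  end

  # Stand-in for fetching the job's current state from the Platform.
  def refresh
    @polls += 1
    self
  end

  def completed?
    @polls >= @polls_needed
  end
end

# Check once, then keep checking every interval_minutes until the job completes.
def poll_until_complete(job, interval_minutes = 1)
  job.refresh
  yield job if block_given?
  until job.completed?
    sleep interval_minutes * 60
    job.refresh
    yield job if block_given?
  end
  job
end

seen = []
finished = poll_until_complete(FakeJob.new(3), 0) { |job| seen << job.polls }
puts "completed after #{finished.polls} checks"
```

Because the interval is expressed in minutes, fractional values such as 0.5 give a 30 second pause between checks.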

.wait_for_submitted(resp, store, interval = 1, &block) ⇒ Object

Wait for a newly submitted job to finish



# File 'lib/pho/job.rb', line 100

def Jobs.wait_for_submitted(resp, store, interval=1, &block)
  if resp.status != 201
    raise "Unable to wait, job was not created. Status was #{resp.status}"
  end
  job_url = resp.header["Location"].first
  return wait_for(job_url, store, interval, &block)
end