Class: Gush::Client
- Inherits:
-
Object
- Object
- Gush::Client
- Defined in:
- lib/gush/client.rb
Instance Attribute Summary collapse
-
#configuration ⇒ Object
readonly
Returns the value of attribute configuration.
Instance Method Summary collapse
- #all_workflows ⇒ Object
- #configure {|configuration| ... } ⇒ Object
- #create_workflow(name) ⇒ Object
- #destroy_job(workflow_id, job) ⇒ Object
- #destroy_workflow(workflow) ⇒ Object
- #enqueue_job(workflow_id, job) ⇒ Object
- #find_workflow(id) ⇒ Object
-
#initialize(config = Gush.configuration) ⇒ Client
constructor
A new instance of Client.
- #load_job(workflow_id, job_id) ⇒ Object
- #next_free_job_id(workflow_id, job_klass) ⇒ Object
- #next_free_workflow_id ⇒ Object
- #persist_job(workflow_id, job) ⇒ Object
- #persist_workflow(workflow) ⇒ Object
- #start_workflow(workflow, job_names = []) ⇒ Object
- #stop_workflow(id) ⇒ Object
- #worker_report(message) ⇒ Object
- #workflow_report(message) ⇒ Object
Constructor Details
#initialize(config = Gush.configuration) ⇒ Client
Returns a new instance of Client.
5 6 7 8 9 |
# File 'lib/gush/client.rb', line 5 def initialize(config = Gush.configuration) @configuration = config @sidekiq = build_sidekiq @redis = build_redis end |
Instance Attribute Details
#configuration ⇒ Object (readonly)
Returns the value of attribute configuration.
3 4 5 |
# File 'lib/gush/client.rb', line 3 def configuration @configuration end |
Instance Method Details
#all_workflows ⇒ Object
68 69 70 71 72 73 |
# File 'lib/gush/client.rb', line 68 def all_workflows redis.keys("gush.workflows.*").map do |key| id = key.sub("gush.workflows.", "") find_workflow(id) end end |
#configure {|configuration| ... } ⇒ Object
11 12 13 14 15 |
# File 'lib/gush/client.rb', line 11 def configure yield configuration @sidekiq = build_sidekiq @redis = build_redis end |
#create_workflow(name) ⇒ Object
17 18 19 20 21 22 23 24 |
# File 'lib/gush/client.rb', line 17 def create_workflow(name) begin name.constantize.create rescue NameError raise WorkflowNotFound.new("Workflow with given name doesn't exist") end flow end |
#destroy_job(workflow_id, job) ⇒ Object
118 119 120 |
# File 'lib/gush/client.rb', line 118 def destroy_job(workflow_id, job) redis.del("gush.jobs.#{workflow_id}.#{job.name}") end |
#destroy_workflow(workflow) ⇒ Object
113 114 115 116 |
# File 'lib/gush/client.rb', line 113 def destroy_workflow(workflow) redis.del("gush.workflows.#{workflow.id}") workflow.jobs.each {|job| destroy_job(workflow.id, job) } end |
#enqueue_job(workflow_id, job) ⇒ Object
130 131 132 133 134 135 136 137 138 139 |
# File 'lib/gush/client.rb', line 130 def enqueue_job(workflow_id, job) job.enqueue! persist_job(workflow_id, job) sidekiq.push( 'class' => Gush::Worker, 'queue' => configuration.namespace, 'args' => [workflow_id, job.name] ) end |
#find_workflow(id) ⇒ Object
75 76 77 78 79 80 81 82 83 84 85 |
# File 'lib/gush/client.rb', line 75 def find_workflow(id) data = redis.get("gush.workflows.#{id}") unless data.nil? hash = Gush::JSON.decode(data, symbolize_keys: true) keys = redis.keys("gush.jobs.#{id}.*") nodes = redis.mget(*keys).map { |json| Gush::JSON.decode(json, symbolize_keys: true) } workflow_from_hash(hash, nodes) else raise WorkflowNotFound.new("Workflow with given id doesn't exist") end end |
#load_job(workflow_id, job_id) ⇒ Object
98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
# File 'lib/gush/client.rb', line 98 def load_job(workflow_id, job_id) workflow = find_workflow(workflow_id) job_name_match = /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_id) hypen = '-' if job_name_match.nil? keys = redis.keys("gush.jobs.#{workflow_id}.#{job_id}#{hypen}*") return nil if keys.nil? data = redis.get(keys.first) return nil if data.nil? data = Gush::JSON.decode(data, symbolize_keys: true) Gush::Job.from_hash(workflow, data) end |
#next_free_job_id(workflow_id, job_klass) ⇒ Object
47 48 49 50 51 52 53 54 55 56 |
# File 'lib/gush/client.rb', line 47 def next_free_job_id(workflow_id,job_klass) job_identifier = nil loop do id = SecureRandom.uuid job_identifier = "#{job_klass}-#{id}" break if !redis.exists("gush.jobs.#{workflow_id}.#{job_identifier}") end job_identifier end |
#next_free_workflow_id ⇒ Object
58 59 60 61 62 63 64 65 66 |
# File 'lib/gush/client.rb', line 58 def next_free_workflow_id id = nil loop do id = SecureRandom.uuid break if !redis.exists("gush.workflow.#{id}") end id end |
#persist_job(workflow_id, job) ⇒ Object
94 95 96 |
# File 'lib/gush/client.rb', line 94 def persist_job(workflow_id, job) redis.set("gush.jobs.#{workflow_id}.#{job.name}", job.to_json) end |
#persist_workflow(workflow) ⇒ Object
87 88 89 90 91 92 |
# File 'lib/gush/client.rb', line 87 def persist_workflow(workflow) redis.set("gush.workflows.#{workflow.id}", workflow.to_json) workflow.jobs.each {|job| persist_job(workflow.id, job) } workflow.mark_as_persisted true end |
#start_workflow(workflow, job_names = []) ⇒ Object
26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
# File 'lib/gush/client.rb', line 26 def start_workflow(workflow, job_names = []) workflow.mark_as_started persist_workflow(workflow) jobs = if job_names.empty? workflow.initial_jobs else job_names.map {|name| workflow.find_job(name) } end jobs.each do |job| enqueue_job(workflow.id, job) end end |
#stop_workflow(id) ⇒ Object
41 42 43 44 45 |
# File 'lib/gush/client.rb', line 41 def stop_workflow(id) workflow = find_workflow(id) workflow.mark_as_stopped persist_workflow(workflow) end |
#worker_report(message) ⇒ Object
122 123 124 |
# File 'lib/gush/client.rb', line 122 def worker_report() report("gush.workers.status", ) end |
#workflow_report(message) ⇒ Object
126 127 128 |
# File 'lib/gush/client.rb', line 126 def workflow_report() report("gush.workflows.status", ) end |