Class: Gush::Client
- Inherits:
-
Object
- Object
- Gush::Client
- Defined in:
- lib/gush/client.rb
Instance Attribute Summary
-
#configuration ⇒ Object
readonly
Returns the value of attribute configuration.
-
#sidekiq ⇒ Object
readonly
Returns the value of attribute sidekiq.
Instance Method Summary
- #all_workflows ⇒ Object
- #configure {|configuration| ... } ⇒ Object
- #create_workflow(name) ⇒ Object
- #destroy_job(workflow_id, job) ⇒ Object
- #destroy_workflow(workflow) ⇒ Object
- #enqueue_job(workflow_id, job) ⇒ Object
- #find_workflow(id) ⇒ Object
-
#initialize(config = Gush.configuration) ⇒ Client
constructor
A new instance of Client.
- #load_job(workflow_id, job_id) ⇒ Object
- #next_free_job_id(workflow_id, job_klass) ⇒ Object
- #next_free_workflow_id ⇒ Object
- #persist_job(workflow_id, job) ⇒ Object
- #persist_workflow(workflow) ⇒ Object
- #start_workflow(workflow, job_names = []) ⇒ Object
- #stop_workflow(id) ⇒ Object
- #worker_report(message) ⇒ Object
- #workflow_report(message) ⇒ Object
Constructor Details
#initialize(config = Gush.configuration) ⇒ Client
Returns a new instance of Client.
5 6 7 8 |
# File 'lib/gush/client.rb', line 5
# Builds a client around the given configuration object.
#
# @param config [Object] configuration to store; defaults to Gush.configuration
# @return [Client] a new instance of Client
def initialize(config = Gush.configuration)
  @configuration = config
  # Assigned after @configuration; build_sidekiq is defined elsewhere and
  # presumably reads the configuration -- TODO confirm before reordering.
  @sidekiq = build_sidekiq
end
Instance Attribute Details
#configuration ⇒ Object (readonly)
Returns the value of attribute configuration.
3 4 5 |
# File 'lib/gush/client.rb', line 3
# Reader for the configuration attribute.
#
# @return [Object] the current value of @configuration
def configuration
  @configuration
end
#sidekiq ⇒ Object (readonly)
Returns the value of attribute sidekiq.
3 4 5 |
# File 'lib/gush/client.rb', line 3
# Reader for the sidekiq attribute.
#
# @return [Object] the current value of @sidekiq
def sidekiq
  @sidekiq
end
Instance Method Details
#all_workflows ⇒ Object
76 77 78 79 80 81 82 83 84 85 |
# File 'lib/gush/client.rb', line 76
# Loads every workflow whose Redis key matches the "gush.workflows.*" pattern
# (after the pattern has been passed through build_redis_key).
#
# @return [Array] one find_workflow result per matching key
def all_workflows
  connection_pool.with do |redis|
    workflow_keys = redis.keys(build_redis_key("gush.workflows.*"))
    # Strip the key prefix so only the workflow id is handed to find_workflow.
    workflow_keys.map { |key| find_workflow(key.sub(build_redis_key("gush.workflows."), "")) }
  end
end
#configure {|configuration| ... } ⇒ Object
10 11 12 13 |
# File 'lib/gush/client.rb', line 10
# Yields the configuration object to the caller's block, then rebuilds the
# sidekiq attribute via build_sidekiq.
#
# @yieldparam configuration [Object] the client's configuration
# @return [Object] the freshly built value stored in @sidekiq
def configure
  yield(configuration)
  @sidekiq = build_sidekiq
end
#create_workflow(name) ⇒ Object
15 16 17 18 19 20 21 22 |
# File 'lib/gush/client.rb', line 15
# Resolves +name+ to a class with String#constantize and calls +create+ on it.
#
# The previous listing discarded the created object and then referenced an
# undefined local (+flow+), so the method could never return a workflow.
#
# @param name [String] name of the workflow class
# @return [Object] whatever the resolved class's +create+ returns
# @raise [WorkflowNotFound] when a NameError is raised while resolving or
#   creating (same rescue scope as before)
def create_workflow(name)
  name.constantize.create
rescue NameError
  raise WorkflowNotFound, "Workflow with given name doesn't exist"
end
#destroy_job(workflow_id, job) ⇒ Object
151 152 153 154 155 |
# File 'lib/gush/client.rb', line 151
# Deletes the Redis key "gush.jobs.<workflow_id>.<job.name>" (passed through
# build_redis_key).
#
# @param workflow_id [Object] id interpolated into the key
# @param job [#name] job whose name is interpolated into the key
# @return [Object] the result of redis.del
def destroy_job(workflow_id, job)
  job_key = build_redis_key("gush.jobs.#{workflow_id}.#{job.name}")
  connection_pool.with { |redis| redis.del(job_key) }
end
#destroy_workflow(workflow) ⇒ Object
144 145 146 147 148 149 |
# File 'lib/gush/client.rb', line 144
# Deletes the workflow's own key, then deletes each of its jobs through
# destroy_job.
#
# @param workflow [#id, #jobs] workflow to remove
# @return [Object] the result of workflow.jobs.each
def destroy_workflow(workflow)
  workflow_key = build_redis_key("gush.workflows.#{workflow.id}")
  connection_pool.with { |redis| redis.del(workflow_key) }

  workflow.jobs.each do |job|
    destroy_job(workflow.id, job)
  end
end
#enqueue_job(workflow_id, job) ⇒ Object
165 166 167 168 169 170 171 172 173 174 175 |
# File 'lib/gush/client.rb', line 165
# Marks the job as enqueued, persists it, then pushes a payload for
# Gush::Worker through the sidekiq attribute.
#
# @param workflow_id [Object] id of the workflow the job belongs to
# @param job [#enqueue!, #name] job to enqueue
# @return [Object] the result of sidekiq.push
def enqueue_job(workflow_id, job)
  job.enqueue!
  persist_job(workflow_id, job)

  # The queue name comes from configuration.sidekiq_queue.
  payload = {
    'class' => Gush::Worker,
    'queue' => configuration.sidekiq_queue,
    'args' => [workflow_id, job.name]
  }
  sidekiq.push(payload)
end
#find_workflow(id) ⇒ Object
87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
# File 'lib/gush/client.rb', line 87
# Loads a workflow and its job nodes from Redis and rebuilds it with
# workflow_from_hash.
#
# @param id [Object] workflow id interpolated into the Redis keys
# @return [Object] the result of workflow_from_hash(hash, nodes)
# @raise [WorkflowNotFound] when no value is stored under the workflow key
def find_workflow(id)
  connection_pool.with do |redis|
    data = redis.get(build_redis_key("gush.workflows.#{id}"))
    raise WorkflowNotFound, "Workflow with given id doesn't exist" if data.nil?

    hash = Gush::JSON.decode(data, symbolize_keys: true)
    keys = redis.keys(build_redis_key("gush.jobs.#{id}.*"))

    # MGET with no keys is a Redis command error, so a workflow without any
    # persisted jobs must skip the call instead of splatting an empty list.
    payloads = keys.empty? ? [] : redis.mget(*keys)

    # MGET yields nil for a key that vanished between KEYS and MGET; drop
    # those rather than handing nil to the JSON decoder.
    nodes = payloads.compact.map { |json| Gush::JSON.decode(json, symbolize_keys: true) }

    workflow_from_hash(hash, nodes)
  end
end
#load_job(workflow_id, job_id) ⇒ Object
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
# File 'lib/gush/client.rb', line 122
# Loads one job of a workflow from Redis.
#
# +job_id+ may be a full "<Klass>-<identifier>" name or just the class part;
# in the latter case a "-" is appended before the wildcard so the lookup
# matches "<Klass>-*".
#
# @param workflow_id [Object] id of the owning workflow (looked up first via
#   find_workflow, which may raise)
# @param job_id [String] full job name or its class-name prefix
# @return [Object, nil] the result of Gush::Job.from_hash, or nil when no key
#   matches or the matched key holds no value
def load_job(workflow_id, job_id)
  workflow = find_workflow(workflow_id)

  job_name_match = /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_id)
  hyphen = '-' if job_name_match.nil?

  data = connection_pool.with do |redis|
    keys = redis.keys(build_redis_key("gush.jobs.#{workflow_id}.#{job_id}#{hyphen}*"))
    # KEYS returns an array (never nil); an empty result means "no such job",
    # so do not issue GET with a nil key.
    redis.get(keys.first) unless keys.empty?
  end
  return nil if data.nil?

  Gush::Job.from_hash(workflow, Gush::JSON.decode(data, symbolize_keys: true))
end
#next_free_job_id(workflow_id, job_klass) ⇒ Object
45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# File 'lib/gush/client.rb', line 45
# Generates "<job_klass>-<uuid>" identifiers until one is found whose Redis
# key "gush.jobs.<workflow_id>.<identifier>" does not exist yet.
#
# @param workflow_id [Object] id interpolated into the probed key
# @param job_klass [Object] value interpolated as the identifier prefix
# @return [String] an identifier that was unused at the time of the check
def next_free_job_id(workflow_id, job_klass)
  loop do
    job_identifier = "#{job_klass}-#{SecureRandom.uuid}"

    taken = connection_pool.with do |redis|
      reply = redis.exists(build_redis_key("gush.jobs.#{workflow_id}.#{job_identifier}"))
      # Depending on the client version the reply is true/false or a key
      # count. 0 is truthy in Ruby, so a bare `!reply` would never report a
      # free key and the loop would spin forever.
      ![false, nil, 0].include?(reply)
    end

    return job_identifier unless taken
  end
end
#next_free_workflow_id ⇒ Object
61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
# File 'lib/gush/client.rb', line 61
# Generates UUIDs until one is found whose workflow key does not exist yet.
#
# The probed key uses the "gush.workflows." prefix -- the same one used when
# workflows are persisted, found and destroyed. The previous listing probed
# "gush.workflow." (singular), so it never looked at a real workflow key.
#
# @return [String] a UUID that was unused at the time of the check
def next_free_workflow_id
  loop do
    id = SecureRandom.uuid

    taken = connection_pool.with do |redis|
      reply = redis.exists(build_redis_key("gush.workflows.#{id}"))
      # Depending on the client version the reply is true/false or a key
      # count. 0 is truthy in Ruby, so a bare `!reply` would never report a
      # free key and the loop would spin forever.
      ![false, nil, 0].include?(reply)
    end

    return id unless taken
  end
end
#persist_job(workflow_id, job) ⇒ Object
115 116 117 118 119 120 |
# File 'lib/gush/client.rb', line 115
# Stores job.to_json under "gush.jobs.<workflow_id>.<job.name>" (passed
# through build_redis_key).
#
# @param workflow_id [Object] id interpolated into the key
# @param job [#name, #to_json] job to store
# @return [Object] the result of redis.set
def persist_job(workflow_id, job)
  job_key = build_redis_key("gush.jobs.#{workflow_id}.#{job.name}")
  connection_pool.with { |redis| redis.set(job_key, job.to_json) }
end
#persist_workflow(workflow) ⇒ Object
104 105 106 107 108 109 110 111 112 113 |
# File 'lib/gush/client.rb', line 104
# Stores workflow.to_json under "gush.workflows.<workflow.id>", persists each
# of its jobs via persist_job, then flags the workflow as persisted.
#
# @param workflow [#id, #jobs, #to_json, #mark_as_persisted] workflow to store
# @return [true] always
def persist_workflow(workflow)
  workflow_key = build_redis_key("gush.workflows.#{workflow.id}")
  connection_pool.with { |redis| redis.set(workflow_key, workflow.to_json) }

  workflow.jobs.each do |job|
    persist_job(workflow.id, job)
  end

  workflow.mark_as_persisted
  true
end
#start_workflow(workflow, job_names = []) ⇒ Object
24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
# File 'lib/gush/client.rb', line 24
# Marks the workflow as started, persists it, and enqueues either its initial
# jobs or the jobs named in +job_names+.
#
# @param workflow [#mark_as_started, #initial_jobs, #find_job, #id] workflow to start
# @param job_names [Array] names resolved with workflow.find_job; when empty,
#   workflow.initial_jobs is used instead
# @return [Array] the collection of jobs that was iterated
def start_workflow(workflow, job_names = [])
  workflow.mark_as_started
  persist_workflow(workflow)

  to_enqueue = job_names.empty? ? workflow.initial_jobs : job_names.map { |name| workflow.find_job(name) }
  to_enqueue.each { |job| enqueue_job(workflow.id, job) }
end
#stop_workflow(id) ⇒ Object
39 40 41 42 43 |
# File 'lib/gush/client.rb', line 39
# Looks the workflow up by id, marks it as stopped and persists it again.
#
# @param id [Object] workflow id handed to find_workflow (which may raise)
# @return [Object] the result of persist_workflow
def stop_workflow(id)
  flow = find_workflow(id)
  flow.mark_as_stopped
  persist_workflow(flow)
end
#worker_report(message) ⇒ Object
157 158 159 |
# File 'lib/gush/client.rb', line 157
# Forwards +message+ to report under the "gush.workers.status" key.
#
# The +message+ parameter is restored here: it appears in the documented
# signature but had been dropped from the listing, leaving a zero-argument
# method.
#
# @param message [Object] payload handed to report unchanged
# @return [Object] the result of report
def worker_report(message)
  report("gush.workers.status", message)
end
#workflow_report(message) ⇒ Object
161 162 163 |
# File 'lib/gush/client.rb', line 161
# Forwards +message+ to report under the "gush.workflows.status" key.
#
# The +message+ parameter is restored here: it appears in the documented
# signature but had been dropped from the listing, leaving a zero-argument
# method.
#
# @param message [Object] payload handed to report unchanged
# @return [Object] the result of report
def workflow_report(message)
  report("gush.workflows.status", message)
end