Class: Gush::Client
- Inherits:
-
Object
- Object
- Gush::Client
- Defined in:
- lib/gush/client.rb
Constant Summary collapse
- @@redis_connection =
Concurrent::ThreadLocalVar.new(nil)
Instance Attribute Summary collapse
-
#configuration ⇒ Object
readonly
Returns the value of attribute configuration.
Class Method Summary collapse
Instance Method Summary collapse
-
#all_workflows ⇒ Object
Deprecated.
- #configure {|configuration| ... } ⇒ Object
- #create_workflow(name) ⇒ Object
- #destroy_job(workflow_id, job) ⇒ Object
- #destroy_workflow(workflow) ⇒ Object
- #enqueue_job(workflow_id, job) ⇒ Object
- #expire_job(workflow_id, job, ttl = nil) ⇒ Object
- #expire_jobs(expires_at = nil) ⇒ Object
- #expire_workflow(workflow, ttl = nil) ⇒ Object
- #expire_workflows(expires_at = nil) ⇒ Object
- #find_job(workflow_id, job_name) ⇒ Object
- #find_workflow(id) ⇒ Object
-
#initialize(config = Gush.configuration) ⇒ Client
constructor
A new instance of Client.
- #next_free_job_id(workflow_id, job_klass) ⇒ Object
- #next_free_workflow_id ⇒ Object
- #persist_job(workflow_id, job, expires_at: nil) ⇒ Object
- #persist_workflow(workflow) ⇒ Object
- #start_workflow(workflow, job_names = []) ⇒ Object
- #stop_workflow(id) ⇒ Object
-
#workflow_ids(start = nil, stop = nil, by_ts: false, order: :asc) ⇒ Object
Returns the specified range of workflow ids, sorted by creation timestamp.
- #workflows(start = nil, stop = nil, **kwargs) ⇒ Object
- #workflows_count ⇒ Object
Constructor Details
#initialize(config = Gush.configuration) ⇒ Client
20 21 22 |
# File 'lib/gush/client.rb', line 20
# Builds a client that keeps a reference to the given configuration object.
#
# @param config [Object] configuration to store; defaults to Gush.configuration
def initialize(config = Gush.configuration)
  @configuration = config
end
Instance Attribute Details
#configuration ⇒ Object (readonly)
Returns the value of attribute configuration.
6 7 8 |
# File 'lib/gush/client.rb', line 6
# Read-only accessor for the configuration stored on this client.
#
# @return [Object] the current value of @configuration
def configuration
  @configuration
end
Class Method Details
.redis_connection(config) ⇒ Object
10 11 12 13 14 15 16 17 18 |
# File 'lib/gush/client.rb', line 10
# Returns the Redis connection held in the thread-local @@redis_connection
# slot, creating a new one when none is cached or when the configured URL
# differs from the cached one.
#
# @param config [Object] object responding to #redis_url
# @return [Redis] the cached or freshly created connection
def self.redis_connection(config)
  slot = @@redis_connection
  entry = (slot.value ||= { url: config.redis_url, connection: nil })
  reusable = !entry[:connection].nil? && config.redis_url == entry[:url]
  return entry[:connection] if reusable

  client = Redis.new(url: config.redis_url)
  # Keep RedisClassy pointed at the same connection that gets cached.
  RedisClassy.redis = client
  slot.value = { url: config.redis_url, connection: client }
  client
end
Instance Method Details
#all_workflows ⇒ Object
Deprecated.
This method performs poorly when there are many workflows or when the Redis keyspace is large. Use #workflows with pagination instead.
118 119 120 121 122 123 |
# File 'lib/gush/client.rb', line 118
# Loads every workflow found by scanning the keyspace for workflow keys.
#
# @deprecated Scans the whole keyspace; use #workflows with pagination instead.
# @return [Array] one entry per matching key, as returned by #find_workflow
def all_workflows
  workflow_keys = redis.scan_each(match: "gush.workflows.*")
  workflow_keys.map { |key| find_workflow(key.sub("gush.workflows.", "")) }
end
#configure {|configuration| ... } ⇒ Object
24 25 26 |
# File 'lib/gush/client.rb', line 24
# Yields this client's configuration object to the caller's block.
#
# @yieldparam configuration [Object] the value returned by #configuration
# @return [Object] whatever the block returns
def configure
  yield(configuration)
end
#create_workflow(name) ⇒ Object
28 29 30 31 32 33 34 35 |
# File 'lib/gush/client.rb', line 28
# Resolves +name+ to a workflow class and creates a workflow from it.
#
# The previous implementation finished with a bare +flow+, a local that was
# never assigned, so every successful call raised NameError. The created
# workflow is now the return value.
#
# @param name [String] name of the workflow class; must respond to #constantize
# @return [Object] whatever the resolved class's .create returns
# @raise [WorkflowNotFound] when resolving or creating raises NameError
def create_workflow(name)
  name.constantize.create
rescue NameError
  raise WorkflowNotFound, "Workflow with given name doesn't exist"
end
#destroy_job(workflow_id, job) ⇒ Object
193 194 195 196 |
# File 'lib/gush/client.rb', line 193
# Deletes the stored hash for a job class within a workflow and removes the
# matching member from the job expiry index.
#
# @param workflow_id [String] id of the owning workflow
# @param job [Object] object responding to #klass
# @return [Object] result of the final redis.zrem call
def destroy_job(workflow_id, job)
  member = "#{workflow_id}.#{job.klass}"
  redis.del("gush.jobs.#{member}")
  redis.zrem("gush.idx.jobs.expires_at", member)
end
#destroy_workflow(workflow) ⇒ Object
186 187 188 189 190 191 |
# File 'lib/gush/client.rb', line 186
# Deletes a workflow's stored record, removes it from both workflow indexes,
# and destroys each of its jobs.
#
# @param workflow [Object] object responding to #id and #jobs
# @return [Object] the receiver of the final #each, i.e. workflow.jobs
def destroy_workflow(workflow)
  redis.del("gush.workflows.#{workflow.id}")

  %w[gush.idx.workflows.created_at gush.idx.workflows.expires_at].each do |index|
    redis.zrem(index, workflow.id)
  end

  workflow.jobs.each do |job|
    destroy_job(workflow.id, job)
  end
end
#enqueue_job(workflow_id, job) ⇒ Object
243 244 245 246 247 248 249 |
# File 'lib/gush/client.rb', line 243
# Marks a job as enqueued, persists that state, then hands the job to its
# worker with queue options.
#
# NOTE(review): the extracted listing had lost identifiers on the last two
# statements ("= { ... }.merge(job.)" and "enqueue_worker!()"). They are
# reconstructed here as +options+ / +job.worker_options+ -- confirm against
# lib/gush/client.rb before relying on this.
#
# @param workflow_id [String] id of the owning workflow
# @param job [Object] job responding to #enqueue!, #worker_options and #enqueue_worker!
# @return [Object] result of job.enqueue_worker!
def enqueue_job(workflow_id, job)
  job.enqueue!
  persist_job(workflow_id, job)

  # Job-level options are merged last, so they override the default queue.
  options = { queue: configuration.namespace }.merge(job.worker_options)
  job.enqueue_worker!(options)
end
#expire_job(workflow_id, job, ttl = nil) ⇒ Object
233 234 235 236 237 238 239 240 241 |
# File 'lib/gush/client.rb', line 233
# Schedules a job class for expiry, or clears its scheduled expiry.
#
# With a positive ttl the job's member is added to the job expiry index with
# a score of now + ttl; otherwise the member is removed from that index.
#
# @param workflow_id [String] id of the owning workflow
# @param job [Object] object responding to #klass
# @param ttl [Numeric, nil] seconds until expiry; falls back to configuration.ttl
# @return [Object] result of the redis call that was made
def expire_job(workflow_id, job, ttl = nil)
  ttl ||= configuration.ttl
  index = "gush.idx.jobs.expires_at"
  member = "#{workflow_id}.#{job.klass}"
  return redis.zrem(index, member) unless ttl&.positive?

  redis.zadd(index, Time.now.to_f + ttl, member)
end
#expire_jobs(expires_at = nil) ⇒ Object
211 212 213 214 215 216 217 218 219 |
# File 'lib/gush/client.rb', line 211
# Deletes every job hash whose entry in the job expiry index has a score at
# or below the cutoff, then removes those entries from the index.
#
# @param expires_at [Float, nil] score cutoff; defaults to the current time
# @return [Object, nil] nil when nothing is due, else the result of redis.zrem
def expire_jobs(expires_at = nil)
  cutoff = expires_at || Time.now.to_f
  index = "gush.idx.jobs.expires_at"
  due = redis.zrange(index, "-inf", cutoff, by_score: true)

  unless due.empty?
    job_keys = due.map { |member| "gush.jobs.#{member}" }
    redis.del(job_keys)
    redis.zrem(index, due)
  end
end
#expire_workflow(workflow, ttl = nil) ⇒ Object
221 222 223 224 225 226 227 228 229 230 231 |
# File 'lib/gush/client.rb', line 221
# Schedules a workflow for expiry (or clears its scheduled expiry) and then
# applies the same ttl to each of its jobs through #expire_job.
#
# @param workflow [Object] object responding to #id and #jobs
# @param ttl [Numeric, nil] seconds until expiry; falls back to configuration.ttl
# @return [Object] the receiver of the final #each, i.e. workflow.jobs
def expire_workflow(workflow, ttl = nil)
  ttl ||= configuration.ttl
  index = "gush.idx.workflows.expires_at"

  if ttl&.positive?
    deadline = Time.now.to_f + ttl
    redis.zadd(index, deadline, workflow.id)
  else
    # No usable ttl: drop any expiry that was scheduled earlier.
    redis.zrem(index, workflow.id)
  end

  workflow.jobs.each do |job|
    expire_job(workflow.id, job, ttl)
  end
end
#expire_workflows(expires_at = nil) ⇒ Object
198 199 200 201 202 203 204 205 206 207 208 209 |
# File 'lib/gush/client.rb', line 198
# Deletes workflows whose expiry score is at or below the cutoff, removes
# them from both workflow indexes, then sweeps due jobs with the same cutoff.
#
# The job sweep now runs even when no workflow is due. Previously the method
# returned early in that case, so due job entries stayed in place until some
# later call happened to find a due workflow.
#
# @param expires_at [Float, nil] score cutoff; defaults to the current time
# @return [Object, nil] result of #expire_jobs
def expire_workflows(expires_at = nil)
  expires_at ||= Time.now.to_f
  ids = redis.zrange("gush.idx.workflows.expires_at", "-inf", expires_at, by_score: true)

  unless ids.empty?
    redis.del(ids.map { |id| "gush.workflows.#{id}" })
    redis.zrem("gush.idx.workflows.created_at", ids)
    redis.zrem("gush.idx.workflows.expires_at", ids)
  end

  # expire_jobs handles an empty result itself, so calling it every time is safe.
  expire_jobs(expires_at)
end
#find_job(workflow_id, job_name) ⇒ Object
171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
# File 'lib/gush/client.rb', line 171
# Looks up a job in a workflow and rebuilds it from its stored JSON.
#
# Names matching the "<klass>-<identifier>" pattern are looked up through
# find_job_by_klass_and_id; all other names through find_job_by_klass.
#
# @param workflow_id [String] id of the owning workflow
# @param job_name [String] job name, with or without an identifier suffix
# @return [Object, nil] result of Gush::Job.from_hash, or nil when nothing was found
def find_job(workflow_id, job_name)
  has_identifier = /(?<klass>\w*[^-])-(?<identifier>.*)/.match?(job_name)
  finder = has_identifier ? :find_job_by_klass_and_id : :find_job_by_klass

  raw = send(finder, workflow_id, job_name)
  return nil if raw.nil?

  Gush::Job.from_hash(Gush::JSON.decode(raw, symbolize_keys: true))
end
#find_workflow(id) ⇒ Object
125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
# File 'lib/gush/client.rb', line 125
# Loads a workflow and all of its job nodes from redis.
#
# @param id [String] workflow id
# @return [Object] result of workflow_from_hash for the stored record and its jobs
# @raise [WorkflowNotFound] when no record is stored under the workflow key
def find_workflow(id)
  data = redis.get("gush.workflows.#{id}")
  raise WorkflowNotFound.new("Workflow with given id doesn't exist") if data.nil?

  hash = Gush::JSON.decode(data, symbolize_keys: true)

  job_keys =
    if hash[:job_klasses]
      hash[:job_klasses].map { |klass| "gush.jobs.#{id}.#{klass}" }
    else
      # For backwards compatibility, get job keys via a full keyspace scan
      redis.scan_each(match: "gush.jobs.#{id}.*")
    end

  # Each job key is a hash; every value in it is one JSON-encoded job.
  nodes = job_keys.flat_map do |key|
    redis.hvals(key).map { |json| Gush::JSON.decode(json, symbolize_keys: true) }
  end

  workflow_from_hash(hash, nodes)
end
#next_free_job_id(workflow_id, job_klass) ⇒ Object
58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/gush/client.rb', line 58
# Generates random UUIDs until one is not already a field of the job hash
# for the given workflow and job class.
#
# @param workflow_id [String] id of the owning workflow
# @param job_klass [Object] job class (or its name) used in the redis key
# @return [String] an unused UUID
def next_free_job_id(workflow_id, job_klass)
  jobs_key = "gush.jobs.#{workflow_id}.#{job_klass}"

  loop do
    candidate = SecureRandom.uuid
    return candidate unless redis.hexists(jobs_key, candidate)
  end
end
#next_free_workflow_id ⇒ Object
71 72 73 74 75 76 77 78 79 80 81 |
# File 'lib/gush/client.rb', line 71
# Generates random UUIDs until one has no stored workflow key.
#
# @return [String] a UUID for which no "gush.workflows.<id>" key exists
def next_free_workflow_id
  loop do
    candidate = SecureRandom.uuid
    return candidate unless redis.exists?("gush.workflows.#{candidate}")
  end
end
#persist_job(workflow_id, job, expires_at: nil) ⇒ Object
165 166 167 168 169 |
# File 'lib/gush/client.rb', line 165
# Stores a job's JSON in the hash for its workflow and class, keyed by job id.
# When expires_at is given, the job's member is also added to the job expiry
# index using the nx option.
#
# @param workflow_id [String] id of the owning workflow
# @param job [Object] object responding to #klass, #id and #to_json
# @param expires_at [Float, nil] optional expiry score for the index entry
# @return [Object] result of redis.hset
def persist_job(workflow_id, job, expires_at: nil)
  member = "#{workflow_id}.#{job.klass}"

  if expires_at
    redis.zadd("gush.idx.jobs.expires_at", expires_at, member, nx: true)
  end

  redis.hset("gush.jobs.#{member}", job.id, job.to_json)
end
#persist_workflow(workflow) ⇒ Object
148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 |
# File 'lib/gush/client.rb', line 148
# Stores a workflow and its jobs in redis and marks the workflow as persisted.
#
# The workflow is added to the created_at index with the nx option. An expiry
# is scheduled only when that call returns a truthy value and the configured
# ttl is positive; otherwise the jobs are persisted without an expiry.
#
# @param workflow [Object] object responding to #id, #jobs, #to_json and #mark_as_persisted
# @return [true]
def persist_workflow(workflow)
  now = Time.now.to_f
  newly_indexed = redis.zadd("gush.idx.workflows.created_at", now, workflow.id, nx: true)

  expires_at = nil
  if newly_indexed && configuration.ttl&.positive?
    expires_at = now + configuration.ttl
    redis.zadd("gush.idx.workflows.expires_at", expires_at, workflow.id, nx: true)
  end

  redis.set("gush.workflows.#{workflow.id}", workflow.to_json)

  workflow.jobs.each do |job|
    persist_job(workflow.id, job, expires_at: expires_at)
  end
  workflow.mark_as_persisted

  true
end
#start_workflow(workflow, job_names = []) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/gush/client.rb', line 37
# Marks a workflow as started, persists it, and enqueues its first jobs.
#
# @param workflow [Object] workflow responding to #mark_as_started, #initial_jobs, #find_job and #id
# @param job_names [Array<String>] jobs to enqueue; when empty, workflow.initial_jobs is used
# @return [Object] the collection of jobs that was iterated
def start_workflow(workflow, job_names = [])
  workflow.mark_as_started
  persist_workflow(workflow)

  to_enqueue =
    if job_names.empty?
      workflow.initial_jobs
    else
      job_names.map { |name| workflow.find_job(name) }
    end

  to_enqueue.each { |job| enqueue_job(workflow.id, job) }
end
#stop_workflow(id) ⇒ Object
52 53 54 55 56 |
# File 'lib/gush/client.rb', line 52
# Loads a workflow by id, marks it as stopped, and persists the change.
#
# @param id [String] workflow id
# @return [Object] result of #persist_workflow
def stop_workflow(id)
  stopped = find_workflow(id).tap(&:mark_as_stopped)
  persist_workflow(stopped)
end
#workflow_ids(start = nil, stop = nil, by_ts: false, order: :asc) ⇒ Object
Returns the specified range of workflow ids, sorted by creation timestamp.
93 94 95 96 97 98 99 100 101 102 103 104 |
# File 'lib/gush/client.rb', line 93
# Returns a range of workflow ids from the created_at index.
#
# @param start [Numeric, nil] range start passed to zrange; defaults to 0
# @param stop [Numeric, nil] range stop passed to zrange; defaults to 99
# @param by_ts [Boolean] forwarded to zrange as its by_score option
# @param order [Symbol, String, nil] :desc requests reversed order; anything else does not
# @return [Object] result of redis.zrange
def workflow_ids(start = nil, stop = nil, by_ts: false, order: :asc)
  first = start || 0
  last = stop || 99
  descending = order&.to_sym == :desc

  redis.zrange("gush.idx.workflows.created_at", first, last, by_score: by_ts, rev: descending)
end
#workflows(start = nil, stop = nil, **kwargs) ⇒ Object
106 107 108 |
# File 'lib/gush/client.rb', line 106
# Loads the workflows for a range of ids obtained from #workflow_ids.
#
# @param start [Numeric, nil] forwarded to #workflow_ids
# @param stop [Numeric, nil] forwarded to #workflow_ids
# @param kwargs [Hash] keyword options forwarded to #workflow_ids
# @return [Array] one #find_workflow result per id
def workflows(start = nil, stop = nil, **kwargs)
  ids = workflow_ids(start, stop, **kwargs)
  ids.map { |id| find_workflow(id) }
end
#workflows_count ⇒ Object
110 111 112 |
# File 'lib/gush/client.rb', line 110
# Counts the entries in the workflow created_at index.
#
# @return [Object] result of redis.zcard on that index
def workflows_count
  index = 'gush.idx.workflows.created_at'
  redis.zcard(index)
end