Class: Gush::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/gush/client.rb

Constant Summary collapse

@@redis_connection =
Concurrent::ThreadLocalVar.new(nil)

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = Gush.configuration) ⇒ Client



20
21
22
# File 'lib/gush/client.rb', line 20

# Builds a client that uses the given configuration object.
#
# @param config [Object] configuration to store on the client; when omitted,
#   the value of Gush.configuration is used
def initialize(config = Gush.configuration)
  @configuration = config
end

Instance Attribute Details

#configuration ⇒ Object (readonly)

Returns the value of attribute configuration.



6
7
8
# File 'lib/gush/client.rb', line 6

# Read-only accessor for the client's configuration.
#
# @return [Object] the current value of the @configuration instance variable
def configuration
  @configuration
end

Class Method Details

.redis_connection(config) ⇒ Object



10
11
12
13
14
15
16
17
18
# File 'lib/gush/client.rb', line 10

# Returns a Redis connection for +config.redis_url+, reusing the connection
# cached in @@redis_connection when one exists for the same URL.
#
# When no usable cached connection exists, a new Redis instance is created,
# assigned to RedisClassy.redis, and stored in the cache together with its URL.
#
# @param config [#redis_url] configuration providing the Redis URL
# @return [Redis] the cached or newly created connection
def self.redis_connection(config)
  url = config.redis_url

  entry = @@redis_connection.value
  if entry.nil?
    entry = { url: url, connection: nil }
    @@redis_connection.value = entry
  end

  existing = entry[:connection]
  # Reuse only when a connection is present AND it was opened for this URL.
  return existing unless existing.nil? || url != entry[:url]

  fresh = Redis.new(url: url)
  RedisClassy.redis = fresh
  @@redis_connection.value = { url: url, connection: fresh }
  fresh
end

Instance Method Details

#all_workflows ⇒ Object

Deprecated.

This method is not performant when there are a large number of workflows or when the Redis keyspace is large. Use #workflows with pagination instead.



118
119
120
121
122
123
# File 'lib/gush/client.rb', line 118

# Loads every workflow whose key matches "gush.workflows.*".
#
# Keys are discovered with redis.scan_each, the key prefix is stripped to
# obtain the workflow id, and each id is loaded through #find_workflow.
#
# @return [Array] one #find_workflow result per matching key
def all_workflows
  prefix = "gush.workflows."
  workflow_keys = redis.scan_each(match: "#{prefix}*")

  workflow_keys.map { |workflow_key| find_workflow(workflow_key.sub(prefix, "")) }
end

#configure {|configuration| ... } ⇒ Object

Yields:



24
25
26
# File 'lib/gush/client.rb', line 24

# Yields the client's configuration to the caller's block.
#
# @yieldparam configuration [Object] the value returned by #configuration
# @return [Object] the value returned by the block
def configure
  yield configuration
end

#create_workflow(name) ⇒ Object



28
29
30
31
32
33
34
35
# File 'lib/gush/client.rb', line 28

# Creates a workflow from the name of its class.
#
# The name is resolved with String#constantize and +create+ is called on the
# resulting class. The created workflow is returned to the caller.
#
# @param name [String] name of the workflow class
# @return [Object] the value returned by the workflow class's +create+
# @raise [WorkflowNotFound] when a NameError is raised while resolving the
#   class or while running +create+
def create_workflow(name)
  # The result of +create+ is the method's return value; previously it was
  # discarded and an unassigned local (+flow+) was returned instead.
  #
  # NOTE: NoMethodError is a subclass of NameError, so this rescue also
  # converts a NoMethodError raised inside +create+ (unchanged behavior).
  name.constantize.create
rescue NameError
  raise WorkflowNotFound.new("Workflow with given name doesn't exist")
end

#destroy_job(workflow_id, job) ⇒ Object



193
194
195
196
# File 'lib/gush/client.rb', line 193

# Removes the stored data for a job class of a workflow.
#
# Deletes the "gush.jobs.<workflow_id>.<klass>" key and removes the matching
# member from the "gush.idx.jobs.expires_at" sorted set.
#
# @param workflow_id [String] id of the owning workflow
# @param job [#klass] job whose class name identifies the key
# @return [Object] the result of the final redis.zrem call
def destroy_job(workflow_id, job)
  member = "#{workflow_id}.#{job.klass}"

  redis.del("gush.jobs.#{member}")
  redis.zrem("gush.idx.jobs.expires_at", member)
end

#destroy_workflow(workflow) ⇒ Object



186
187
188
189
190
191
# File 'lib/gush/client.rb', line 186

# Removes a workflow and all of its jobs from Redis.
#
# Deletes the workflow key, removes the workflow id from both the created_at
# and expires_at indexes, then calls #destroy_job for every job.
#
# @param workflow [#id, #jobs] workflow to remove
# @return [Object] the result of iterating +workflow.jobs+
def destroy_workflow(workflow)
  id = workflow.id

  redis.del("gush.workflows.#{id}")
  %w[created_at expires_at].each do |index|
    redis.zrem("gush.idx.workflows.#{index}", id)
  end

  workflow.jobs.each { |job| destroy_job(id, job) }
end

#enqueue_job(workflow_id, job) ⇒ Object



243
244
245
246
247
248
249
# File 'lib/gush/client.rb', line 243

# Marks a job as enqueued, persists it, and hands it to its worker.
#
# The worker options start from +{ queue: configuration.namespace }+ and are
# overridden by whatever +job.worker_options+ returns.
#
# @param workflow_id [String] id of the owning workflow
# @param job [#enqueue!, #worker_options, #enqueue_worker!] job to enqueue
# @return [Object] the result of +job.enqueue_worker!+
def enqueue_job(workflow_id, job)
  job.enqueue!
  persist_job(workflow_id, job)

  default_options = { queue: configuration.namespace }
  job.enqueue_worker!(default_options.merge(job.worker_options))
end

#expire_job(workflow_id, job, ttl = nil) ⇒ Object



233
234
235
236
237
238
239
240
241
# File 'lib/gush/client.rb', line 233

# Schedules or cancels expiration for a job class of a workflow.
#
# With a positive TTL the member "<workflow_id>.<klass>" is added to the
# "gush.idx.jobs.expires_at" sorted set with a score of now + TTL; otherwise
# the member is removed from that set.
#
# @param workflow_id [String] id of the owning workflow
# @param job [#klass] job whose class name identifies the member
# @param ttl [Numeric, nil] time to live; falls back to +configuration.ttl+
# @return [Object] the result of the redis.zadd or redis.zrem call
def expire_job(workflow_id, job, ttl=nil)
  effective_ttl = ttl || configuration.ttl
  index = "gush.idx.jobs.expires_at"
  member = "#{workflow_id}.#{job.klass}"

  # A nil, zero, or negative TTL means "do not expire".
  return redis.zrem(index, member) unless effective_ttl&.positive?

  redis.zadd(index, Time.now.to_f + effective_ttl, member)
end

#expire_jobs(expires_at = nil) ⇒ Object



211
212
213
214
215
216
217
218
219
# File 'lib/gush/client.rb', line 211

# Deletes every job key whose expiration score is at or before +expires_at+.
#
# Members are read from "gush.idx.jobs.expires_at" by score, the matching
# "gush.jobs.<member>" keys are deleted, and the members are removed from
# the index.
#
# @param expires_at [Numeric, nil] upper score bound; defaults to the
#   current time as a float
# @return [Object, nil] nil when nothing is due, otherwise the result of
#   the redis.zrem call
def expire_jobs(expires_at=nil)
  cutoff = expires_at || Time.now.to_f
  index = "gush.idx.jobs.expires_at"

  expired = redis.zrange(index, "-inf", cutoff, by_score: true)
  return if expired.empty?

  job_keys = expired.map { |member| "gush.jobs.#{member}" }
  redis.del(job_keys)
  redis.zrem(index, expired)
end

#expire_workflow(workflow, ttl = nil) ⇒ Object



221
222
223
224
225
226
227
228
229
230
231
# File 'lib/gush/client.rb', line 221

# Schedules or cancels expiration for a workflow and each of its jobs.
#
# With a positive TTL the workflow id is added to
# "gush.idx.workflows.expires_at" with a score of now + TTL; otherwise it is
# removed from that set. The same TTL is then applied to every job through
# #expire_job.
#
# @param workflow [#id, #jobs] workflow to expire
# @param ttl [Numeric, nil] time to live; falls back to +configuration.ttl+
# @return [Object] the result of iterating +workflow.jobs+
def expire_workflow(workflow, ttl=nil)
  effective_ttl = ttl || configuration.ttl
  index = "gush.idx.workflows.expires_at"

  if effective_ttl&.positive?
    deadline = Time.now.to_f + effective_ttl
    redis.zadd(index, deadline, workflow.id)
  else
    redis.zrem(index, workflow.id)
  end

  workflow.jobs.each do |job|
    expire_job(workflow.id, job, effective_ttl)
  end
end

#expire_workflows(expires_at = nil) ⇒ Object



198
199
200
201
202
203
204
205
206
207
208
209
# File 'lib/gush/client.rb', line 198

# Deletes every workflow whose expiration score is at or before +expires_at+.
#
# Ids are read from "gush.idx.workflows.expires_at" by score. When any are
# due, their "gush.workflows.<id>" keys are deleted, the ids are removed
# from the created_at and expires_at indexes, and #expire_jobs is called
# with the same cutoff. When none are due the method returns immediately
# without calling #expire_jobs.
#
# @param expires_at [Numeric, nil] upper score bound; defaults to the
#   current time as a float
# @return [Object, nil] nil when nothing is due, otherwise the result of
#   #expire_jobs
def expire_workflows(expires_at=nil)
  cutoff = expires_at || Time.now.to_f

  expired_ids = redis.zrange("gush.idx.workflows.expires_at", "-inf", cutoff, by_score: true)
  return if expired_ids.empty?

  workflow_keys = expired_ids.map { |id| "gush.workflows.#{id}" }
  redis.del(workflow_keys)

  %w[created_at expires_at].each do |index|
    redis.zrem("gush.idx.workflows.#{index}", expired_ids)
  end

  expire_jobs(cutoff)
end

#find_job(workflow_id, job_name) ⇒ Object



171
172
173
174
175
176
177
178
179
180
181
182
183
184
# File 'lib/gush/client.rb', line 171

# Looks up a single job of a workflow by name.
#
# A name matching the "<klass>-<identifier>" pattern is fetched with
# find_job_by_klass_and_id; any other name is fetched with
# find_job_by_klass. The stored payload is decoded with Gush::JSON and
# turned into a job via Gush::Job.from_hash.
#
# @param workflow_id [String] id of the owning workflow
# @param job_name [String] job class name, optionally followed by "-<id>"
# @return [Object, nil] the job built by Gush::Job.from_hash, or nil when the
#   lookup returned nil
def find_job(workflow_id, job_name)
  has_identifier = /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_name)

  payload = has_identifier ? find_job_by_klass_and_id(workflow_id, job_name) : find_job_by_klass(workflow_id, job_name)
  return nil if payload.nil?

  attributes = Gush::JSON.decode(payload, symbolize_keys: true)
  Gush::Job.from_hash(attributes)
end

#find_workflow(id) ⇒ Object



125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/gush/client.rb', line 125

# Loads a workflow and its jobs from Redis.
#
# The workflow payload is read from "gush.workflows.<id>" and decoded. Job
# keys come from the payload's :job_klasses list when present; otherwise
# they are discovered by scanning for "gush.jobs.<id>.*". Every value of
# every job hash is decoded and passed, together with the workflow
# attributes, to workflow_from_hash.
#
# @param id [String] workflow id
# @return [Object] the result of workflow_from_hash
# @raise [WorkflowNotFound] when no payload is stored for the id
def find_workflow(id)
  payload = redis.get("gush.workflows.#{id}")
  raise WorkflowNotFound.new("Workflow with given id doesn't exist") if payload.nil?

  attributes = Gush::JSON.decode(payload, symbolize_keys: true)
  job_klasses = attributes[:job_klasses]

  job_keys =
    if job_klasses
      job_klasses.map { |klass| "gush.jobs.#{id}.#{klass}" }
    else
      # For backwards compatibility, get job keys via a full keyspace scan
      redis.scan_each(match: "gush.jobs.#{id}.*")
    end

  nodes = job_keys.flat_map do |job_key|
    redis.hvals(job_key).map { |json| Gush::JSON.decode(json, symbolize_keys: true) }
  end

  workflow_from_hash(attributes, nodes)
end

#next_free_job_id(workflow_id, job_klass) ⇒ Object



58
59
60
61
62
63
64
65
66
67
68
69
# File 'lib/gush/client.rb', line 58

# Generates a job id that is not yet used for the given workflow and job
# class.
#
# Random UUIDs are drawn until one is found that is not a field of the
# "gush.jobs.<workflow_id>.<job_klass>" hash.
#
# @param workflow_id [String] id of the owning workflow
# @param job_klass [#to_s] job class name
# @return [String] an unused UUID
def next_free_job_id(workflow_id, job_klass)
  jobs_key = "gush.jobs.#{workflow_id}.#{job_klass}"

  loop do
    candidate = SecureRandom.uuid
    return candidate unless redis.hexists(jobs_key, candidate)
  end
end

#next_free_workflow_id ⇒ Object



71
72
73
74
75
76
77
78
79
80
81
# File 'lib/gush/client.rb', line 71

# Generates a workflow id that is not yet in use.
#
# Random UUIDs are drawn until one is found for which the
# "gush.workflows.<id>" key does not exist.
#
# @return [String] an unused UUID
def next_free_workflow_id
  loop do
    candidate = SecureRandom.uuid
    return candidate unless redis.exists?("gush.workflows.#{candidate}")
  end
end

#persist_job(workflow_id, job, expires_at: nil) ⇒ Object



165
166
167
168
169
# File 'lib/gush/client.rb', line 165

# Stores a job in the hash for its workflow and class.
#
# When +expires_at+ is given, the member "<workflow_id>.<klass>" is first
# added to "gush.idx.jobs.expires_at" with that score, using nx: true. The
# job's JSON is then written to "gush.jobs.<workflow_id>.<klass>" under the
# job's id.
#
# @param workflow_id [String] id of the owning workflow
# @param job [#klass, #id, #to_json] job to store
# @param expires_at [Numeric, nil] expiration score, or nil to skip indexing
# @return [Object] the result of the redis.hset call
def persist_job(workflow_id, job, expires_at: nil)
  member = "#{workflow_id}.#{job.klass}"

  if expires_at
    redis.zadd("gush.idx.jobs.expires_at", expires_at, member, nx: true)
  end

  redis.hset("gush.jobs.#{member}", job.id, job.to_json)
end

#persist_workflow(workflow) ⇒ Object



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/gush/client.rb', line 148

# Stores a workflow and all of its jobs in Redis.
#
# The workflow id is added to "gush.idx.workflows.created_at" with nx: true.
# If that call reports the id as added and +configuration.ttl+ is positive,
# the id is also added to "gush.idx.workflows.expires_at" and the same
# expiration is passed on to each job. The workflow JSON is then written,
# every job is persisted through #persist_job, and the workflow is marked as
# persisted.
#
# @param workflow [#id, #to_json, #jobs, #mark_as_persisted] workflow to store
# @return [true]
def persist_workflow(workflow)
  now = Time.now.to_f
  newly_indexed = redis.zadd("gush.idx.workflows.created_at", now, workflow.id, nx: true)

  # Stays nil unless this call created the index entry and a TTL is set.
  expires_at = nil
  if newly_indexed
    ttl = configuration.ttl
    if ttl&.positive?
      expires_at = now + ttl
      redis.zadd("gush.idx.workflows.expires_at", expires_at, workflow.id, nx: true)
    end
  end

  redis.set("gush.workflows.#{workflow.id}", workflow.to_json)

  workflow.jobs.each do |job|
    persist_job(workflow.id, job, expires_at: expires_at)
  end
  workflow.mark_as_persisted

  true
end

#start_workflow(workflow, job_names = []) ⇒ Object



37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/gush/client.rb', line 37

# Marks a workflow as started, persists it, and enqueues its first jobs.
#
# When +job_names+ is empty the workflow's initial jobs are enqueued;
# otherwise each name is resolved with +workflow.find_job+ and those jobs
# are enqueued instead.
#
# @param workflow [#mark_as_started, #initial_jobs, #find_job, #id] workflow
#   to start
# @param job_names [Array<String>] optional names of the jobs to enqueue
# @return [Object] the result of iterating the selected jobs
def start_workflow(workflow, job_names = [])
  workflow.mark_as_started
  persist_workflow(workflow)

  selected =
    if job_names.empty?
      workflow.initial_jobs
    else
      job_names.map { |job_name| workflow.find_job(job_name) }
    end

  selected.each { |job| enqueue_job(workflow.id, job) }
end

#stop_workflow(id) ⇒ Object



52
53
54
55
56
# File 'lib/gush/client.rb', line 52

# Loads a workflow, marks it as stopped, and persists the change.
#
# @param id [String] workflow id, resolved through #find_workflow
# @return [Object] the result of #persist_workflow
def stop_workflow(id)
  stopped = find_workflow(id).tap(&:mark_as_stopped)
  persist_workflow(stopped)
end

#workflow_ids(start = nil, stop = nil, by_ts: false, order: :asc) ⇒ Object

Returns the specified range of workflow ids, sorted by created timestamp.



93
94
95
96
97
98
99
100
101
102
103
104
# File 'lib/gush/client.rb', line 93

# Returns the specified range of workflow ids from the
# "gush.idx.workflows.created_at" sorted set.
#
# @param start [Numeric, nil] range start; defaults to 0
# @param stop [Numeric, nil] range end; defaults to 99
# @param by_ts [Boolean] forwarded to redis.zrange as +by_score+
# @param order [Symbol, String, nil] :desc (or "desc") requests reverse
#   order; any other value leaves +rev+ false
# @return [Object] the result of redis.zrange
def workflow_ids(start=nil, stop=nil, by_ts: false, order: :asc)
  first = start || 0
  last = stop || 99
  descending = order&.to_sym == :desc

  redis.zrange("gush.idx.workflows.created_at", first, last, by_score: by_ts, rev: descending)
end

#workflows(start = nil, stop = nil, **kwargs) ⇒ Object



106
107
108
# File 'lib/gush/client.rb', line 106

# Loads the workflows for a range of the created-at index.
#
# All arguments are forwarded unchanged to #workflow_ids, and every returned
# id is loaded through #find_workflow.
#
# @param start [Numeric, nil] forwarded to #workflow_ids
# @param stop [Numeric, nil] forwarded to #workflow_ids
# @param kwargs [Hash] keyword options forwarded to #workflow_ids
# @return [Array] one #find_workflow result per id
def workflows(start=nil, stop=nil, **kwargs)
  ids = workflow_ids(start, stop, **kwargs)
  ids.map { |workflow_id| find_workflow(workflow_id) }
end

#workflows_count ⇒ Object



110
111
112
# File 'lib/gush/client.rb', line 110

# Returns the number of members in the "gush.idx.workflows.created_at"
# sorted set, as reported by redis.zcard.
#
# @return [Object] the result of the redis.zcard call
def workflows_count
  redis.zcard('gush.idx.workflows.created_at')
end