Class: Gush::Client

Inherits:
Object
  • Object
show all
Defined in:
lib/gush/client.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config = Gush.configuration) ⇒ Client

Returns a new instance of Client.



5
6
7
8
9
# File 'lib/gush/client.rb', line 5

# Stores the given configuration, then builds the Sidekiq and Redis handles
# through build_sidekiq and build_redis (both defined outside this excerpt).
#
# @param config [Object] configuration to keep; defaults to Gush.configuration
def initialize(config = Gush.configuration)
  @configuration = config
  # The handles are built after @configuration is set.
  # NOTE(review): presumably the builders read the configuration — confirm.
  @sidekiq = build_sidekiq
  @redis = build_redis
end

Instance Attribute Details

#configuration ⇒ Object (readonly)

Returns the value of attribute configuration.



3
4
5
# File 'lib/gush/client.rb', line 3

# Read-only accessor for the configuration stored on this client.
#
# @return [Object] the value of @configuration
def configuration
  @configuration
end

Instance Method Details

#all_workflows ⇒ Object



68
69
70
71
72
73
# File 'lib/gush/client.rb', line 68

# Loads every workflow that has a "gush.workflows.<id>" key in Redis.
#
# Each matching key is stripped of its prefix to recover the workflow id,
# which is then handed to find_workflow.
#
# @return [Array] one find_workflow result per matching key
def all_workflows
  prefix = "gush.workflows."
  workflow_keys = redis.keys("#{prefix}*")
  workflow_keys.map { |workflow_key| find_workflow(workflow_key.sub(prefix, "")) }
end

#configure {|configuration| ... } ⇒ Object

Yields: (configuration)



11
12
13
14
15
# File 'lib/gush/client.rb', line 11

# Yields the current configuration to the caller's block, then rebuilds the
# Sidekiq and Redis handles.
#
# @yield [configuration] the client's configuration object
# @return [Object] whatever build_redis returns (the last assignment)
def configure
  yield(configuration)

  # Rebuild both handles after the block has run.
  @sidekiq = build_sidekiq
  @redis = build_redis
end

#create_workflow(name) ⇒ Object



17
18
19
20
21
22
23
24
# File 'lib/gush/client.rb', line 17

# Resolves +name+ to a constant and returns the result of calling +create+
# on it.
#
# The previous version never stored the created object and then returned an
# undefined local (+flow+), so the success path always raised NameError.
#
# @param name [#constantize] name of the workflow class to instantiate
# @return [Object] whatever +create+ returns on the resolved constant
# @raise [WorkflowNotFound] when a NameError is raised while resolving or
#   creating the workflow
def create_workflow(name)
  name.constantize.create
rescue NameError
  # NOTE(review): NoMethodError is a NameError subclass, so failures raised
  # inside +create+ are reported as "not found" too (same as before).
  raise WorkflowNotFound, "Workflow with given name doesn't exist"
end

#destroy_job(workflow_id, job) ⇒ Object



118
119
120
# File 'lib/gush/client.rb', line 118

# Deletes the Redis entry "gush.jobs.<workflow_id>.<job.name>".
#
# @param workflow_id [#to_s] id of the workflow the job belongs to
# @param job [#name] job whose record should be removed
# @return [Object] the result of redis.del
def destroy_job(workflow_id, job)
  job_key = "gush.jobs.#{workflow_id}.#{job.name}"
  redis.del(job_key)
end

#destroy_workflow(workflow) ⇒ Object



113
114
115
116
# File 'lib/gush/client.rb', line 113

# Deletes the workflow's own Redis entry, then removes each of its jobs via
# destroy_job.
#
# @param workflow [#id, #jobs] workflow to remove
# @return [Object] workflow.jobs (the receiver of +each+)
def destroy_workflow(workflow)
  workflow_id = workflow.id
  redis.del("gush.workflows.#{workflow_id}")

  workflow.jobs.each do |workflow_job|
    destroy_job(workflow_id, workflow_job)
  end
end

#enqueue_job(workflow_id, job) ⇒ Object



130
131
132
133
134
135
136
137
138
139
# File 'lib/gush/client.rb', line 130

# Marks the job as enqueued, persists it, and pushes a Gush::Worker item to
# Sidekiq.
#
# The pushed item uses configuration.namespace as its queue and
# [workflow_id, job.name] as its arguments.
#
# @param workflow_id [Object] id of the workflow the job belongs to
# @param job [#enqueue!, #name] job to schedule
# @return [Object] the result of sidekiq.push
def enqueue_job(workflow_id, job)
  job.enqueue!
  persist_job(workflow_id, job)

  payload = {
    'class' => Gush::Worker,
    'queue' => configuration.namespace,
    'args'  => [workflow_id, job.name]
  }
  sidekiq.push(payload)
end

#find_workflow(id) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
# File 'lib/gush/client.rb', line 75

# Loads the workflow stored under "gush.workflows.<id>" together with every
# job record stored under "gush.jobs.<id>.*".
#
# @param id [#to_s] workflow id
# @return [Object] the result of workflow_from_hash(decoded_workflow, decoded_jobs)
# @raise [WorkflowNotFound] when no workflow entry exists for +id+
def find_workflow(id)
  data = redis.get("gush.workflows.#{id}")
  raise WorkflowNotFound, "Workflow with given id doesn't exist" if data.nil?

  hash = Gush::JSON.decode(data, symbolize_keys: true)
  keys = redis.keys("gush.jobs.#{id}.*")

  # MGET needs at least one key; a workflow without any job records must
  # load with an empty node list instead of failing on a zero-argument MGET.
  nodes = if keys.empty?
            []
          else
            redis.mget(*keys).map { |json| Gush::JSON.decode(json, symbolize_keys: true) }
          end

  workflow_from_hash(hash, nodes)
end

#load_job(workflow_id, job_id) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/gush/client.rb', line 98

# Loads a single job of a workflow from Redis.
#
# +job_id+ may be a full "<Class>-<identifier>" name or just the part before
# the hyphen; in the latter case a "-" is appended before the wildcard so the
# lookup pattern becomes "gush.jobs.<workflow_id>.<job_id>-*". The first
# matching key wins.
#
# @param workflow_id [#to_s] id of the owning workflow
# @param job_id [String] full job name or its hyphen-less prefix
# @return [Object, nil] the result of Gush::Job.from_hash, or nil when no
#   matching key or no stored data exists
# @raise [WorkflowNotFound] propagated from find_workflow — NOTE(review):
#   based on the find_workflow shown on this page
def load_job(workflow_id, job_id)
  workflow = find_workflow(workflow_id)
  job_name_match = /(?<klass>\w*[^-])-(?<identifier>.*)/.match(job_id)
  hyphen = '-' if job_name_match.nil?

  keys = redis.keys("gush.jobs.#{workflow_id}.#{job_id}#{hyphen}*")
  # KEYS yields an array, so "nothing found" is an empty list, not nil.
  return nil if keys.empty?

  data = redis.get(keys.first)
  return nil if data.nil?

  data = Gush::JSON.decode(data, symbolize_keys: true)
  Gush::Job.from_hash(workflow, data)
end

#next_free_job_id(workflow_id, job_klass) ⇒ Object



47
48
49
50
51
52
53
54
55
56
# File 'lib/gush/client.rb', line 47

# Generates "<job_klass>-<uuid>" identifiers until one is found whose
# "gush.jobs.<workflow_id>.<identifier>" key is not reported by redis.exists.
#
# NOTE(review): the loop relies on redis.exists returning a falsy value for a
# missing key; confirm the Redis client in use returns a boolean here.
#
# @param workflow_id [#to_s] id of the workflow the job will belong to
# @param job_klass [#to_s] job class (or its name) used as the id prefix
# @return [String] the first identifier not yet present in Redis
def next_free_job_id(workflow_id, job_klass)
  loop do
    candidate = "#{job_klass}-#{SecureRandom.uuid}"
    return candidate unless redis.exists("gush.jobs.#{workflow_id}.#{candidate}")
  end
end

#next_free_workflow_id ⇒ Object



58
59
60
61
62
63
64
65
66
# File 'lib/gush/client.rb', line 58

# Generates UUIDs until one is found that is not already used as a workflow
# key in Redis.
#
# The probe uses the "gush.workflows.<id>" key, i.e. the same prefix under
# which workflows are stored and looked up elsewhere in this client; the
# previous "gush.workflow." (singular) prefix could never match a stored
# workflow, which made the collision check a no-op.
#
# NOTE(review): the loop relies on redis.exists returning a falsy value for a
# missing key; confirm the Redis client in use returns a boolean here.
#
# @return [String] a UUID with no existing workflow entry
def next_free_workflow_id
  id = nil
  loop do
    id = SecureRandom.uuid
    break unless redis.exists("gush.workflows.#{id}")
  end

  id
end

#persist_job(workflow_id, job) ⇒ Object



94
95
96
# File 'lib/gush/client.rb', line 94

# Writes the job's JSON form to "gush.jobs.<workflow_id>.<job.name>".
#
# @param workflow_id [#to_s] id of the workflow the job belongs to
# @param job [#name, #to_json] job to store
# @return [Object] the result of redis.set
def persist_job(workflow_id, job)
  job_key = "gush.jobs.#{workflow_id}.#{job.name}"
  redis.set(job_key, job.to_json)
end

#persist_workflow(workflow) ⇒ Object



87
88
89
90
91
92
# File 'lib/gush/client.rb', line 87

# Stores the workflow's JSON form under "gush.workflows.<id>", persists each
# of its jobs via persist_job, and then calls mark_as_persisted on it.
#
# @param workflow [#id, #to_json, #jobs, #mark_as_persisted] workflow to store
# @return [true] always
def persist_workflow(workflow)
  workflow_id = workflow.id
  redis.set("gush.workflows.#{workflow_id}", workflow.to_json)

  workflow.jobs.each do |workflow_job|
    persist_job(workflow_id, workflow_job)
  end

  workflow.mark_as_persisted
  true
end

#start_workflow(workflow, job_names = []) ⇒ Object



26
27
28
29
30
31
32
33
34
35
36
37
38
39
# File 'lib/gush/client.rb', line 26

# Marks the workflow as started, persists it, and enqueues jobs.
#
# With no +job_names+ the workflow's initial_jobs are enqueued; otherwise each
# name is resolved through workflow.find_job and those jobs are enqueued.
#
# @param workflow [#mark_as_started, #initial_jobs, #find_job, #id] workflow to start
# @param job_names [Array] optional names of the jobs to enqueue
# @return [Array] the list of jobs that was iterated
def start_workflow(workflow, job_names = [])
  workflow.mark_as_started
  persist_workflow(workflow)

  selected_jobs =
    if job_names.empty?
      workflow.initial_jobs
    else
      job_names.map { |job_name| workflow.find_job(job_name) }
    end

  selected_jobs.each { |selected_job| enqueue_job(workflow.id, selected_job) }
end

#stop_workflow(id) ⇒ Object



41
42
43
44
45
# File 'lib/gush/client.rb', line 41

# Looks up the workflow by id, calls mark_as_stopped on it, and persists it.
#
# @param id [Object] workflow id passed to find_workflow
# @return [Object] the result of persist_workflow
def stop_workflow(id)
  stopped = find_workflow(id).tap(&:mark_as_stopped)
  persist_workflow(stopped)
end

#worker_report(message) ⇒ Object



122
123
124
# File 'lib/gush/client.rb', line 122

# Passes +message+ to +report+ with "gush.workers.status" as the first
# argument (+report+ is defined outside this excerpt).
#
# @param message [Object] payload forwarded unchanged
# @return [Object] the result of +report+
def worker_report(message)
  destination = "gush.workers.status"
  report(destination, message)
end

#workflow_report(message) ⇒ Object



126
127
128
# File 'lib/gush/client.rb', line 126

# Passes +message+ to +report+ with "gush.workflows.status" as the first
# argument (+report+ is defined outside this excerpt).
#
# @param message [Object] payload forwarded unchanged
# @return [Object] the result of +report+
def workflow_report(message)
  destination = "gush.workflows.status"
  report(destination, message)
end