Class: Distribot::PhaseStartedHandler

Inherits:
Object
  • Object
show all
Includes:
Handler
Defined in:
lib/distribot/phase_started_handler.rb

Instance Attribute Summary

Attributes included from Handler

#consumers, #queue_name

Instance Method Summary collapse

Methods included from Handler

handler_for, included, #initialize, queue_for

Instance Method Details

#best_version(handler) ⇒ Object



45
46
47
48
49
50
51
52
53
54
55
56
57
# File 'lib/distribot/phase_started_handler.rb', line 45

def best_version(handler)
  if handler.version
    wanted_version = Gem::Dependency.new(handler.to_s, handler.version)
    # Figure out the highest acceptable version of the handler we can assign work to:
    handler_versions(handler.to_s)
      .reverse
      .find { |x| wanted_version.match?(handler.to_s, x.to_s) }
      .to_s
  else
    # Find the highest version for this queue:
    handler_versions(handler.to_s).last
  end
end

#callback(message) ⇒ Object



8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# File 'lib/distribot/phase_started_handler.rb', line 8

def callback(message)
  flow = Distribot::Flow.find(message[:flow_id])
  phase = flow.phase(message[:phase])
  if phase.handlers.empty?
    Distribot.publish!(
      'distribot.flow.phase.finished',
      flow_id: flow.id,
      phase: phase.name
    )
  else
    handler_versions = phase.handlers.map do |handler|
      version = best_version(handler)
      unless version && !version.blank?
        fail "Cannot find a good #{handler} version #{handler.version}"
      end
      {
        handler.to_s => version
      }
    end.reduce({}, :merge)
    phase.handlers.each do |handler|
      init_handler(flow, phase, handler, handler_versions[handler.to_s])
    end
  end
end

#handler_versions(handler) ⇒ Object



59
60
61
62
63
64
65
66
67
# File 'lib/distribot/phase_started_handler.rb', line 59

def handler_versions(handler)
  queue_prefix = "distribot.flow.handler.#{handler}."
  Distribot.connector.queues
    .select { |x| x.start_with? queue_prefix }
    .reject { |x| x.end_with? '.enumerate' }
    .map { |x| x.gsub(/^#{queue_prefix}/, '').gsub(/\.tasks$/, '') }
    .map { |x| Semantic::Version.new x }
    .sort
end

#init_handler(flow, phase, handler, version) ⇒ Object

rubocop:disable Metrics/LineLength



34
35
36
37
38
39
40
41
42
43
# File 'lib/distribot/phase_started_handler.rb', line 34

def init_handler(flow, phase, handler, version)
  Distribot.publish!(
    "distribot.flow.handler.#{handler}.#{version}.enumerate",
    flow_id: flow.id,
    phase: phase.name,
    task_queue: "distribot.flow.handler.#{handler}.#{version}.tasks",
    task_counter: "distribot.flow.#{flow.id}.#{phase.name}.#{handler}.finished",
    finished_queue: 'distribot.flow.task.finished'
  )
end