Module: Distribot::Worker

Defined in:
lib/distribot/worker.rb

Defined Under Namespace

Modules: ClassMethods

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#task_consumers ⇒ Object

Returns the value of attribute task_consumers.



43
44
45
# File 'lib/distribot/worker.rb', line 43

# Reader for the +@task_consumers+ instance variable.
#
# @return [Object, nil] the current value of +@task_consumers+; +nil+ when it
#   has not been assigned
def task_consumers
  @task_consumers
end

Class Method Details

.included(base) ⇒ Object



6
7
8
# File 'lib/distribot/worker.rb', line 6

# Hook Ruby invokes when this module is included somewhere; it extends the
# including class or module with +ClassMethods+.
#
# @param base [Module] the class or module performing the +include+
# @return [Object] the result of +base.extend(ClassMethods)+
def self.included(base)
  base.extend(ClassMethods)
end

Instance Method Details

#enumerate_tasks(message) ⇒ Object



112
113
114
115
116
117
118
119
# File 'lib/distribot/worker.rb', line 112

# Runs the handler's enumerator for the flow referenced by +message+.
#
# The message is wrapped in an OpenStruct, the flow is looked up by its
# +flow_id+, and the method named by +self.class.enumerator+ is invoked with
# the wrapped message. Everything runs inside +trycatch+.
#
# NOTE(review): +trycatch+ is defined outside this excerpt; whether it
# re-raises or swallows errors (and therefore what this method returns on
# failure) should be confirmed there.
#
# @param message [Hash] enumeration request; read here only for +flow_id+
# @return [Object] whatever +trycatch+ returns for the block
# @raise [FlowCanceledError] inside the +trycatch+ block when the flow
#   reports +canceled?+
def enumerate_tasks(message)
  trycatch do
    context = OpenStruct.new(message)
    raise FlowCanceledError if Distribot::Flow.find(context.flow_id).canceled?

    send(self.class.enumerator, context)
  end
end

#handle_task_execution(task) ⇒ Object



87
88
89
90
91
92
93
94
95
96
# File 'lib/distribot/worker.rb', line 87

# Builds the execution context for one task and processes it.
#
# The context carries the task's +flow_id+ and +phase+ plus the name of the
# queue on which completion is reported. Processing is delegated to
# +process_single_task+ inside +trycatch+.
#
# @param task [Hash] task payload; +:flow_id+ and +:phase+ are read here
# @return [Object] whatever +trycatch+ returns for the block
def handle_task_execution(task)
  attributes = {
    flow_id: task[:flow_id],
    phase: task[:phase],
    finished_queue: 'distribot.flow.task.finished'
  }
  context = OpenStruct.new(attributes)
  trycatch { process_single_task(context, task) }
end

#logger ⇒ Object



51
52
53
# File 'lib/distribot/worker.rb', line 51

# Delegates to +Distribot.logger+.
#
# @return [Object] whatever +Distribot.logger+ returns
def logger
  Distribot.logger
end

#prepare_for_enumeration ⇒ Object



55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# File 'lib/distribot/worker.rb', line 55

# Subscribes to the handler class's enumeration queue.
#
# Every received message is posted to a fixed pool of 5 threads. The posted
# job tags the logger with the message's key:value pairs, enumerates the
# tasks for the message and announces them via +announce_tasks+.
#
# NOTE(review): if the logger tracks tags per thread, the outer
# "handler:..." tag may not be visible inside the pool threads — confirm
# against the logger implementation.
#
# @return [Object] whatever the outer +logger.tagged+ call returns
def prepare_for_enumeration
  logger.tagged("handler:#{self.class}") do
    workers = Concurrent::FixedThreadPool.new(5)
    queue_name = self.class.enumeration_queue
    Distribot.subscribe(queue_name, solo: true) do |message|
      workers.post do
        message_tags = message.map { |key, value| [key, value].join(':') }
        logger.tagged(message_tags) do
          context = OpenStruct.new(message)
          trycatch { announce_tasks(context, message, enumerate_tasks(message)) }
        end
      end
    end
  end
end

#process_single_task(context, task) ⇒ Object



98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/distribot/worker.rb', line 98

# Executes one task with the handler's processor and reports completion.
#
# Steps, in order: +inspect_task!+ is called with the context, the method
# named by +self.class.processor+ is invoked, the Redis counter for this
# flow/phase/handler is decremented, and a message is published to
# +context.finished_queue+.
#
# @param context [OpenStruct] must respond to +flow_id+, +phase+ and
#   +finished_queue+
# @param task [Object] task payload, passed to the processor unchanged
# @return [Object] whatever +Distribot.publish!+ returns
def process_single_task(context, task)
  inspect_task!(context)
  # Your code is called right here:
  send(self.class.processor, context, task)

  flow_id = context.flow_id
  phase = context.phase
  handler = self.class.to_s
  Distribot.redis.decr("distribot.flow.#{flow_id}.#{phase}.#{handler}.finished")
  # Explicit braces keep the payload a positional Hash rather than keywords.
  Distribot.publish!(
    context.finished_queue,
    { flow_id: flow_id, phase: phase, handler: handler }
  )
end

#run ⇒ Object



45
46
47
48
49
# File 'lib/distribot/worker.rb', line 45

# Starts the worker: sets up enumeration handling first, then subscribes to
# the task queue.
#
# @return [self] the receiver, so the call can be chained
def run
  tap do
    prepare_for_enumeration
    subscribe_to_task_queue
  end
end

#subscribe_to_task_queue ⇒ Object



72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/distribot/worker.rb', line 72

# Subscribes to the handler class's task queue.
#
# The subscription is made with +reenqueue_on_failure+ and +solo+ enabled.
# Every received task is posted to a fixed pool of 500 threads; the posted
# job tags the logger with the task's key:value pairs and runs
# +handle_task_execution+.
#
# NOTE(review): if the logger tracks tags per thread, the outer
# "handler:..." tag may not be visible inside the pool threads — confirm
# against the logger implementation.
#
# @return [Object] whatever the outer +logger.tagged+ call returns
def subscribe_to_task_queue
  logger.tagged("handler:#{self.class}") do
    options = { reenqueue_on_failure: true, solo: true }
    workers = Concurrent::FixedThreadPool.new(500)
    Distribot.subscribe(self.class.task_queue, options) do |task|
      workers.post do
        logger.tagged(task.map { |key, value| [key, value].join(':') }) do
          handle_task_execution(task)
        end
      end
    end
  end
end