Class: TaskTempest::Dispatcher

Inherits:
Object
  • Object
show all
Defined in:
lib/task_tempest/dispatcher.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(options) ⇒ Dispatcher

Returns a new instance of Dispatcher.



5
6
7
8
# File 'lib/task_tempest/dispatcher.rb', line 5

# Builds a dispatcher from a set of options and immediately starts it.
#
# Every key/value pair in +options+ is stored as an instance variable named
# after the key (key +foo+ becomes +@foo+). Once all options are copied,
# +start+ is invoked.
#
# @param options [#each] pairs of option name and value; presumably a Hash
#   (TODO confirm against callers)
def initialize(options)
  options.each do |name, value|
    instance_variable_set("@#{name}", value)
  end
  start
end

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



3
4
5
# File 'lib/task_tempest/dispatcher.rb', line 3

# Reader for the dispatcher's logger.
#
# @return [Object] whatever is currently stored in +@logger+ (nil when it
#   has not been assigned)
def logger
  @logger
end

Instance Method Details

#alive? ⇒ Boolean

Returns:

  • (Boolean)


19
20
21
# File 'lib/task_tempest/dispatcher.rb', line 19

# Reports whether the dispatcher's worker thread exists and is still alive.
#
# Always answers with a strict boolean: when no thread has been created yet
# the result is +false+ rather than +nil+.
#
# @return [Boolean] true if +@thread+ is set and +Thread#alive?+ is true
def alive?
  # Double negation normalises nil (no thread yet) to false.
  !!(@thread && @thread.alive?)
end

#consume ⇒ Object



39
40
41
42
43
44
45
46
47
48
# File 'lib/task_tempest/dispatcher.rb', line 39

# Pulls the next message off the queue and remembers it in +@message+.
#
# When the queue yields nothing (a falsy value), a debug line is logged and
# the caller is put to sleep for +@no_message_sleep+ before returning.
#
# @return [Boolean] true when a message was dequeued, false otherwise
def consume
  @message = @queue.dequeue
  return true if @message

  # Nothing to do: back off before the caller polls again.
  logger.debug "queue empty, sleeping for #{@no_message_sleep}"
  sleep(@no_message_sleep)
  false
end

#dead? ⇒ Boolean

Returns:

  • (Boolean)


23
24
25
# File 'lib/task_tempest/dispatcher.rb', line 23

# Logical inverse of +alive?+.
#
# @return [Boolean] true when +alive?+ is falsy, false otherwise
def dead?
  !alive?
end

#dispatch ⇒ Object



50
51
52
53
54
55
56
# File 'lib/task_tempest/dispatcher.rb', line 50

# Turns the current +@message+ into a task and hands it to the storm.
#
# The message is destructured as +[id, class_name, *args]+. The class is
# looked up by name under +TaskTempest::Task+, instantiated with the
# remaining arguments and initialised with the id and +@task_logger+. The
# task's +run+ is then passed to +@storm.execute+ as a block; whatever
# +execute+ returns is stored on the task as its +execution+. Finally a
# "started" line is written to the dispatcher's logger.
#
# @return [Object] the return value of +logger.info+
def dispatch
  id, class_name, *args = @message

  klass = TaskTempest::Task.const_get(class_name)
  instance = klass.new(*args)
  task = instance.init(:id => id, :logger => @task_logger)

  execution = @storm.execute(task) { task.run }
  task.execution = execution

  logger.info(task.format_log("started"))
end

#exception ⇒ Object



27
28
29
# File 'lib/task_tempest/dispatcher.rb', line 27

# Retrieves the outcome of the worker thread once the dispatcher is dead.
#
# While the dispatcher is alive this returns +false+. When it is dead and a
# thread exists, the result is +Thread#value+ of that thread. When it is
# dead because no thread was ever created, +nil+ is returned instead of
# failing on a nil receiver.
#
# NOTE(review): per the Ruby Thread documentation, +Thread#value+ joins the
# thread and re-raises an exception that terminated it, so a crashed worker
# surfaces here as a raised error rather than a returned object. Confirm
# that callers expect this.
#
# @return [Object, false, nil] the thread's value, false while alive, or nil
#   when there is no thread
def exception
  dead? && @thread && @thread.value
end

#run ⇒ Object



31
32
33
# File 'lib/task_tempest/dispatcher.rb', line 31

# Body of the worker thread: performs +run_loop+ over and over, forever.
#
# The method only stops when +run_loop+ raises or throws, or when the
# enclosing thread is killed.
def run
  # A plain `while` is used on purpose: Kernel#loop would silently swallow
  # a StopIteration raised from inside run_loop.
  while true
    run_loop
  end
end

#run_loop ⇒ Object



35
36
37
# File 'lib/task_tempest/dispatcher.rb', line 35

# One iteration of the worker: try to consume a message and, if one was
# obtained, dispatch it.
#
# @return [Object] the falsy result of +consume+ when nothing was consumed,
#   otherwise the result of +dispatch+
def run_loop
  consumed = consume
  return consumed unless consumed

  dispatch
end

#shutdown ⇒ Object



58
59
60
61
# File 'lib/task_tempest/dispatcher.rb', line 58

# Stops the worker thread, if there is one, and waits for it to finish.
#
# When no thread has been created this is a no-op; previously the
# unconditional +join+ failed on a nil receiver in that situation.
#
# @return [Thread, nil] the joined thread, or nil when there was nothing to
#   shut down
def shutdown
  return unless @thread

  @thread.kill
  @thread.join
end

#start ⇒ Object Also known as: restart



10
11
12
13
14
15
# File 'lib/task_tempest/dispatcher.rb', line 10

# Launches the worker thread unless one is already running.
#
# When the dispatcher is dead, a queue is obtained by calling
# +@queue_factory+ and a new thread is spawned that executes +run+. When the
# dispatcher is alive, nothing happens.
#
# @return [Thread, nil] the newly created thread, or nil if already alive
def start
  return unless dead?

  @queue = @queue_factory.call
  @thread = Thread.new { run }
end