Class: MicroQ::Manager::Default

Inherits:
Object
  • Object
show all
Includes:
Celluloid
Defined in:
lib/micro_q/manager/default.rb

Overview

The default manager implementation. Wraps a Queue and a pool of Workers. Once start! has been called, the manager tries to dequeue messages from the queue at each time slice and performs each dequeued message on the worker pool.

The pool of workers (more info):

https://github.com/celluloid/celluloid/wiki/Pools

The pool manages asynchronously assigning messages to available workers, handles exceptions by restarting the dead actors and is generally a beautiful abstraction on top of a group of linked actors/threads.

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#busyObject (readonly)

Returns the value of attribute busy.



23
24
25
# File 'lib/micro_q/manager/default.rb', line 23

# Reader for @busy.
#
# Within this class, work_on appends the worker it dispatches a message to
# and work_done removes it again, so this holds the workers that have been
# handed a message and have not yet reported back. It is nil until
# reinitialize has assigned the empty array.
def busy
  @busy
end

#currentObject (readonly)

Returns the value of attribute current.



23
24
25
# File 'lib/micro_q/manager/default.rb', line 23

# Reader for @current.
#
# Within this class, work_on stores the message under the worker it was
# handed to, and work_done / reinitialize delete that entry again, so this
# is the worker => in-flight message lookup. It is nil until reinitialize
# has assigned the empty hash.
def current
  @current
end

#queueObject (readonly)

Returns the value of attribute queue.



23
24
25
# File 'lib/micro_q/manager/default.rb', line 23

# Reader for @queue.
#
# reinitialize assigns it from MicroQ.config.queue.new_link and replaces it
# whenever the existing one is missing or no longer alive?. start dequeues
# messages from it.
def queue
  @queue
end

#workersObject (readonly)

Returns the value of attribute workers.



23
24
25
# File 'lib/micro_q/manager/default.rb', line 23

# Reader for @workers.
#
# Within this class, build_missing_workers appends newly linked workers,
# work_on pops one off to hand it a message, and work_done pushes it back,
# so this holds the workers that are available for new messages. It is nil
# until reinitialize has assigned the empty array.
def workers
  @workers
end

Class Method Details

.shutdown!Object



105
106
107
# File 'lib/micro_q/manager/default.rb', line 105

# Set the class-level @shutdown flag.
#
# The flag is read back through shutdown?; reinitialize checks it and kills
# all workers instead of rebuilding them once it is set. Nothing in the
# visible code clears the flag again.
#
# @return [true]
def self.shutdown!
  @shutdown = true
end

.shutdown?Boolean

Returns:

  • (Boolean)


101
102
103
# File 'lib/micro_q/manager/default.rb', line 101

# Report whether the class-level @shutdown flag has been set.
#
# @return [Boolean] true when @shutdown is truthy, false otherwise
#   (including when it was never assigned)
def self.shutdown?
  @shutdown ? true : false
end

Instance Method Details

#build_missing_workersObject

Don’t shrink the pool if the config changes



82
83
84
85
86
87
88
89
90
91
# File 'lib/micro_q/manager/default.rb', line 82

# Top the worker pool back up to the configured size.
#
# Dead actors are first dropped from both the idle and the busy list, then
# one new worker is linked for every slot reported by missing_worker_count.
# The pool is only ever grown here, never shrunk, so lowering the configured
# worker count does not remove existing workers.
#
# @return [Integer, nil] the number of workers added, or nil when MicroQ is
#   running in queue-only mode
def build_missing_workers
  return if MicroQ.queue_only?

  # Prune before counting so dead actors do not occupy pool slots.
  [workers, busy].each { |pool| pool.select!(&:alive?) }

  missing_worker_count.times do
    replacement = MicroQ.config.worker.new_link(current_actor)
    workers.push(replacement)
  end
end

#kill_allObject



97
98
99
# File 'lib/micro_q/manager/default.rb', line 97

# Terminate every worker, idle or busy, that is still alive.
#
# @return [Array] the combined list of idle and busy workers that was walked
def kill_all
  everyone = workers + busy

  everyone.each do |actor|
    next unless actor.alive?

    actor.terminate
  end
end

#missing_worker_countObject



93
94
95
# File 'lib/micro_q/manager/default.rb', line 93

# How many workers must be created to reach MicroQ.config.workers.
#
# Idle and busy workers both count towards the pool size. The result is
# never negative, so a pool larger than the configured size reports zero.
#
# @return [Numeric] number of workers still to be built (0 when at or above
#   the configured size)
def missing_worker_count
  pool_size = workers.size + busy.size
  shortfall = MicroQ.config.workers - pool_size

  shortfall < 0 ? 0 : shortfall
end

#reinitialize(*args) ⇒ Object Also known as: initialize

Handle init/death of the Queue or the Worker pool. When a worker dies, the args are (#<Actor …>, #<Exception>).



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/micro_q/manager/default.rb', line 60

# Set the manager up, or recover after a linked actor has died.
#
# This method is also used as +initialize+. With no arguments it is a plain
# (re)start; when a linked worker dies the arguments are
# (dead actor, exception).
#
# @param args [Array] empty on init, (actor, exception) on a death notice
# @return [Object, nil] nil when the class has been shut down, otherwise the
#   result of build_missing_workers
def reinitialize(*args)
  # Create the bookkeeping collections before anything reads them: kill_all
  # walks workers + busy, which would be nil + nil when this runs as
  # initialize on an already shut down class.
  @busy ||= []
  @workers ||= []
  @current ||= {}

  # Explicit branch rather than `kill_all and return`, so stopping here does
  # not depend on kill_all happening to return a truthy value.
  if self.class.shutdown?
    kill_all
    return
  end

  unless @queue && queue.alive?
    @queue = MicroQ.config.queue.new_link
  end

  if args.any?
    message = current.delete(args.first)

    # Only report a message when the dead actor really had one in flight
    # (an idle worker, or the queue itself, has no entry in current).
    # NOTE(review): respond_to?(:finished) followed by finished! looks like
    # Celluloid's legacy bang-suffix async call of #finished - confirm.
    queue.finished!(message) if message && queue.respond_to?(:finished)
  end

  build_missing_workers
end

#startObject



25
26
27
28
29
30
31
32
33
34
35
36
37
# File 'lib/micro_q/manager/default.rb', line 25

# Run one dispatch cycle and schedule the next one.
#
# Asks the queue for as many messages as there are idle workers, hands each
# one to work_on, then re-schedules itself through after(2). Does nothing
# (and schedules nothing) when MicroQ is in queue-only mode.
#
# @return [Object, nil] whatever after returns, or nil in queue-only mode
def start
  return if MicroQ.queue_only?

  # Never ask for more messages than there are idle workers to take them.
  messages = queue.dequeue(workers.size)
  messages.each { |message| work_on(message) } if messages.any?

  # NOTE(review): presumably Celluloid's timer helper, i.e. 2 seconds - confirm.
  after(2) { start }
end

#work_done(worker) ⇒ Object



39
40
41
42
43
44
45
# File 'lib/micro_q/manager/default.rb', line 39

# Record that a worker has completed its message and return it to the pool.
#
# The worker's in-flight message is removed from current and, when the
# queue supports it, reported back as finished. The worker then moves from
# the busy list to the idle list.
#
# @param worker [Object] the worker reporting completion
# @return [Array] the idle workers list, now including the worker
def work_done(worker)
  completed = current.delete(worker)

  # NOTE(review): respond_to?(:finished) followed by finished! looks like
  # Celluloid's legacy bang-suffix async call of #finished - confirm.
  if queue.respond_to?(:finished)
    queue.finished!(completed)
  end

  busy.delete(worker)
  workers << worker
end

#work_on(message) ⇒ Object



47
48
49
50
51
52
53
54
# File 'lib/micro_q/manager/default.rb', line 47

# Hand a message to an idle worker.
#
# Takes the last worker off the idle list, records it as busy together with
# the message it is handling, and invokes perform! on it.
#
# @param message [Object] the dequeued message to perform
# @return [Object] whatever the worker's perform! call returns
def work_on(message)
  assignee = workers.pop
  busy.push(assignee)

  # Remember what the worker is handling so it can be reported as finished.
  current.store(assignee, message)

  assignee.perform!(message)
end