Class: ActiveMessaging::ThreadedPoller

Inherits:
Object
  • Object
show all
Includes:
Celluloid
Defined in:
lib/activemessaging/threaded_poller.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection = 'default', configuration = {}) ⇒ ThreadedPoller

connection is a string, name of the connection from broker.yml to use for this threaded poller instance

configuration is a list of hashes each has describes a group of worker threads for each group, define what priorities those workers will process

[
  {
    :pool_size  => 1       # number of workers of this type
    :priorities => [1,2,3] # what message priorities this thread will process
  }
]


30
31
32
33
34
35
# File 'lib/activemessaging/threaded_poller.rb', line 30

def initialize(connection='default', configuration={})
  # default config is a pool size of 3 worker threads
  self.configuration = configuration || [{:pool_size => 3}]
  self.connection = connection
  self.pause = 1
end

Instance Attribute Details

#busyObject

Returns the value of attribute busy.



15
16
17
# File 'lib/activemessaging/threaded_poller.rb', line 15

def busy
  @busy
end

#configurationObject

Returns the value of attribute configuration.



15
16
17
# File 'lib/activemessaging/threaded_poller.rb', line 15

def configuration
  @configuration
end

#connectionObject

Returns the value of attribute connection.



15
16
17
# File 'lib/activemessaging/threaded_poller.rb', line 15

def connection
  @connection
end

#pauseObject

Returns the value of attribute pause.



15
16
17
# File 'lib/activemessaging/threaded_poller.rb', line 15

def pause
  @pause
end

#receiverObject

Returns the value of attribute receiver.



15
16
17
# File 'lib/activemessaging/threaded_poller.rb', line 15

def receiver
  @receiver
end

#runningObject

Returns the value of attribute running.



15
16
17
# File 'lib/activemessaging/threaded_poller.rb', line 15

def running
  @running
end

#workersObject

Returns the value of attribute workers.



15
16
17
# File 'lib/activemessaging/threaded_poller.rb', line 15

def workers
  @workers
end

Instance Method Details

#died(worker, reason) ⇒ Object



118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# File 'lib/activemessaging/threaded_poller.rb', line 118

def died(worker, reason)
  busy.delete(worker)

  if running
    logger.info "uh oh, #{worker.inspect} died because of #{reason.class}"
    worker = Worker.new_link(current_actor)
    workers << worker
    receive(worker)
  else
    logger.info "check to see if busy is empty: #{busy.inspect}"
    if busy.empty?
      logger.info "all died: signal stopped"
      after(0){ signal(:shutdown) }
    end
  end
end

#dispatch(message, worker) ⇒ Object



97
98
99
100
101
# File 'lib/activemessaging/threaded_poller.rb', line 97

def dispatch(message, worker)
  workers.delete(worker)
  busy << worker
  worker.execute!(message)
end

#executed(worker) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/activemessaging/threaded_poller.rb', line 103

def executed(worker)
  busy.delete(worker)

  if running
    workers << worker
    receive(worker)
  else
    worker.terminate if worker.alive?
    if busy.empty?
      logger.info "all executed: signal stopped"
      after(0) { signal(:shutdown) }
    end
  end
end

#inspectObject



139
140
141
# File 'lib/activemessaging/threaded_poller.rb', line 139

def inspect
  "#<ThreadedPoller #{to_s}>"
end

#log_statusObject

recursive method, uses celluloid ‘after’ to keep calling



87
88
89
90
91
# File 'lib/activemessaging/threaded_poller.rb', line 87

def log_status
  return unless logger.debug?
  logger.debug("ActiveMessaging::ThreadedPoller: conn:#{connection}, #{workers.count}, #{busy.count}, #{running}")
  after(10){ log_status }
end

#loggerObject



147
# File 'lib/activemessaging/threaded_poller.rb', line 147

def logger; ActiveMessaging.logger; end

#receive(worker) ⇒ Object



93
94
95
# File 'lib/activemessaging/threaded_poller.rb', line 93

def receive(worker)
  receiver.receive!(worker) if (receiver && running && worker)
end

#startObject



37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/activemessaging/threaded_poller.rb', line 37

def start
  logger.info "ActiveMessaging::ThreadedPoller start"

  # these are workers ready to use
  self.workers = []

  # these are workers already working
  self.busy = []

  # this indicates if we are running or not, helps threads to stop gracefully
  self.running = true

  # subscribe will create the connections based on subscriptions in processsors
  # (you can't find or use the connection until it is created by calling this)
  ActiveMessaging::Gateway.subscribe

  # create a message receiver actor, ony need one, using connection
  receiver_connection = ActiveMessaging::Gateway.connection(connection)
  self.receiver = MessageReceiver.new(current_actor, receiver_connection, pause)

  # start the workers based on the config
  configuration.each do |c|
    (c[:pool_size] || 1).times{ self.workers << Worker.new_link(current_actor, c) }
  end

  # once all workers are created, start them up
  self.workers.each{|worker| receive(worker)}

  # in debug level, log info about workers every 10 seconds
  log_status
end

#stopObject



69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/activemessaging/threaded_poller.rb', line 69

def stop
  logger.info "ActiveMessaging::ThreadedPoller stop"
  # indicates to all busy workers not to pick up another messages, but does not interrupt
  # also indicates to the message receiver to stop getting more messages
  self.running = false

  # tell each waiting worker to shut down.  Running ones will be allowed to finish
  receiver.terminate! if receiver.alive?
  logger.info "ActiveMessaging::ThreadedPoller receiver terminated"

  workers.each { |w| w.terminate! if w.alive? }
  logger.info "ActiveMessaging::ThreadedPoller workers terminated"


  after(0) { signal(:shutdown) } if stopped?
end

#stopped?Boolean

Returns:

  • (Boolean)


135
136
137
# File 'lib/activemessaging/threaded_poller.rb', line 135

def stopped?
  (!running && busy.empty?)
end

#to_sObject



143
144
145
# File 'lib/activemessaging/threaded_poller.rb', line 143

def to_s
  @str ||= "#{Process.pid}-#{Thread.current.object_id}:#{self.object_id}"
end