Class: Workling::Invokers::ThreadedPoller

Inherits:
Base
  • Object
show all
Defined in:
lib/workling/invokers/threaded_poller.rb

Instance Attribute Summary

Attributes inherited from Base

#reset_time, #sleep_time

Instance Method Summary collapse

Methods inherited from Base

#logger, #run

Constructor Details

#initialize(routing, client_class) ⇒ ThreadedPoller

Returns a new instance of ThreadedPoller.



12
13
14
15
16
17
18
19
20
# File 'lib/workling/invokers/threaded_poller.rb', line 12

def initialize(routing, client_class)
  super

  ThreadedPoller.sleep_time = Workling.config[:sleep_time] || 2
  ThreadedPoller.reset_time = Workling.config[:reset_time] || 30

  @workers = ThreadGroup.new
  @mutex = Mutex.new
end

Instance Method Details

#clazz_listen(clazz) ⇒ Object

Listen for one worker class



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/workling/invokers/threaded_poller.rb', line 63

def clazz_listen(clazz)
  logger.debug("Listener thread #{clazz.name} started")

  # Read thread configuration if available
  if Workling.config.has_key?(:listeners)
    if Workling.config[:listeners].has_key?(clazz.to_s)
      config = Workling.config[:listeners][clazz.to_s].symbolize_keys
      thread_sleep_time = config[:sleep_time] if config.has_key?(:sleep_time)
      Thread.current.priority = config[:priority] if config.has_key?(:priority)
    end
  end

  thread_sleep_time ||= self.class.sleep_time

  # Setup connection to client (one per thread)
  connection = @client_class.new
  connection.connect
  logger.info("** Starting client #{ connection.class } for #{clazz.name} queue")

  # Start dispatching those messages
  while (!Thread.current[:shutdown]) do
    begin

      # Thanks for this Brent! 
      #
      #     ...Just a heads up, due to how rails’ MySQL adapter handles this  
      #     call ‘ActiveRecord::Base.connection.active?’, you’ll need 
      #     to wrap the code that checks for a connection in in a mutex.
      #
      #     ....I noticed this while working with a multi-core machine that 
      #     was spawning multiple workling threads. Some of my workling 
      #     threads would hit serious issues at this block of code without 
      #     the mutex.            
      #
      if defined?(ActiveRecord::Base)
        @mutex.synchronize do 
          unless ActiveRecord::Base.connection.active?  # Keep MySQL connection alive
            unless ActiveRecord::Base.connection.reconnect!
              logger.fatal("Failed - Database not available!")
              break
            end
          end
        end
      end

      # Dispatch and process the messages
      n = dispatch!(connection, clazz)
      logger.debug("Listener thread #{clazz.name} processed #{n.to_s} queue items") if n > 0
      sleep(thread_sleep_time) unless n > 0

    # If there is a memcache error, hang for a bit to give it a chance to fire up again
    # and reset the connection.
    rescue Workling::WorklingConnectionError
      logger.warn("Listener thread #{clazz.name} failed to connect. Resetting connection.")
      sleep(self.class.reset_time)
      connection.reset
    end
  end

  logger.debug("Listener thread #{clazz.name} ended")
end

#dispatch!(connection, clazz) ⇒ Object

Dispatcher for one worker class. Will throw MemCacheError if unable to connect. Returns the number of worker methods called



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# File 'lib/workling/invokers/threaded_poller.rb', line 127

def dispatch!(connection, clazz)
  n = 0
  for queue in @routing.queue_names_routing_class(clazz)
    begin
      result = connection.retrieve(queue)
      if result
        n += 1
        handler = @routing[queue]
        method_name = @routing.method_name(queue)
        logger.debug("Calling #{handler.class.to_s}\##{method_name}(#{result.inspect})")
        handler.dispatch_to_worker_method(method_name, result)
      end
    rescue Workling::WorklingError => e
      logger.error("FAILED to connect with queue #{ queue }: #{ e } }")
      raise e
    end
  end

  return n
end

#listenObject



22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# File 'lib/workling/invokers/threaded_poller.rb', line 22

def listen
  # Create a thread for each worker.
  Workling::Discovery.discovered_workers.each do |clazz|
    logger.debug("Discovered listener #{clazz}")
    @workers.add(Thread.new(clazz) { |c| clazz_listen(c) })
  end

  # Wait for all workers to complete
  @workers.list.each { |t| t.join }

  logger.debug("Reaped listener threads. ")

  # Clean up all the connections.
  if defined?(ActiveRecord::Base)
    ActiveRecord::Base.verify_active_connections!
  end

  logger.debug("Cleaned up connection: out!")
end

#started?Boolean

Check if all Worker threads have been started.

Returns:

  • (Boolean)


43
44
45
46
# File 'lib/workling/invokers/threaded_poller.rb', line 43

def started?
  logger.debug("checking if started... list size is #{ worker_threads }")
  Workling::Discovery.discovered_workers.size == worker_threads
end

#stopObject

Gracefully stop processing



54
55
56
57
58
59
60
# File 'lib/workling/invokers/threaded_poller.rb', line 54

def stop
  logger.info("stopping threaded poller...")
  sleep 1 until started? # give it a chance to start up before shutting down. 
  logger.info("Giving Listener Threads a chance to shut down. This may take a while... ")
  @workers.list.each { |w| w[:shutdown] = true }
  logger.info("Listener threads were shut down.  ")
end

#worker_threadsObject

number of worker threads running



49
50
51
# File 'lib/workling/invokers/threaded_poller.rb', line 49

def worker_threads
  @workers.list.size
end