Class: Backnob::Server

Inherits:
Object
  • Object
show all
Includes:
DRb::DRbUndumped, Singleton
Defined in:
lib/backnob/server.rb

Overview

Backnob server.

Constant Summary collapse

OPTIONS =

Default options

{:listen => '127.0.0.1:6444'}
WORKER_KLASSES =

Array of worker classes

[]

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.method_missing(method, *args, &block) ⇒ Object

Pass any calls made on the class through to the singleton instance.



28
29
30
# File 'lib/backnob/server.rb', line 28

# Forward any call the class itself does not understand to the singleton
# instance, so Server.ping behaves like Server.instance.ping.
#
# method:: name of the method that was called on the class
# args::   positional arguments, passed through unchanged
# block::  optional block, handed on to the instance method
#
# Returns whatever the instance method returns.
def self.method_missing(method, *args, &block) # :nodoc:
  # The block must be forwarded explicitly; otherwise an instance method
  # that yields fails when it is reached through the class.
  self.instance.__send__(method, *args, &block)
end

# Keep respond_to? on the class in step with the forwarding above.
def self.respond_to_missing?(method, include_private = false) # :nodoc:
  self.instance.respond_to?(method, include_private) || super
end

Instance Method Details

#add_file(file) ⇒ Object

Add a worker file or a directory of worker files



189
190
191
192
193
194
195
196
197
198
199
200
# File 'lib/backnob/server.rb', line 189

# Add a worker file, or a directory of worker files, to the :workers option.
#
# file:: path to a worker file or to a directory of worker files
#
# A path that does not exist is ignored. A path that is already listed is
# moved to the end of the list instead of being added twice.
#
# Returns the current array of worker paths.
def add_file(file)
  @options[:workers] ||= []

  # File.exists? was deprecated and then removed in Ruby 3.2;
  # File.exist? is the supported spelling.
  if File.exist?(file)
    # Delete-then-append keeps exactly one entry per path.
    @options[:workers].delete(file)
    @options[:workers] << file

    logger.debug "Added worker file #{file}"
  end

  @options[:workers]
end

#create_worker(name, options = {}) ⇒ Object

Create a new worker. Given a name it will find the first class that matches that name. Options hash is passed to the worker. A key is returned which can be used to retrieve the worker results.



143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/backnob/server.rb', line 143

# Create a new worker, or queue work on an existing one.
#
# name::    name to look up; the first class in WORKER_KLASSES whose name
#           contains it (case-insensitively) is used
# options:: hash passed to the worker
#
# When the class reports +single+ and a worker of that class already exists,
# the work is enqueued on that worker. Otherwise a new worker object is
# wrapped in a Slave, this server is added as its observer and it is started.
#
# Returns the generated key for this piece of work.
# Raises Exception when no registered class matches +name+.
def create_worker(name, options = {})
  load_workers

  # Escape the name so it is matched literally. Interpolated raw, regexp
  # metacharacters could select an unrelated class or raise RegexpError.
  pattern = /#{Regexp.escape(name.to_s)}/i
  klass = WORKER_KLASSES.detect { |w| w.name =~ pattern }

  # Exception itself (not a StandardError subclass) is kept so callers that
  # already rescue it see no change.
  raise Exception.new("Invalid worker class: #{name}") unless klass

  # Generate a key, trying again for as long as get_worker finds one for it
  begin
    key = Digest::SHA1.hexdigest(rand.to_s + Time.now.to_i.to_s + klass.name)
  end while get_worker(key)

  # If the class is marked as single then reuse an existing worker if possible
  worker = klass.single && @workers.detect { |w| w.klass == klass }

  if worker
    worker.keys << key
    worker.object.enqueue(key, options)
    logger.debug "Adding work to worker #{klass.name} with key #{key}"
  else
    logger.debug "Creating worker #{klass.name} with key #{key}"

    slave = Slave.new :object => klass.new(key, options)
    worker = AttrHash.new
    worker[:slave] = slave
    worker[:object] = slave.object
    worker[:klass] = klass
    worker[:keys] = [key]
    slave.object.add_observer(self)
    slave.object.start
    @workers << worker
  end

  key
end

#logger ⇒ Object

Return a default logger for this process



135
136
137
# File 'lib/backnob/server.rb', line 135

# Default logger for this process, writing to standard output.
# It is built on first use and the same object is returned afterwards.
def logger
  return @logger if @logger

  @logger = Logger.new($stdout)
end

#ping ⇒ Object

Just return true. Used by client to test server is responding



124
125
126
# File 'lib/backnob/server.rb', line 124

# Liveness probe: always answers true, so a client can check that the
# server is responding.
def ping
  true
end

#register(klass) ⇒ Object

Register a class as a worker



182
183
184
185
# File 'lib/backnob/server.rb', line 182

# Register a class as a worker.
#
# Any earlier registration of the same class is dropped first, so the class
# ends up exactly once, at the end of WORKER_KLASSES. Returns WORKER_KLASSES.
def register(klass)
  begin
    WORKER_KLASSES.delete(klass)
  rescue StandardError
    # best effort: a failure while removing the old entry is ignored
    nil
  end

  WORKER_KLASSES.push(klass)
end

#results(key, hk = nil) ⇒ Object

Get results from a worker. Results are cached if possible



112
113
114
115
116
117
118
119
120
121
# File 'lib/backnob/server.rb', line 112

# Look up the cached results for a worker key.
#
# key:: key returned by create_worker
# hk::  optional entry to pick out of that key's results
#
# Returns the whole results hash for +key+ (an empty hash when nothing is
# cached), or only the +hk+ entry when +hk+ is given.
#
# Only the cache is read here. The original listing also carries a
# commented-out fallback that asked the worker object directly.
def results(key, hk = nil)
  cached = @results[key] || {}
  return cached unless hk

  cached[hk]
end

#start(options = {}) ⇒ Object

Start the server with given options.

Options:

:path

Path to chdir to after starting

:workers

Array of worker files or directories

:listen

Address to listen on. Defaults to 127.0.0.1:6444



54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# File 'lib/backnob/server.rb', line 54

# Start the server with the given options, then stay in the result-caching
# loop until the DRb thread ends or a stop is requested.
#
# Options:
# :path::    path to chdir to after starting (skipped when it does not exist)
# :workers:: array of worker files or directories
# :listen::  address to listen on; the default comes from OPTIONS
#
# The options hash is merged over OPTIONS and sanitized via Backnob::Options.
def start(options = {})
  options.symbolize_keys!

  # Sanitize the options
  @options = OPTIONS.merge(options)
  @options.extend(Backnob::Options)
  @options.sanitize!

  # File.exists? was deprecated and then removed in Ruby 3.2;
  # File.exist? is the supported spelling.
  Dir.chdir(@options[:path]) if @options[:path] && File.exist?(@options[:path])

  # Add the workers directory that sits next to this file
  add_file File.dirname(__FILE__) + '/workers'

  # Set default variables
  @workers = []
  @results = {}
  @rqueue = Queue.new

  # Trap TERM and INT signals to run the stop method
  %w[TERM INT].each do |signal|
    Signal.trap(signal) { stop }
  end

  DRb.start_service(@options[:listen], self)
  logger.info "Waiting on #{@options[:listen]}"

  # This thread now waits for exit, and also
  # caches results off the results queue.
  while DRb.thread && DRb.thread.alive? && !@stopping
    # Blocks until #update pushes worker data or #stop pushes :stop
    message = @rqueue.pop
    next if message == :stop

    # NOTE(review): Marshal.load can build arbitrary objects. This assumes
    # only trusted workers feed the queue via #update - confirm.
    data = ::Marshal.load(message)
    key = data[0]

    # data is indexed as [key, result name, value]
    @results[key] = {} unless @results.key?(key)
    @results[key][data[1]] = data[2]

    if data[1] == :error
      logger.debug "Error occurred in worker #{key}"
      logger.debug data[2].to_s
      logger.debug data[2].backtrace if data[2].backtrace
    end
  end
end

#stop ⇒ Object

Stop the server and all workers. If received twice, it stops even if workers are still running.



35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/backnob/server.rb', line 35

# Shut the server down: flag the main loop to finish, ask every worker to
# stop, shut down their slaves, stop the DRb service and leave the process.
# Errors raised by an individual worker or slave are ignored.
def stop
  @stopping = true
  # #start blocks on this queue, so the marker wakes its loop
  @rqueue.push(:stop)

  logger.info "Sending stop to all active workers"

  # Two separate passes: every worker object is asked to stop before any
  # slave is shut down.
  @workers.each do |worker|
    begin
      worker.object.stop
    rescue StandardError
      nil
    end
  end

  @workers.each do |worker|
    begin
      worker.slave.shutdown
    rescue StandardError
      nil
    end
  end

  logger.info "Stopping DRb service"
  DRb.stop_service

  exit!
end

#update(data) ⇒ Object

Update method for observing workers. This receives marshalled result data and puts it on the receive queue for caching.



130
131
132
# File 'lib/backnob/server.rb', line 130

# Observer callback for workers: puts the data it is handed on the queue
# that #start reads from. Returns the queue.
def update(data)
  @rqueue.push(data)
end