Class: Backnob::Server

Inherits:
Object
  • Object
show all
Includes:
DRb::DRbUndumped, Singleton
Defined in:
lib/backnob/server.rb

Constant Summary collapse

OPTIONS =

Default options

{:listen => '127.0.0.1:6444'}
WORKER_KLASSES =

Array of worker classes

[]

Class Method Summary collapse

Instance Method Summary collapse

Class Method Details

.method_missing(method, *args, &block) ⇒ Object

Pass any calls to class to instance



27
28
29
# File 'lib/backnob/server.rb', line 27

# Forward any message the class itself does not understand to the
# singleton instance, so callers can write Server.foo instead of
# Server.instance.foo.
#
# method - Symbol name of the method that was not found on the class.
# args   - Positional arguments given by the caller.
# block  - Optional block given by the caller.
#
# Returns whatever the instance method returns.
def self.method_missing(method, *args, &block)
  # The block must be forwarded too; previously it was captured but dropped.
  instance.__send__(method, *args, &block)
end

Instance Method Details

#add_file(file) ⇒ Object

Add a worker file



172
173
174
175
176
177
178
179
180
181
182
183
# File 'lib/backnob/server.rb', line 172

# Add a worker path to the :workers option list.
#
# file - Path to add. It is only added when it exists on disk.
#
# Returns the current list of worker paths (whether or not anything was added).
def add_file(file)
  @options[:workers] ||= []

  # File.exist? replaces File.exists?, which was deprecated and then
  # removed in Ruby 3.2.
  if File.exist?(file)
    # Delete before appending so a re-added path moves to the end of the
    # list instead of being duplicated.
    @options[:workers].delete(file)
    @options[:workers] << file

    logger.debug "Added worker file #{file}"
  end

  @options[:workers]
end

#create_worker(name, options = {}) ⇒ Object

Create a new worker. Given a name it will find the first class that matches that name. Options hash is passed to the worker. A key is returned which can be used to retrieve the worker results.



144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# File 'lib/backnob/server.rb', line 144

# Create and start a new worker.
#
# name    - Name (String or Symbol) used to look up the worker class. The
#           first registered class whose name contains it, ignoring case,
#           is used.
# options - Hash passed through to the worker's constructor.
#
# Returns the key (a SHA1 hex String) under which the worker is stored and
# under which its results can be fetched, or nil when no class matches.
def create_worker(name, options = {})
  load_workers

  # Escape the name so regex metacharacters in it are matched literally
  # instead of raising RegexpError or matching unrelated classes.
  pattern = /#{Regexp.escape(name.to_s)}/i
  klass = WORKER_KLASSES.detect do |w|
    w.name =~ pattern
  end

  if klass
    # A whole-second timestamp alone collides when two workers of the same
    # class are created within the same second; mix in sub-second time and
    # a random component so each worker gets its own key.
    key = Digest::SHA1.hexdigest("#{Time.now.to_f}#{rand}#{klass.name}")

    logger.debug "Creating worker #{klass.name} with key #{key}"

    worker = Slave.new :object => klass.new(key, options)
    # Observe the worker so its results are delivered to #update.
    worker.object.add_observer(self)
    worker.object.start
    @workers[key] = worker

    key
  end
end

#logger ⇒ Object

Return a default logger for this process



136
137
138
# File 'lib/backnob/server.rb', line 136

# Lazily build the logger for this process; it writes to $stdout and the
# same instance is reused on later calls.
def logger
  return @logger if @logger

  @logger = Logger.new($stdout)
end

#ping ⇒ Object

Just return true. Used by client to test server is responding



125
126
127
# File 'lib/backnob/server.rb', line 125

# Liveness check: always answers true.
def ping
  true
end

#register(klass) ⇒ Object

Register a class as a worker



166
167
168
169
# File 'lib/backnob/server.rb', line 166

# Register a class as a worker. Any existing entry for the class is removed
# first, so the class ends up exactly once, at the end of WORKER_KLASSES.
#
# Returns WORKER_KLASSES.
def register(klass)
  begin
    WORKER_KLASSES.delete(klass)
  rescue StandardError
    # Best effort: a failure while removing the old entry is ignored.
    nil
  end

  WORKER_KLASSES.push(klass)
end

#results(key, hk = nil) ⇒ Object

Get results from a worker. Results are cached if possible



114
115
116
117
118
119
120
121
122
# File 'lib/backnob/server.rb', line 114

# Fetch results for a worker, preferring the server-side cache.
#
# key - Worker key.
# hk  - Optional key into the results; when given only that entry is
#       returned instead of the whole results object.
#
# Returns the cached results for key when present; otherwise asks the
# worker object directly, falling back to an empty Hash if that fails.
def results(key, hk = nil)
  cached = @results[key]

  data =
    if cached
      cached
    else
      begin
        @workers[key].object.results
      rescue StandardError
        # Unknown key or unreachable worker: treat as "no results".
        {}
      end
    end

  hk ? data[hk] : data
end

#start(options = {}) ⇒ Object

Start the server



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
# File 'lib/backnob/server.rb', line 49

# Start the server and block until the DRb service thread ends.
#
# options - Hash of options merged over OPTIONS. Keys are symbolized in
#           place. :listen is the URI handed to DRb.start_service; :path,
#           when it exists, becomes the working directory.
#
# While running, the calling thread polls the results queue, caches the
# results per worker key, and shuts down workers that report they are no
# longer running.
def start(options = {})
  options.symbolize_keys!

  # Sanitize the options
  @options = OPTIONS.merge(options)
  @options.extend(Backnob::Options)
  @options.sanitize!

  # File.exist? replaces File.exists?, which was removed in Ruby 3.2.
  if @options[:path] && File.exist?(@options[:path])
    Dir.chdir(@options[:path])
  end

  # Add the workers directory
  add_file File.dirname(__FILE__) + '/workers'

  # Set default variables
  @workers = {}
  @results = {}
  @rqueue = Queue.new

  m = self

  # Trap TERM and INT signals to run the
  # stop method
  Signal.trap("TERM") do
    m.stop
  end

  Signal.trap("INT") do
    m.stop
  end

  DRb.start_service(@options[:listen], m)
  logger.info "Waiting on #{@options[:listen]}"

  # This thread now waits for exit, and also
  # caches results off the results queue.
  # It also cleans up workers that have quit.
  while DRb.thread && DRb.thread.alive?
    sleep 0.25

    unless @rqueue.empty?
      # Unmarshal results. Each payload is indexed as [key, name, value].
      # NOTE(review): Marshal.load is only safe on trusted data; payloads
      # arrive through #update -- confirm that only workers can call it.
      data = ::Marshal.load(@rqueue.pop)
      key = data[0]

      @results[key] = {} unless @results.has_key?(key)
      @results[key][data[1]] = data[2]

      if data[1] == :error
        logger.debug "Error occurred in worker #{key}"
        logger.debug data[2].to_s
        logger.debug data[2].backtrace if data[2].backtrace
      end

      if data[1] == :running && data[2] == false
        # The worker may already have been removed (e.g. a repeated
        # notification); skip it rather than crash the main loop on nil.
        worker = @workers[key]
        if worker
          worker.shutdown(:quiet => true)
          @workers.delete(key)
        end
      end
    end
  end
end

#stop ⇒ Object

Stop the server and all workers. If received twice, stops even if workers are still running.



34
35
36
37
38
39
40
41
42
43
44
45
46
# File 'lib/backnob/server.rb', line 34

# Stop the server and all workers.
#
# The first call waits for each worker to finish. Any later call (for
# example a second TERM/INT signal) forces the workers to shut down instead.
# Either way the DRb service is stopped afterwards.
def stop
  # The condition used to be hard-coded (`unless true`), which made the
  # graceful branch unreachable and forced workers down on the first call.
  if @stopping
    logger.info "Forcing workers to exit"
    @workers.values.each { |w| w.shutdown }
  else
    @stopping = true
    logger.info "Waiting for workers to exit"
    @workers.values.each { |w| w.wait }
  end

  logger.info "Stopping DRb service"
  DRb.stop_service
end

#update(data) ⇒ Object

Update method for observing workers. This receives marshalled result data and puts it on the receive queue for caching.



131
132
133
# File 'lib/backnob/server.rb', line 131

# Observer callback invoked by workers: enqueue the result payload on the
# results queue, which the main loop in #start reads from.
#
# data - Result payload, passed through untouched.
def update(data)
  @rqueue.push(data)
end