Class: BlueColr

Inherits:
Object
Includes:
GraphOutput
Defined in:
lib/blue_colr.rb,
lib/blue_colr/graph_output.rb

Overview

This class provides a DSL for enqueuing processes while describing the dependencies between them.

Defined Under Namespace

Modules: GraphOutput

Constant Summary

DEFAULT_PENDING_STATE = 'pending'

If no alternative statemap is provided, all newly launched processes have this state by default.

PREPARING_STATE = 'preparing'

Used internally: #enqueue inserts a process with this state while it records the dependencies, then switches it to the requested state.

DEFAULT_STATEMAP =

Default state transitions for the simple setup (pending => running => ok or error).

{
  'on_pending' => {
    DEFAULT_PENDING_STATE => [
      ['running', ['ok', 'skipped']]
    ]
  },
  'on_running' => {
    'running' => {
      'error' => 'error',
      'ok' => 'ok'
    }
  },
  'on_restart' => {
    'error' => 'pending',
    'ok' => 'pending'
  }
}

Methods included from GraphOutput

#graph_enqueue, included

Class Attribute Details

.conf ⇒ Object

Configuration hash read from the YAML config file



# File 'lib/blue_colr.rb', line 45

def conf
  unless @conf
    if @db_url || @db # skip loading config if db set explicitly
      @conf = {}
    else
      parse_command_line unless @args

      raise "No configuration file defined (-c <config>)." if @args["config"].nil?
      raise "Couldn't read #{@args["config"]} file." unless @args['config'] && @conf = YAML::load(File.new(@args["config"]).read)
      # setting default options that should be written along with all the records to process_items
      if @conf['default_options']
        @conf['default_options'].each do |k,v|
          default_options.send("#{k}=", v)
        end
      end

      if @args['params']
        @args['params'].each do |k, v|
          default_options.send("#{k}=", v)
        end
      end
    end
  end
  @conf
end
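
The precedence applied by conf (values under default_options in the YAML file first, then the -p params on top) can be sketched without touching the filesystem. This is a minimal sketch, not the library's code: the YAML text, the db_url value, the environment and description keys, and the helpers apply_options and effective_options are all illustrative assumptions.

```ruby
require 'yaml'
require 'json'
require 'ostruct'

# Hypothetical config contents; a real file would be passed with -c <config>.
CONFIG_YAML = <<~YAML
  db_url: sqlite://example.db
  default_options:
    environment: production
    description: nightly batch
YAML

# Hypothetical helper: copy every key/value pair onto the options object,
# the same way conf calls default_options.send("#{k}=", v).
def apply_options(target, pairs)
  (pairs || {}).each { |k, v| target.send("#{k}=", v) }
  target
end

# Hypothetical helper: config file values first, then the JSON given to -p.
def effective_options(config_yaml, params_json = nil)
  conf = YAML.load(config_yaml)
  options = OpenStruct.new
  apply_options(options, conf['default_options'])
  apply_options(options, JSON.parse(params_json)) if params_json
  options
end

options = effective_options(CONFIG_YAML, '{"environment": "staging"}')
puts options.environment # taken from the -p JSON
puts options.description # taken from the config file
```

Because the -p values are applied last, they win over anything set under default_options in the file.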

.db ⇒ Object

Sequel DB connection instance



# File 'lib/blue_colr.rb', line 80

def db
  unless @db # not connected
    @db = Sequel.connect(self.db_uri, :logger => self.log)
  end
  @db
end

.db_uri ⇒ Object

Sequel DB URI connection string



# File 'lib/blue_colr.rb', line 72

def db_uri
  unless @db_uri # get the config from command line
    @db_uri = self.conf['db_url']
  end
  @db_uri
end

.environment ⇒ Object

Returns the value of attribute environment.



# File 'lib/blue_colr.rb', line 37

def environment
  @environment
end

.log ⇒ Object

Logger instance, created on first use. By default it writes to a file named process_daemon.



# File 'lib/blue_colr.rb', line 40

def log
  @log ||= Logger.new('process_daemon')
end

.statemap ⇒ Object

Map of states that processes pass through (pending -> running -> ok / error)



# File 'lib/blue_colr.rb', line 99

def statemap
  @statemap ||= conf['statemap'] || DEFAULT_STATEMAP
end
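
The fallback in statemap (use the statemap key from the config if present, otherwise DEFAULT_STATEMAP) can be illustrated as follows. This is a sketch under assumptions: the statemap_for helper and the custom YAML, including its extra rule that moves a process to skipped once all parents have finished but not all succeeded, are hypothetical and not taken from the library.

```ruby
require 'yaml'

DEFAULT_STATEMAP = {
  'on_pending' => { 'pending' => [['running', ['ok', 'skipped']]] },
  'on_running' => { 'running' => { 'error' => 'error', 'ok' => 'ok' } },
  'on_restart' => { 'error' => 'pending', 'ok' => 'pending' }
}.freeze

# Hypothetical config with a custom statemap (same structure as the default).
CUSTOM_CONFIG = <<~YAML
  statemap:
    on_pending:
      pending:
        - [running, [ok, skipped]]
        - [skipped, [ok, error, skipped]]
    on_running:
      running:
        error: error
        ok: ok
    on_restart:
      error: pending
      ok: pending
YAML

# Hypothetical helper mirroring `conf['statemap'] || DEFAULT_STATEMAP`.
def statemap_for(conf)
  conf['statemap'] || DEFAULT_STATEMAP
end

p statemap_for({}).equal?(DEFAULT_STATEMAP)
p statemap_for(YAML.load(CUSTOM_CONFIG))['on_pending']['pending']
```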

Instance Attribute Details

#all_ids ⇒ Object (readonly)

The list of ids of all processes that were enqueued



# File 'lib/blue_colr.rb', line 204

def all_ids
  @all_ids
end

#result ⇒ Object (readonly)

Returns the value of attribute result.



# File 'lib/blue_colr.rb', line 205

def result
  @result
end

Class Method Details

.default_options ⇒ Object

Default options to use when launching a process. Every field maps to a column in the process_items table.



# File 'lib/blue_colr.rb', line 89

def default_options
  @default_options ||= OpenStruct.new
end

.get_error_states ⇒ Object

Get all possible error states



# File 'lib/blue_colr.rb', line 192

def get_error_states
  self.statemap['on_running'].map{|_, new_states| new_states['error']}
end

.get_ok_states ⇒ Object

Get all possible ok states



# File 'lib/blue_colr.rb', line 197

def get_ok_states
  self.statemap['on_running'].map{|_, new_states| new_states['ok']}
end

.get_pending_states ⇒ Object

Get all possible pending states



# File 'lib/blue_colr.rb', line 187

def get_pending_states
  self.statemap['on_pending'].map{|state, _| state}
end
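
As an illustration of the three get_*_states helpers above, the following sketch applies the same expressions to a copy of the default statemap. The free-standing method names and the STATEMAP constant are stand-ins introduced here so the snippet runs without the library.

```ruby
# Copy of DEFAULT_STATEMAP from the constant summary.
STATEMAP = {
  'on_pending' => { 'pending' => [['running', ['ok', 'skipped']]] },
  'on_running' => { 'running' => { 'error' => 'error', 'ok' => 'ok' } },
  'on_restart' => { 'error' => 'pending', 'ok' => 'pending' }
}.freeze

# Keys of 'on_pending' are the states a process can wait in.
def pending_states(statemap)
  statemap['on_pending'].map { |state, _| state }
end

# For every running state, collect the state reached on success.
def ok_states(statemap)
  statemap['on_running'].map { |_, new_states| new_states['ok'] }
end

# For every running state, collect the state reached on failure.
def error_states(statemap)
  statemap['on_running'].map { |_, new_states| new_states['error'] }
end

p pending_states(STATEMAP) # => ["pending"]
p ok_states(STATEMAP)      # => ["ok"]
p error_states(STATEMAP)   # => ["error"]
```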

.launch(&block) ⇒ Object

Usually the root method for launching a set of tasks.



# File 'lib/blue_colr.rb', line 114

def launch &block
  worker = self.new
  db.transaction do
    worker.instance_eval &block
  end
  worker
end

.options ⇒ Object

Local hash used to store misc runtime options



# File 'lib/blue_colr.rb', line 94

def options
  @options ||= OpenStruct.new
end

.parse_command_line(&block) ⇒ Object

Parses command line arguments. Call it explicitly, passing a block, if you need to register additional custom options. Otherwise it is called automatically to obtain the parameters needed for running, such as the database connection string.



# File 'lib/blue_colr.rb', line 131

def parse_command_line &block
  data = {}

  OptionParser.new do |opts|
    opts.banner = "Usage: #{File.basename($0)} [options]"

    opts.on("-c CONFIG", "--conf CONFIG", "YAML config file") do |config|
      data["config"] = config
    end

    opts.on("-p PARAMS", "--params PARAMS", "Additional default options - key: value as JSON string, override values from config file") do |params|
      data["params"] = JSON.parse(params)
    end

    # process custom args, if given
    block.call(opts) if block_given?

    opts.on_tail('-h', '--help', 'display this help and exit') do
      puts opts
      exit
#          return nil
    end

#        begin
      opts.parse(ARGV)
#        rescue OptionParser::InvalidOption
#          # do nothing
#        end

  end

  @args = data
end
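
The block form can be sketched with a stand-in that takes the argument list explicitly instead of reading ARGV, which makes it easy to try in isolation. The parse_args name, the --date option, and the sample arguments are assumptions made for this illustration only.

```ruby
require 'optparse'
require 'json'

# Stand-in for parse_command_line: same built-in options, but argv is a
# parameter and the parsed data is returned instead of stored in @args.
def parse_args(argv)
  data = {}
  OptionParser.new do |opts|
    opts.on('-c CONFIG', '--conf CONFIG', 'YAML config file') do |config|
      data['config'] = config
    end
    opts.on('-p PARAMS', '--params PARAMS', 'Default options as JSON') do |json|
      data['params'] = JSON.parse(json)
    end
    # Let the caller register custom options before parsing.
    yield opts if block_given?
    opts.parse(argv)
  end
  data
end

custom = {}
argv = ['-c', 'etl.yml', '-p', '{"environment":"test"}', '--date', '2024-01-31']
args = parse_args(argv) do |opts|
  # Hypothetical custom option defined by the calling script.
  opts.on('--date DATE', 'Processing date') { |date| custom['date'] = date }
end

p args
p custom
```

Custom options have to be registered inside the block because parsing happens before the method returns.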

.run(&block) ⇒ Object

Runs a set of tasks: launches them, waits until the last one finishes, then exits with the returned exit code.



# File 'lib/blue_colr.rb', line 123

def run &block
  worker = launch &block
  exit worker.wait
end

.state_from_pending(current_state, parent_states) ⇒ Object

Get the next state from pending, given current state and state of all “parent” processes



# File 'lib/blue_colr.rb', line 169

def state_from_pending current_state, parent_states
  new_state, _ = self.statemap['on_pending'][current_state].find { |_, required_parent_states|
    (parent_states - required_parent_states).empty?
  }
  new_state
end
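
To make the rule matching concrete, here is a small sketch that runs the same lookup against the 'on_pending' part of the default statemap. The method name next_state_from_pending and the ON_PENDING constant are stand-ins so the snippet is self-contained.

```ruby
# 'on_pending' section of DEFAULT_STATEMAP: each rule is
# [new_state, parent_states_that_allow_it].
ON_PENDING = {
  'pending' => [['running', ['ok', 'skipped']]]
}.freeze

# Pick the first rule whose allowed parent states cover every actual
# parent state; return nil when no rule applies yet.
def next_state_from_pending(rules, current_state, parent_states)
  new_state, _ = rules[current_state].find do |_, required_parent_states|
    (parent_states - required_parent_states).empty?
  end
  new_state
end

p next_state_from_pending(ON_PENDING, 'pending', [])                # => "running"
p next_state_from_pending(ON_PENDING, 'pending', ['ok', 'skipped']) # => "running"
p next_state_from_pending(ON_PENDING, 'pending', ['ok', 'running']) # => nil
p next_state_from_pending(ON_PENDING, 'pending', ['error'])         # => nil
```

A nil result means no transition is available. With the default statemap this includes the case where a parent ended in error, since no rule accepts that parent state.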

.state_from_running(current_state, ok) ⇒ Object

Get the next state from running, given current state and whether the command has finished successfully



# File 'lib/blue_colr.rb', line 177

def state_from_running current_state, ok
  self.statemap['on_running'][current_state][ok ? 'ok' : 'error']
end

.state_on_restart(current_state) ⇒ Object

Get the state to move to upon restart, given the current state



# File 'lib/blue_colr.rb', line 182

def state_on_restart current_state
  self.statemap['on_restart'][current_state]
end
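
The running and restart lookups can be followed through one failed run and its restart. The sketch below reuses the default statemap; the free-standing methods take the statemap as a parameter, which is an adaptation made here so the code runs on its own.

```ruby
# Copy of the 'on_running' and 'on_restart' sections of DEFAULT_STATEMAP.
STATEMAP = {
  'on_running' => { 'running' => { 'error' => 'error', 'ok' => 'ok' } },
  'on_restart' => { 'error' => 'pending', 'ok' => 'pending' }
}.freeze

# State reached when a running command finishes (ok is true on success).
def state_from_running(statemap, current_state, ok)
  statemap['on_running'][current_state][ok ? 'ok' : 'error']
end

# State a finished process returns to when it is restarted.
def state_on_restart(statemap, current_state)
  statemap['on_restart'][current_state]
end

failed = state_from_running(STATEMAP, 'running', false)
p failed                                       # => "error"
p state_on_restart(STATEMAP, failed)           # => "pending"
p state_from_running(STATEMAP, 'running', true) # => "ok"
p state_on_restart(STATEMAP, 'running')        # => nil
```

The last call returns nil because the default statemap defines no restart transition for a process that is still running.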

Instance Method Details

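#enqueue(cmd, waitfor = [], opts = {}) ⇒ Object

Inserts cmd into process_items, records its dependencies on the ids listed in waitfor, and returns the id of the new process.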



# File 'lib/blue_colr.rb', line 220

def enqueue cmd, waitfor = [], opts = {}
  id = nil
  opts = {status: DEFAULT_PENDING_STATE}.merge(opts)
  def_opts = self.class.default_options.send(:table) # convert from OpenStruct to Hash
  # rejecting fields that do not have corresponding column in the table:
  fields = def_opts.merge(opts).select{|k,_| db[:process_items].columns.member? k}
  id = db[:process_items].insert(fields.merge(:status => PREPARING_STATE, :cmd => cmd, :queued_at => Time.now))
  waitfor.each do |wid|
    db[:process_item_dependencies].insert(:process_item_id => id, :depends_on_id => wid)
  end
  db[:process_items].filter(:id => id).update(:status => opts[:status])
#    id = TaskGroup.counter
  log.info "enqueueing #{id}: #{cmd}, waiting for #{waitfor.inspect}"
  # remember id
  @all_ids << id
  id
end
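
The field filtering in enqueue (merge the defaults with opts, then keep only keys that exist as columns) can be tried in isolation. In this sketch the column list, the owner option, and the insertable_fields helper are assumptions, and OpenStruct#to_h is used in place of send(:table).

```ruby
require 'ostruct'

# Hypothetical column list; the real one comes from db[:process_items].columns.
COLUMNS = [:id, :status, :cmd, :queued_at, :exit_code, :environment, :description].freeze

# Merge defaults with per-call options (per-call wins) and drop every key
# that has no matching column.
def insertable_fields(default_options, opts, columns)
  def_opts = default_options.to_h # OpenStruct -> Hash with symbol keys
  def_opts.merge(opts).select { |k, _| columns.member?(k) }
end

defaults = OpenStruct.new(environment: 'production', owner: 'etl')
fields = insertable_fields(defaults, { status: 'pending', description: 'load' }, COLUMNS)
p fields
```

Here owner is dropped because the assumed table has no such column, while environment from the defaults and the per-call status and description are kept.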

#parallel(&block) ⇒ Object

All processes enqueued within the given block should be executed in parallel (not waiting for each other to finish).



# File 'lib/blue_colr.rb', line 216

def parallel &block
  exec :parallel, &block
end

#run(cmd, opts = {}) ⇒ Object

Enqueues a single command cmd.

Parameters

cmd

A string containing the command that should be executed.

opts

A hash of optional parameters that override the default fields associated with the given command (e.g. a different :environment for the command to be launched in, an optional :description, or anything else you decide to store along with the command).



# File 'lib/blue_colr.rb', line 247

def run cmd, opts = {}
  id = enqueue cmd, @waitfor, opts
  if @type == :sequential
    @waitfor = [id]
    @result = [id]
  else
    @result << id
  end
  @result
end
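
How run builds the dependency chain can be seen with an in-memory stand-in. Only the body of run below mirrors the listing above; the DependencyRecorder class, its constructor, and the simplified enqueue that records dependencies in a hash instead of the database are assumptions for illustration.

```ruby
# Records which ids each new command would wait for.
class DependencyRecorder
  attr_reader :deps

  def initialize(type, waitfor = [])
    @type = type       # :sequential or :parallel
    @waitfor = waitfor # ids every new command must wait for
    @result = []
    @deps = {}         # id => ids it depends on
    @next_id = 0
  end

  # Simplified enqueue: hand out an id and remember the dependencies.
  def enqueue(_cmd, waitfor)
    id = (@next_id += 1)
    @deps[id] = waitfor.dup
    id
  end

  # Same logic as #run above, minus the opts argument.
  def run(cmd)
    id = enqueue(cmd, @waitfor)
    if @type == :sequential
      @waitfor = [id] # the next command waits for this one
      @result = [id]
    else
      @result << id   # siblings share the same @waitfor
    end
    @result
  end
end

seq = DependencyRecorder.new(:sequential)
%w[extract transform load].each { |cmd| seq.run(cmd) }
p seq.deps # => {1=>[], 2=>[1], 3=>[2]}

par = DependencyRecorder.new(:parallel)
%w[a b c].each { |cmd| par.run(cmd) }
p par.deps # => {1=>[], 2=>[], 3=>[]}
```

In sequential mode each command depends on the one before it. In parallel mode every command depends only on whatever the group as a whole was waiting for.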

#sequential(&block) ⇒ Object

All processes enqueued within the given block should be executed sequentially, i.e. one after another.



# File 'lib/blue_colr.rb', line 210

def sequential &block
  exec :sequential, &block
end

#wait ⇒ Object

Waits for all enqueued processes to finish. By default BlueColr enqueues commands and exits. If you need to wait for the commands to finish, call this method: it polls every 10 seconds and returns the exit code of the first process found in an error state, or 0 once all enqueued processes are in an ok state.



# File 'lib/blue_colr.rb', line 261

def wait
  log.info 'Waiting for all processes to finish'
  loop do
    failed = db[:process_items].filter(:id => @all_ids, :status => BlueColr.get_error_states).first
    return failed[:exit_code] if failed
    not_ok_count = db[:process_items].filter(:id => @all_ids).exclude(:status => BlueColr.get_ok_states).count
    return 0 if not_ok_count == 0 # all ok, finish
    sleep 10
  end
end
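
The polling logic can be sketched without a database by feeding it successive snapshots of the rows. The wait_for method, its keyword arguments, and the fetch_rows callable are hypothetical; the real method queries process_items and sleeps 10 seconds between polls.

```ruby
# fetch_rows is any callable returning the current rows as hashes with
# :status and :exit_code keys.
def wait_for(fetch_rows, error_states: ['error'], ok_states: ['ok'], interval: 0)
  loop do
    rows = fetch_rows.call
    # Any failed process ends the wait with its exit code.
    failed = rows.find { |row| error_states.include?(row[:status]) }
    return failed[:exit_code] if failed
    # Everything finished successfully.
    return 0 if rows.all? { |row| ok_states.include?(row[:status]) }
    sleep interval
  end
end

# Three successive polls: still working, partly done, one failure.
snapshots = [
  [{ status: 'running', exit_code: nil }, { status: 'pending', exit_code: nil }],
  [{ status: 'ok', exit_code: 0 }, { status: 'running', exit_code: nil }],
  [{ status: 'ok', exit_code: 0 }, { status: 'error', exit_code: 3 }]
]
puts wait_for(-> { snapshots.shift }) # prints 3
```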