Class: Pione::RuleEngine::RuleApplication

Inherits:
SimpleDelegator
  • Object
show all
Defined in:
lib/pione/rule-engine/flow-handler.rb

Instance Method Summary collapse

Constructor Details

#initialize(handler) ⇒ RuleApplication

Returns a new instance of RuleApplication.



171
172
173
174
175
# File 'lib/pione/rule-engine/flow-handler.rb', line 171

def initialize(handler)
  super(handler)
  @data_finder = DataFinder.new(tuple_space_server, domain_id)
  @finished = [] # finished tuple cache
end

Instance Method Details

#apply(rules) ⇒ Object

Apply input data to rules.



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# File 'lib/pione/rule-engine/flow-handler.rb', line 178

def apply(rules)
  # start message
  user_message_begin("Rule Application: %s" % digest, 1)

  # with profile
  Util::Profiler.profile(Util::RuleApplicationProfileReport.new(digest)) do
    # rule application loop
    while tasks = find_tasks(rules) do
      distribute_tasks(tasks)
    end
  end

  # end message
  user_message_end("Rule Application: %s" % digest, 1)
end

#check_updatability(task) ⇒ Object

Check updatability of the task and get update order.



310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
# File 'lib/pione/rule-engine/flow-handler.rb', line 310

def check_updatability(task)
  # read all tuples of data-null
  data_null_tuples = read_all(TupleSpace::DataNullTuple.new(domain: task.domain_id))

  res = []

  f = lambda do |task_env, outputs|
    # make parameter set for the task
    table = Hash.new

    if val_i = task_env.variable_get!(Lang::Variable.new("O"))
      table["OUTPUT"] = Lang::Variable.new("O")
      table["O"] = val_i
    end

    task_param_set = task.param_set.set(table: task.param_set.table.merge(table))

    # check update criterias
    order = UpdateCriteria.order(task_env, task.rule_condition, task.inputs, outputs, data_null_tuples)
    res << [order, task_env, task_param_set]
  end

  # find output data combination
  @data_finder.find(:output, task.rule_condition.outputs, task.env, &f)
  f.call(task.env, []) if res.empty?

  # evaluate the result
  groups = res.group_by {|(order, _, _)| order}
  if f = groups[:force] or f = groups[:weak]
    order, env, param_set = f.first

    # setup output variables
    var_o = Lang::Variable.new("O")
    task.env.variable_set(Lang::Variable.new("OUTPUT"), var_o)
    kseq = find_output_variables(task, Lang::KeyedSequence.new)
    task.env.variable_set(var_o, kseq)
    param_set = param_set.set(table: param_set.table.merge({"O" => kseq}))

    return task.set(order: order, env: env, param_set: param_set)
  else
    return nil
  end
end

#distribute_tasks(tasks) ⇒ Object

Distribute tasks.



368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
# File 'lib/pione/rule-engine/flow-handler.rb', line 368

def distribute_tasks(tasks)
  # log and message
  process_log(make_task_process_record.merge(transition: "suspend"))
  process_log(make_rule_process_record.merge(transition: "suspend"))
  user_message_begin("Distribution: %s" % digest, 2)

  # distribute tasks
  tasks.each do |task|
    tuple = task.make_tuple(domain_id)

    # publish tasks
    if need_to_publish_task?(task, tuple)
      # clear finished tuple and data tuples from the domain
      take!(TupleSpace::FinishedTuple.new(domain: task.domain_id))
      take_all!(TupleSpace::DataTuple.new(domain: task.domain_id))

      # copy input data from this domain to the task domain
      task.inputs.flatten.each {|input| copy_data_into_domain(input, task.domain_id)}

      # write the task
      write(tuple)

      # log and message
      process_log(task.make_task_process_record.merge(transition: "schedule"))
      user_message(">>> %s".color(:yellow) % task.digest, 3, "", :blue)
    else
      # cancel the task
      Log::Debug.rule_engine "task %s canceled at %s" % [task.digest, digest]
    end
  end

  # wait an end of distributed tasks
  wait_task_completion(tasks)

  # turn foreground if the task is background
  unless read!(TupleSpace::ForegroundTuple.new(domain_id, digest))
    write(TupleSpace::ForegroundTuple.new(domain_id, digest))
  end

  # log and message
  process_log(make_rule_process_record.merge(transition: "resume"))
  process_log(make_task_process_record.merge(transition: "resume"))
  user_message_end("Distribution: %s" % digest, 2)
end

#find_applicable_rules(rules) ⇒ Object

Find applicable rules. The criterion of applicable rule is that the rule satisfies ticket conditions or not.



208
209
210
211
212
213
214
215
# File 'lib/pione/rule-engine/flow-handler.rb', line 208

def find_applicable_rules(rules)
  # select rules which ticktes exist in this domain
  rules.select do |rule|
    rule.input_tickets.pieces.all? do |ticket|
      read!(TupleSpace::TicketTuple.new(domain_id, ticket.name))
    end
  end
end

#find_output_variables(task, kseq) ⇒ Object



354
355
356
357
358
359
360
361
362
363
364
365
# File 'lib/pione/rule-engine/flow-handler.rb', line 354

def find_output_variables(task, kseq)
  _kseq = kseq
  task.rule_condition.outputs.each_with_index do |condition, i|
    begin
      data = condition.eval(task.env)
      _kseq = _kseq.put(Lang::IntegerSequence.of(i+1), data)
    rescue Lang::UnboundError
      next
    end
  end
  return _kseq
end

#find_tasks(rules) ⇒ Object

Find applicable and updatable rule applications.



195
196
197
198
199
200
201
202
203
204
# File 'lib/pione/rule-engine/flow-handler.rb', line 195

def find_tasks(rules)
  # select applicable rules
  applicable_rules = find_applicable_rules(rules)

  # make task
  tasks = make_tasks(applicable_rules)

  # be careful that returns nil when tasks are empty
  tasks.empty? ? nil : tasks
end

#find_tasks_by_rule_condition(env, rule, rule_definition, rule_condition, param_set) ⇒ Object

Handle parameter distribution. Rule parameters with each modifier are distributed tasks by each element.



267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
# File 'lib/pione/rule-engine/flow-handler.rb', line 267

def find_tasks_by_rule_condition(env, rule, rule_definition, rule_condition, param_set)
  tasks = []

  # find input data combinations
  @data_finder.find(:input, rule_condition.inputs, env) do |task_env, inputs|
    # make parameter set for the task
    table = Hash.new

    if val_i = task_env.variable_get!(Lang::Variable.new("I"))
      table["INPUT"] = Lang::Variable.new(name: "I", package_id: rule.package_id)
      table["I"] = val_i
    end

    if val_star = task_env.variable_get!(Lang::Variable.new("*"))
      table["*"] = val_star
    end

    task_param_set = param_set.set(table: param_set.table.merge(table))

    # check constraint conditions
    next unless rule_condition.constraints.all? do |constraint|
      res = constraint.eval(task_env)
      if res.is_a?(Lang::BooleanSequence)
        res.value
      else
        raise Lang::StructuralError.new(Lang::BooleanSequence, constraint.pos)
      end
    end

    # make task
    domain_id = Util::DomainID.generate(rule.package_id, rule.name, inputs, task_param_set)
    task = Task.new(task_env, domain_id, rule, rule_definition, rule_condition, inputs, task_param_set)

    # check updatability
    if _task = check_updatability(task)
      tasks << _task
    end
  end

  return tasks
end

#import_outputs_of_task(task, finished) ⇒ Object

Import finished tuple’s outputs from the domain.



459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
# File 'lib/pione/rule-engine/flow-handler.rb', line 459

def import_outputs_of_task(task, finished)
  finished.outputs.each_with_index do |output, i|
    data_expr = task.rule_condition.outputs[i].eval(task.env)
    case data_expr.operation
    when :write
      if output.kind_of?(Array)
        output.each {|o| copy_data_into_domain(o, domain_id)}
      else
        copy_data_into_domain(output, domain_id)
      end
    when :remove
      if output.kind_of?(Array)
        output.each {|o| remove_data_from_domain(o, domain_id)}
      else
        remove_data_from_domain(output, domain_id)
      end
    when :touch
      if output.kind_of?(Array)
        output.each {|o| touch_data_in_domain(o, domain_id)}
      else
        touch_data_in_domain(output, domain_id)
      end
    end
  end
end

#lift_touch_tuple(task) ⇒ Object

Lift effects of touch operations from the task domain to this domain.



486
487
488
489
490
491
492
493
494
495
496
# File 'lib/pione/rule-engine/flow-handler.rb', line 486

def lift_touch_tuple(task)
  read_all(TupleSpace::TouchTuple.new(domain: task.domain_id)).each do |touch|
    if target = read!(TupleSpace::DataTuple.new(name: touch.name, domain: domain_id))
      # update time of data tuple
      write(target.tap {|x| x.time = touch.time}) unless target.time > touch.time

      # lift touch tuple to upper domain
      write(touch.tap{|x| x.domain = domain_id})
    end
  end
end

#make_tasks(rules) ⇒ Object

Make tasks from rules.



218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/pione/rule-engine/flow-handler.rb', line 218

def make_tasks(rules)
  rules.each_with_object([]) do |rule, tasks|
    # set handler's package id if rule's package id is implicit
    rule = rule.set(package_id: package_id) unless rule.package_id

    # get rule definition
    rule_definition = env.rule_get(rule)

    # handle parameter sequence
    pieces = rule.param_sets.pieces
    if not(pieces.empty?)
      pieces.each do |param_set|
        ### merge default parameter values ####
        # setup task's environment by parameter set
        _env = plain_env.layer.merge_param_set(param_set)
        _env.set(current_package_id: rule.package_id || env.current_package_id)

        # get task's condition
        rule_condition = rule_definition.rule_condition_context.eval(_env)

        # merge default values
        _param_set = param_set.merge_default_values(rule_condition)

        # handle parameter distribution
        _param_set.eval(_env).expand do |expanded_param_set|
          # rebuild environment by expanded param set
          _env = plain_env.layer.merge_param_set(expanded_param_set)
          _env.set(current_package_id: rule.package_id || env.current_package_id)

          # get task's condition
          rule_condition = rule_definition.rule_condition_context.eval(_env)

          tasks.concat find_tasks_by_rule_condition(_env, rule, rule_definition, rule_condition, expanded_param_set).uniq
        end
      end
    else
      _env = plain_env.layer
      # get task's condition
      rule_condition = rule_definition.rule_condition_context.eval(_env)
      this_tasks = find_tasks_by_rule_condition(
        _env, rule, rule_definition, rule_condition, Lang::ParameterSet.new
      ).uniq
      tasks.concat(this_tasks)
    end
  end
end

#need_to_publish_task?(task, tuple) ⇒ Boolean

Return true if we need to publish the task.

Returns:

  • (Boolean)


414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
# File 'lib/pione/rule-engine/flow-handler.rb', line 414

def need_to_publish_task?(task, tuple)
  # reuse task finished result if order is weak update
  if task.order == :weak
    template = TupleSpace::FinishedTuple.new(domain: task.domain_id, status: :succeeded)
    if @finished.include?(template)
      return false
    end
    if finished = read!(template)
      @finished << finished
      return false
    end
  end

  # the task exists in space already, so we don't need to publish
  return false if read!(tuple)

  # another worker is working now, so we don't need to publish
  return false if read!(TupleSpace::WorkingTuple.new(domain: task.domain_id))

  # we need to publish the task
  return true
end

#wait_task_completion(tasks) ⇒ Object

Wait until tasks completed.



438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
# File 'lib/pione/rule-engine/flow-handler.rb', line 438

def wait_task_completion(tasks)
  tasks.each do |task|
    # wait to finish the distributed task, note that finished tuple is in
    # the task domain
    finished = read(TupleSpace::FinishedTuple.new(domain: task.domain_id))

    ### task completion processing ###
    # copy write operation data tuple from the task domain to this domain
    import_outputs_of_task(task, finished)

    # touch tuple
    lift_touch_tuple(task)

    # publish output tickets
    task.rule.output_tickets.pieces.each do |piece|
      write(TupleSpace::TicketTuple.new(domain_id, piece.name))
    end
  end
end