Class: Libis::Workflow::Task

Inherits:
Object
  • Object
show all
Includes:
Tools::Logger, Tools::ParameterContainer
Defined in:
lib/libis/workflow/task.rb

Overview

noinspection RubyTooManyMethodsInspection

Direct Known Subclasses

TaskGroup

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(parent, cfg = {}) ⇒ Task

Returns a new instance of Task.



30
31
32
33
34
35
# File 'lib/libis/workflow/task.rb', line 30

# Builds a task, links it to its parent and applies the supplied configuration.
#
# Both stopper flags start out as false before anything else happens.
#
# @param parent [Object] owner of this task; stored through the +parent=+ writer
# @param cfg [Hash] configuration handed unchanged to +configure+ (empty by default)
def initialize(parent, cfg = {})
  @subitems_stopper = @subtasks_stopper = false
  self.parent = parent
  configure(cfg)
end

Instance Attribute Details

#name ⇒ Object

Returns the value of attribute name.



16
17
18
# File 'lib/libis/workflow/task.rb', line 16

# Reader for the +name+ attribute.
#
# @return [Object] the current value of @name
def name
  @name
end

#parent ⇒ Object

Returns the value of attribute parent.



16
17
18
# File 'lib/libis/workflow/task.rb', line 16

# Reader for the +parent+ attribute.
#
# @return [Object] the current value of @parent
def parent
  @parent
end

#processing_item ⇒ Object

Returns the value of attribute processing_item.



16
17
18
# File 'lib/libis/workflow/task.rb', line 16

# Reader for the +processing_item+ attribute.
#
# @return [Object] the current value of @processing_item
def processing_item
  @processing_item
end

#workitem ⇒ Object

Returns the value of attribute workitem.



16
17
18
# File 'lib/libis/workflow/task.rb', line 16

# Reader for the +workitem+ attribute.
#
# @return [Object] the current value of @workitem
def workitem
  @workitem
end

Class Method Details

.task_classes ⇒ Object



24
25
26
27
28
# File 'lib/libis/workflow/task.rb', line 24

# Lists every class that strictly descends from the receiver, leaving out
# Libis::Workflow::TaskRunner.
#
# @return [Array<Class>] matching classes found by walking ObjectSpace
def self.task_classes
  # noinspection RubyArgCount
  descendants = ObjectSpace.each_object(::Class).select { |candidate| candidate < self }
  descendants.reject { |candidate| candidate == Libis::Workflow::TaskRunner }
end

Instance Method Details

#<<(_task) ⇒ Object



37
38
39
# File 'lib/libis/workflow/task.rb', line 37

# Rejects any attempt to attach a subtask to this task.
#
# @param _task [Object] ignored
# @raise [Libis::WorkflowError] always, naming this task by its name path
def <<(_task)
  reason = "Processing task '#{namepath}' is not allowed to have subtasks."
  raise Libis::WorkflowError, reason
end

#apply_options(opts) ⇒ Object



124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# File 'lib/libis/workflow/task.rb', line 124

# Applies externally supplied option values to this task's parameters.
#
# Options are collected from +opts+ under the task's class name first and then
# under the task's own name (falling back to its full name path); values found
# later override those found earlier. Only keys that match an entry of
# +default_values+ and carry a non-nil value are applied, after being parsed
# by the matching parameter definition.
#
# @param opts [Hash] option hashes keyed by class name, task name or name path
# @return [Object] the receiver of +default_values.each+
def apply_options(opts)
  task_options = {}
  task_options.merge!(opts[self.class.to_s] || {})
  task_options.merge!(opts[name] || opts[names.join('/')] || {})

  default_values.each do |param_name, _|
    raw_value = task_options[param_name.to_s]
    # A missing key and an explicit nil both mean "not supplied".
    next if raw_value.nil?

    definition = get_parameter_definition(param_name.to_sym)
    parameter(param_name.to_sym, definition.parse(raw_value))
  end
end

#logger ⇒ Object



152
153
154
# File 'lib/libis/workflow/task.rb', line 152

# Delegates logging to the parent when there is one, otherwise to the object
# returned by +get_run+.
#
# @return [Object] the logger exposed by that object
def logger
  log_source = parent || get_run
  log_source.logger
end

#message(severity, msg, *args) ⇒ Object



140
141
142
143
144
145
146
147
148
149
150
# File 'lib/libis/workflow/task.rb', line 140

# Emits a log message tagged with this task's name path and the item it concerns.
#
# The application tag is the task's name path (nil when it cannot be computed).
# The subject is taken from the first extra argument when that argument is a
# work item, otherwise from the current workitem; it is the item's name path,
# falling back to its name and finally to its string form.
#
# @param severity [Object] passed through to the inherited implementation
# @param msg [Object] passed through to the inherited implementation
# @param args [Array] extra arguments; a leading work item is consumed here
# @return [Object] whatever the inherited +message+ returns
def message(severity, msg, *args)
  task_label = begin
    namepath
  rescue StandardError
    nil
  end
  set_application(task_label)

  target = begin
    workitem
  rescue StandardError
    nil
  end
  target = args.shift if !args.empty? && args.first.is_a?(::Libis::Workflow::Base::WorkItem)

  label = begin
    target.namepath
  rescue StandardError
    nil
  end
  label ||= begin
    target.name
  rescue StandardError
    nil
  end
  label ||= begin
    target.to_s
  rescue StandardError
    nil
  end
  set_subject(label)

  super(severity, msg, *args)
end

#namepath ⇒ Object



120
121
122
# File 'lib/libis/workflow/task.rb', line 120

# Builds the slash-separated path made of the entries returned by +names+.
#
# @return [String] the entries of +names+ joined with '/'
def namepath
  path_segments = names
  path_segments.join('/')
end

#names ⇒ Object



116
117
118
# File 'lib/libis/workflow/task.rb', line 116

# Collects the names from the parent chain down to this task.
#
# Any error raised while asking the parent for its names (for instance when
# there is no parent) is treated as "no inherited names". Nil entries are
# dropped from the result.
#
# @return [Array] the parent's names followed by this task's name, without nils
def names
  inherited = begin
    parent.names
  rescue StandardError
    []
  end
  inherited.push(name).compact
end

#root_task ⇒ Object



41
42
43
# File 'lib/libis/workflow/task.rb', line 41

# Finds the task at the top of the parent chain.
#
# @return [Object] the parent's root task, or this task when it has no parent
#   or the parent reports no root
def root_task
  upper = parent
  return self if upper.nil?

  upper.root_task || self
end

#run(item) ⇒ Object

Parameters:

  • item (Libis::Workflow::Base::WorkItem)



46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/libis/workflow/task.rb', line 46

# Runs this task on +item+, repeating the attempt while the item keeps
# reporting :ASYNC_WAIT for this task.
#
# At most parameter(:retry_count) + 1 attempts are made. Whatever happens,
# the ensure clause calls +save!+ on the item before the method exits.
#
# @param item [Libis::Workflow::Base::WorkItem] item to process; checked with
#   +check_item_type+ on entry
# @return [Object] the work item on the non-exceptional paths. When a
#   WorkflowError is rescued the value of +set_status+ is returned instead,
#   and a WorkflowAbort rescued on a task without a parent yields nil.
# @raise [WorkflowAbort] re-raised when this task has a parent; any other
#   exception is logged and converted into a Libis::WorkflowAbort
# @raise [WorkflowAbortForget] always re-raised
def run(item)
  check_item_type ::Libis::Workflow::Base::WorkItem, item
  self.workitem = item

  # case action
  # when :retry
  #   if !parameter(:run_always) && item.check_status(:DONE, namepath)
  #     debug 'Retry: skipping task %s because it has finished successfully.', item, namepath
  #     return item
  #   end
  # when :failed
  #   return item unless parameter(:run_always)
  # else
  #   # type code here
  # end

  # When the current action is :failed the task is skipped, unless its
  # run_always parameter is set.
  return item if action == :failed && !parameter(:run_always)

  (parameter(:retry_count) + 1).times do

    # run_item may hand back a different object; it replaces +item+ only
    # when it is a work item.
    i = run_item(item)
    item = i if i.is_a?(Libis::Workflow::Base::WorkItem)

    # Decide what to do next from the status the item reports for this task.
    # noinspection RubyScope
    case item.status(namepath)
    when :DONE
      # self.action = :run
      return item
    when :ASYNC_WAIT
      # Only this branch falls through to the sleep below and another attempt.
      self.action = :retry
    when :ASYNC_HALT
      break
    when :FAILED
      break
    else
      # Any other status ends the method without marking the run as failed.
      return item
    end

    # NOTE(review): duplicates the assignment made in the :ASYNC_WAIT branch,
    # which is the only branch that reaches this point.
    self.action = :retry

    # NOTE(review): this sleep also runs after the final attempt, just before
    # the loop ends — confirm that is intended.
    sleep(parameter(:retry_interval))

  end

  # Reached after a break (:ASYNC_HALT / :FAILED) or once all attempts are used up.
  item.get_run.action = :failed

  return item

# NOTE(review): the rescue clauses are tried top to bottom. If WorkflowAbort or
# WorkflowAbortForget inherit from an exception class listed above them, the
# later clause can never match — confirm the class hierarchy.
rescue WorkflowError => e
  # Logged and swallowed: the item is marked failed and no exception propagates.
  error e.message, item
  set_status item, :FAILED

rescue WorkflowAbort => e
  # Propagates only when this task has a parent; otherwise the abort stops here.
  set_status item, :FAILED
  raise e if parent

rescue WorkflowAbortForget => e
  set_status item, :FAILED
  raise e

rescue Exception => e
  # Anything else is logged with its backtrace and converted into a WorkflowAbort.
  # NOTE(review): rescuing Exception also catches non-StandardError exceptions
  # such as Interrupt and SystemExit — confirm that is intended.
  set_status item, :FAILED
  fatal_error "Aborting ingest because of error: %s @ %s\n%s", item, e.message, e.backtrace.first, e.backtrace.map{|t| ' -- ' + t}.join("\n")
  raise Libis::WorkflowAbort, "#{e.message} @ #{e.backtrace.first}"

ensure
  # +item+ may by now be the object returned by run_item rather than the argument.
  item.save!

end