Class: Isomorfeus::Operation::RunTask

Inherits:
Object
  • Object
show all
Defined in:
lib/isomorfeus/operation/run_task.rb

Instance Method Summary collapse

Constructor Details

#initialize(task_class, timer_task:, interval:, recurring: false) ⇒ RunTask

Returns a new instance of RunTask.



4
5
6
7
8
9
# File 'lib/isomorfeus/operation/run_task.rb', line 4

# Stores the collaborators and settings used by the other methods of the runner.
#
# @param task_class [Class] class whose stored tasks are searched, loaded and destroyed
# @param timer_task [Object] timer whose +execution_interval+ is read and re-tuned by #run
# @param interval [Numeric] nominal delay between passes; compared against Time differences,
#   so presumably seconds
# @param recurring [Boolean] when true, a successful task is marked 'ready' again
#   instead of being removed
def initialize(task_class, timer_task:, interval:, recurring: false)
  @interval = interval
  @timer_task = timer_task
  @recurring = recurring
  @task_class = task_class
end

Instance Method Details

#cleanup ⇒ Object



84
85
86
87
88
89
90
# File 'lib/isomorfeus/operation/run_task.rb', line 84

# Marks tasks as failed when they are still in the 'running' state and started
# more than @interval before the current pass (@rtime, set by #run).
#
# @return [Object] whatever +each+ returns for the sorted search result
def cleanup
  found = @task_class.search(:state, 'running')
  oldest_first = found.sort_by! { |running| running.rtime.to_i }
  oldest_first.each do |running|
    elapsed = @rtime - running.rtime
    next unless elapsed > @interval

    # Still 'running' after more than one interval: the earlier run has most probably timed out.
    mark_as_failed(running, RuntimeError.new('Operation execution timed out, giving up.'))
  end
end

#get_tasks ⇒ Object



49
50
51
# File 'lib/isomorfeus/operation/run_task.rb', line 49

# Looks up the tasks whose state is 'ready' and orders them by ascending rtime.
# A nil rtime sorts as 0 because of +to_i+.
#
# @return [Object] the sorted search result (presumably an Array of task records)
def get_tasks
  ready = @task_class.search(:state, 'ready')
  ready.sort_by! { |candidate| candidate.rtime.to_i }
end

#mark_as_failed(task, exception) ⇒ Object



70
71
72
73
# File 'lib/isomorfeus/operation/run_task.rb', line 70

# Puts the task into the 'failed' state and hands it to #save_exception,
# which records the exception on the task.
#
# @param task [Object] task record responding to +state=+
# @param exception [Exception] error that caused the failure
# @return [Object] the result of #save_exception
def mark_as_failed(task, exception)
  failed = task.tap { |record| record.state = 'failed' }
  save_exception(failed, exception)
end

#mark_as_ready(task) ⇒ Object



65
66
67
68
# File 'lib/isomorfeus/operation/run_task.rb', line 65

# Puts the task back into the 'ready' state and saves it.
#
# @param task [Object] task record responding to +state=+ and +save+
# @return [Object] the result of +task.save+
def mark_as_ready(task)
  task.tap { |record| record.state = 'ready' }.save
end

#mark_as_running(task) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/isomorfeus/operation/run_task.rb', line 53

# Tries to claim a task for the current pass.
#
# The task is reloaded by key and claimed only if it still exists, is in the
# 'ready' state, and either has never run (nil rtime) or last started at least
# one timer execution interval before the current pass (@rtime). A claimed task
# is put into the 'running' state, stamped with @rtime and saved.
#
# The elapsed time is computed as <tt>@rtime - task.rtime</tt>, the same way
# #cleanup does. With the operands the other way round the difference is
# negative for every task that has run before, so such a task could never be
# claimed again.
#
# @param task [Object] task record responding to +key+
# @return [Boolean] true if the task was claimed, false otherwise
def mark_as_running(task)
  # Decide on the freshly loaded record rather than the copy from the earlier search.
  task = @task_class.load(key: task.key)
  return false unless task && task.state == 'ready'

  last_run = task.rtime
  return false unless last_run.nil? || (@rtime - last_run) >= @timer_task.execution_interval

  task.state = 'running'
  task.rtime = @rtime
  task.save
  true
end

#remove_task(task) ⇒ Object



80
81
82
# File 'lib/isomorfeus/operation/run_task.rb', line 80

# Deletes the stored task identified by the given task's key.
#
# @param task [Object] task record responding to +key+
# @return [Object] the result of +@task_class.destroy+
def remove_task(task)
  task_key = task.key
  @task_class.destroy(key: task_key)
end

#run ⇒ Object



11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/isomorfeus/operation/run_task.rb', line 11

# Performs one pass over the ready tasks.
#
# For every task that #mark_as_running claims, the operation class and the
# acting user are resolved, the user's authorization for :promise_run is
# checked, and the operation is started with the task's props. On success a
# recurring runner marks the task ready again, otherwise the task is removed.
# On failure a recurring runner consults +task.fail+ to choose between marking
# the task failed and only saving the exception; a non-recurring runner always
# marks it failed. Errors raised while setting up or starting the operation
# mark the task as failed. The thread-local user is cleared after each task.
#
# Afterwards #cleanup is called and the timer's execution interval is set to
# @interval minus the time this pass took.
#
# NOTE(review): that value is zero or negative when a pass takes @interval or
# longer — confirm the timer accepts such a value.
#
# @return [Numeric] the value assigned to the timer's execution interval
def run
  @rtime = Time.now
  get_tasks.each do |task|
    # Only work on tasks this pass managed to claim.
    next unless mark_as_running(task)

    begin
      operation_class = task.operation_class_name.constantize
      user_class_name = task.user_class_name
      acting_user =
        if user_class_name == 'LocalSystem'
          LocalSystem.new
        elsif Isomorfeus.valid_user_class_name?(user_class_name)
          # Fall back to Anonymous when the stored user cannot be loaded.
          user_class_name.constantize.load(key: task.user_key) || Anonymous.new
        else
          Anonymous.new
        end
      Thread.current[:isomorfeus_user] = acting_user
      raise 'Access denied!' unless acting_user.authorized?(operation_class, :promise_run, task.props)

      promise = operation_class.promise_run(**task.props)
      if @recurring
        promise
          .then { mark_as_ready(task) }
          .fail { |error| task.fail ? mark_as_failed(task, error) : save_exception(task, error) }
      else
        promise
          .then { remove_task(task) }
          .fail { |error| mark_as_failed(task, error) }
      end
    rescue => e
      mark_as_failed(task, e)
    ensure
      Thread.current[:isomorfeus_user] = nil
    end
  end
  cleanup
  @timer_task.execution_interval = @interval - (Time.now - @rtime)
end

#save_exception(taks, exception) ⇒ Object



75
76
77
78
# File 'lib/isomorfeus/operation/run_task.rb', line 75

# Stores a serialized copy of the exception on the task and saves the task.
#
# The exception is serialized with Marshal.dump, which raises TypeError for
# objects it cannot dump; such an error is not handled here.
#
# @param task [Object] task record responding to +exception=+ and +save+
# @param exception [Exception] error to record on the task
# @return [Object] the result of +task.save+
def save_exception(task, exception)
  task.exception = Marshal.dump(exception)
  task.save
end