Class: Tasker::Orchestration::TaskInitializer

Inherits:
Object
  • Object
show all
Includes:
Concerns::EventPublisher, Concerns::IdempotentStateTransitions
Defined in:
lib/tasker/orchestration/task_initializer.rb

Overview

TaskInitializer handles task creation and initialization logic

This component is responsible for:

  • Creating tasks from TaskRequest objects
  • Validating task context against schemas
  • Starting task execution (transitioning to in_progress)
  • Enqueuing tasks for processing

Class Method Summary collapse

Instance Method Summary collapse

Methods included from Concerns::EventPublisher

#infer_step_event_type_from_state, #publish_custom_event, #publish_no_viable_steps, #publish_step_backoff, #publish_step_before_handle, #publish_step_cancelled, #publish_step_completed, #publish_step_event_for_context, #publish_step_failed, #publish_step_retry_requested, #publish_step_started, #publish_steps_execution_completed, #publish_steps_execution_started, #publish_task_completed, #publish_task_enqueue, #publish_task_failed, #publish_task_finalization_completed, #publish_task_finalization_started, #publish_task_pending_transition, #publish_task_reenqueue_delayed, #publish_task_reenqueue_failed, #publish_task_reenqueue_requested, #publish_task_reenqueue_started, #publish_task_retry_requested, #publish_task_started, #publish_viable_steps_discovered, #publish_workflow_state_unclear, #publish_workflow_step_completed, #publish_workflow_task_started

Methods included from Concerns::IdempotentStateTransitions

#conditional_transition_to, #in_any_state?, #safe_current_state, #safe_transition_to

Class Method Details

.initialize_task! ⇒ Tasker::Task

Initialize a new task from a task request

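Validates the request context against the schema, creates the task record together with its step sequence, and enqueues the task for processing. If validation fails, a task-failed event is published and a task carrying the context errors is returned without being enqueued.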



29
# File 'lib/tasker/orchestration/task_initializer.rb', line 29

# Forwards `initialize_task!` to the object returned by `new`, so callers
# can invoke it without building an initializer themselves.
# NOTE(review): listed under class methods, so this presumably sits inside
# `class << self` — confirm against the source file.
delegate :initialize_task!, to: :new

.start_task! ⇒ Boolean

Start a task's execution

Updates the task status to IN_PROGRESS and fires the appropriate event.



37
# File 'lib/tasker/orchestration/task_initializer.rb', line 37

# Forwards `start_task!` to the object returned by `new`, so callers can
# invoke it without building an initializer themselves.
# NOTE(review): listed under class methods, so this presumably sits inside
# `class << self` — confirm against the source file.
delegate :start_task!, to: :new

Instance Method Details

#initialize_task!(task_request, task_handler) ⇒ Tasker::Task

Initialize a new task from a task request



45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/tasker/orchestration/task_initializer.rb', line 45

# Build and enqueue a task for the given request.
#
# The request context is validated first. When validation reports errors, a
# task built with Tasker::Task.from_task_request is returned with each error
# attached under :context, a task-failed event is published, and nothing is
# enqueued. Otherwise the task and its step sequence are created inside a
# single Tasker::Task.transaction, a task-started event carrying the step
# count is published, and the task is enqueued.
#
# @param task_request [Object] request exposing #context (presumably a TaskRequest)
# @param task_handler [Object] handler used for context validation and step sequence creation
# @return [Tasker::Task] the created task, or the task carrying context errors
def initialize_task!(task_request, task_handler)
  validation_errors = validate_context_with_handler(task_request.context, task_handler)

  unless validation_errors.length.zero?
    rejected_task = Tasker::Task.from_task_request(task_request)
    validation_errors.each { |message| rejected_task.errors.add(:context, message) }

    # Report the initialization failure through the event publisher.
    publish_task_failed(
      rejected_task,
      error_message: validation_errors.join(', '),
      initialization_failed: true
    )
    return rejected_task
  end

  # Declared before the block so the assignment made inside the transaction
  # is still visible once the block has finished.
  created_task = nil
  Tasker::Task.transaction do
    created_task = Tasker::Task.create_with_defaults!(task_request)
    # Build the step sequence and its dependencies for the new task.
    StepSequenceFactory.create_sequence_for_task!(created_task, task_handler)
  end

  # Report the successful initialization through the event publisher.
  publish_task_started(created_task, step_count: created_task.workflow_steps.count)

  enqueue_task(created_task)
  created_task
end

#start_task!(task) ⇒ Boolean

Start a task's execution



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# File 'lib/tasker/orchestration/task_initializer.rb', line 84

# Move a pending task into the in-progress state.
#
# The task's context is re-wrapped in an ActiveSupport::HashWithIndifferentAccess
# before the transition is attempted. A task-started event is published when
# the transition succeeds, and a task-failed event when it does not.
#
# @param task [Tasker::Task] the task to start
# @return [Boolean] true when the transition succeeded, false otherwise
# @raise [Tasker::ProceduralError] if the task is already complete or its
#   status is not PENDING
def start_task!(task)
  if task.complete
    raise Tasker::ProceduralError, "task already complete for task #{task.task_id}"
  end

  if task.status != Tasker::Constants::TaskStatuses::PENDING
    raise Tasker::ProceduralError,
          "task is not pending for task #{task.task_id}, status is #{task.status}"
  end

  task.context = ActiveSupport::HashWithIndifferentAccess.new(task.context)

  # Metadata handed to the state machine alongside the transition request.
  transition_metadata = {
    initialization_completed: true,
    step_dependencies_established: task.workflow_steps.count
  }
  transitioned = safe_transition_to(
    task,
    Tasker::Constants::TaskStatuses::IN_PROGRESS,
    transition_metadata
  )

  if transitioned
    # Report the successful start through the event publisher.
    publish_task_started(task, task_context: task.context)
    return true
  end

  # Report the failed start through the event publisher.
  publish_task_failed(
    task,
    error_message: 'Failed to transition to in_progress',
    initialization_failed: true
  )
  false
end