Module: Tasker::Concerns::EventPublisher
- Extended by:
- ActiveSupport::Concern
- Included in:
- StateMachineBase, Orchestration::BackoffCalculator, Orchestration::ResponseProcessor, Orchestration::StepExecutor, Orchestration::TaskFinalizer, Orchestration::TaskInitializer, Orchestration::TaskReenqueuer, Orchestration::ViableStepDiscovery, Orchestration::WorkflowCoordinator, StateMachine::StepStateMachine, StateMachine::TaskStateMachine, StepHandler::Api, StepHandler::Base, TaskHandler::InstanceMethods, TaskRunnerJob
- Defined in:
- lib/tasker/concerns/event_publisher.rb
Overview
EventPublisher provides a clean interface for publishing events
This concern provides domain-specific event publishing methods that automatically build standardized payloads and resolve event constants. The API is designed for maximum clarity and minimum cognitive overhead.
Enhanced with structured logging integration for production observability.
Usage: include Tasker::Concerns::EventPublisher
# Step events - method name determines event type automatically publish_step_completed(step, operation_count: 42) publish_step_failed(step, error: exception) publish_step_started(step)
# Task events - clean and obvious publish_task_started(task) publish_task_completed(task, total_duration: 120.5) publish_task_failed(task, error_message: "Payment failed")
Instance Method Summary collapse
-
#infer_step_event_type_from_state(step) ⇒ Symbol
Infer step event type from step state and context.
-
#publish_custom_event(event_name, payload = {}) ⇒ void
Publish a custom event with standard metadata Assumes the event is already registered.
-
#publish_no_viable_steps(task_id, reason: 'No steps ready for execution', **context) ⇒ void
Publish no viable steps event Automatically resolves to WorkflowEvents::NO_VIABLE_STEPS.
-
#publish_step_backoff(step, backoff_seconds:, backoff_type: 'exponential', **context) ⇒ void
Publish step backoff event (for retry/rate limiting scenarios) Automatically resolves to ObservabilityEvents::Step::BACKOFF.
-
#publish_step_before_handle(step, **context) ⇒ void
Publish step before handle event Automatically resolves to StepEvents::BEFORE_HANDLE with :before_handle event type.
-
#publish_step_cancelled(step, cancellation_reason: 'Step cancelled', **context) ⇒ void
Publish step cancelled event Automatically resolves to StepEvents::CANCELLED with :cancelled event type.
-
#publish_step_completed(step, **context) ⇒ void
Publish step completed event Automatically resolves to StepEvents::COMPLETED with :completed event type.
-
#publish_step_event_for_context(step, context_hint: nil, **context) ⇒ void
Automatically determine and publish the appropriate step event based on step state This method uses the step's current state to infer the most appropriate event type.
-
#publish_step_failed(step, error: nil, **context) ⇒ void
Publish step failed event Automatically resolves to StepEvents::FAILED with :failed event type Automatically extracts error information if :error is provided.
-
#publish_step_retry_requested(step, retry_reason: 'Step execution failed', **context) ⇒ void
Publish step retry requested event Automatically resolves to StepEvents::RETRY_REQUESTED with :retry event type.
-
#publish_step_started(step, **context) ⇒ void
Publish step started event Automatically resolves to StepEvents::EXECUTION_REQUESTED with :started event type.
-
#publish_steps_execution_completed(task, processed_count:, successful_count:, **context) ⇒ void
Publish steps execution completed event (batch processing) Automatically resolves to WorkflowEvents::STEPS_EXECUTION_COMPLETED.
-
#publish_steps_execution_started(task, step_count:, processing_mode: 'concurrent', **context) ⇒ void
Publish steps execution started event (batch processing) Automatically resolves to WorkflowEvents::STEPS_EXECUTION_STARTED.
-
#publish_task_completed(task, **context) ⇒ void
Publish task completed event Automatically resolves to TaskEvents::COMPLETED with :completed event type.
-
#publish_task_enqueue(task, **context) ⇒ void
Publish task enqueue event (for job scheduling observability) Automatically resolves to ObservabilityEvents::Task::ENQUEUE.
-
#publish_task_failed(task, error_message: 'Task execution failed', error_steps: [], **context) ⇒ void
Publish task failed event Automatically resolves to TaskEvents::FAILED with :failed event type.
-
#publish_task_finalization_completed(task, processed_steps_count: 0, **context) ⇒ void
Publish task finalization completed event.
-
#publish_task_finalization_started(task, processed_steps_count: 0, **context) ⇒ void
Publish task finalization started event.
-
#publish_task_pending_transition(task, reason: 'Task set to pending', **context) ⇒ void
Publish task pending transition event (for synchronous processing) Automatically resolves to TaskEvents::INITIALIZE_REQUESTED with pending context.
-
#publish_task_reenqueue_delayed(task, delay_seconds:, reason: 'Task reenqueue delayed', **context) ⇒ void
Publish task reenqueue delayed event Automatically resolves to WorkflowEvents::TASK_REENQUEUE_DELAYED.
-
#publish_task_reenqueue_failed(task, reason: 'Task reenqueue failed', error: 'Unknown error', **context) ⇒ void
Publish task reenqueue failed event Automatically resolves to WorkflowEvents::TASK_REENQUEUE_FAILED.
-
#publish_task_reenqueue_requested(task, reason: 'Task reenqueue requested', **context) ⇒ void
Publish task reenqueue requested event Automatically resolves to WorkflowEvents::TASK_REENQUEUE_REQUESTED.
-
#publish_task_reenqueue_started(task, reason: 'Task reenqueue started', **context) ⇒ void
Publish task reenqueue started event Automatically resolves to WorkflowEvents::TASK_REENQUEUE_STARTED.
-
#publish_task_retry_requested(task, retry_reason: 'Task retry requested', **context) ⇒ void
Publish task retry requested event Automatically resolves to TaskEvents::RETRY_REQUESTED with :retry event type.
-
#publish_task_started(task, **context) ⇒ void
Publish task started event Automatically resolves to TaskEvents::START_REQUESTED with :started event type.
-
#publish_viable_steps_discovered(task_id, step_ids, processing_mode: 'concurrent', **context) ⇒ void
Publish viable steps discovered event Automatically resolves to WorkflowEvents::VIABLE_STEPS_DISCOVERED.
-
#publish_workflow_state_unclear(task, reason: 'Task in unclear state', **context) ⇒ void
Publish workflow unclear state event (for monitoring/alerting) Automatically resolves to WorkflowEvents::TASK_STATE_UNCLEAR.
-
#publish_workflow_step_completed(task_id, step_id, **context) ⇒ void
Publish workflow step completed event (orchestration layer) Automatically resolves to WorkflowEvents::STEP_COMPLETED.
-
#publish_workflow_task_started(task_id, **context) ⇒ void
Publish workflow task started event (orchestration layer) Automatically resolves to WorkflowEvents::TASK_STARTED.
Instance Method Details
#infer_step_event_type_from_state(step) ⇒ Symbol
Infer step event type from step state and context
608 609 610 611 612 613 614 615 616 617 618 619 620 621 |
# File 'lib/tasker/concerns/event_publisher.rb', line 608 def infer_step_event_type_from_state(step) case step.status when Tasker::Constants::WorkflowStepStatuses::IN_PROGRESS :started when Tasker::Constants::WorkflowStepStatuses::COMPLETE :completed when Tasker::Constants::WorkflowStepStatuses::ERROR :failed when Tasker::Constants::WorkflowStepStatuses::CANCELLED :cancelled else :started # Default fallback end end |
#publish_custom_event(event_name, payload = {}) ⇒ void
This method returns an undefined value.
Publish a custom event with standard metadata Assumes the event is already registered
559 560 561 562 563 564 565 566 567 568 569 570 571 572 |
# File 'lib/tasker/concerns/event_publisher.rb', line 559 def publish_custom_event(event_name, payload = {}) # Add standard metadata enhanced_payload = payload.merge( event_type: 'custom', timestamp: Time.current ) publish_event_with_logging(event_name, enhanced_payload) do log_structured(:info, 'Custom event published', event_name: event_name, event_type: 'custom', **payload.slice(:task_id, :step_id, :operation, :context)) end end |
#publish_no_viable_steps(task_id, reason: 'No steps ready for execution', **context) ⇒ void
This method returns an undefined value.
Publish no viable steps event Automatically resolves to WorkflowEvents::NO_VIABLE_STEPS
272 273 274 275 276 277 278 |
# File 'lib/tasker/concerns/event_publisher.rb', line 272 def publish_no_viable_steps(task_id, reason: 'No steps ready for execution', **context) context = context.merge(task_id: task_id, reason: reason) payload = build_orchestration_payload(:no_viable_steps, context) publish_event_with_logging(Tasker::Constants::WorkflowEvents::NO_VIABLE_STEPS, payload) do log_orchestration_event('no_viable_steps', :detected, task_id: task_id, reason: reason) end end |
#publish_step_backoff(step, backoff_seconds:, backoff_type: 'exponential', **context) ⇒ void
This method returns an undefined value.
Publish step backoff event (for retry/rate limiting scenarios) Automatically resolves to ObservabilityEvents::Step::BACKOFF
509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 |
# File 'lib/tasker/concerns/event_publisher.rb', line 509 def publish_step_backoff(step, backoff_seconds:, backoff_type: 'exponential', **context) context = context.merge( step_id: step.workflow_step_id, step_name: step.name, backoff_seconds: backoff_seconds, backoff_type: backoff_type ) payload = build_step_payload(step, :backoff, context) publish_event_with_logging(Tasker::Constants::ObservabilityEvents::Step::BACKOFF, payload) do log_step_event(step, :backoff, backoff_seconds: backoff_seconds, backoff_type: backoff_type, **context.slice(:retry_attempt, :max_retries)) end end |
#publish_step_before_handle(step, **context) ⇒ void
This method returns an undefined value.
Publish step before handle event Automatically resolves to StepEvents::BEFORE_HANDLE with :before_handle event type
60 61 62 63 64 65 |
# File 'lib/tasker/concerns/event_publisher.rb', line 60 def publish_step_before_handle(step, **context) payload = build_step_payload(step, :before_handle, context) publish_event_with_logging(Tasker::Constants::StepEvents::BEFORE_HANDLE, payload) do log_step_event(step, :before_handle, **context.slice(:handler_class, :dependencies_met)) end end |
#publish_step_cancelled(step, cancellation_reason: 'Step cancelled', **context) ⇒ void
This method returns an undefined value.
Publish step cancelled event Automatically resolves to StepEvents::CANCELLED with :cancelled event type
132 133 134 135 136 137 138 |
# File 'lib/tasker/concerns/event_publisher.rb', line 132 def publish_step_cancelled(step, cancellation_reason: 'Step cancelled', **context) context = context.merge(cancellation_reason: cancellation_reason) payload = build_step_payload(step, :cancelled, context) publish_event_with_logging(Tasker::Constants::StepEvents::CANCELLED, payload) do log_step_event(step, :cancelled, cancellation_reason: cancellation_reason) end end |
#publish_step_completed(step, **context) ⇒ void
This method returns an undefined value.
Publish step completed event Automatically resolves to StepEvents::COMPLETED with :completed event type
73 74 75 76 77 78 79 |
# File 'lib/tasker/concerns/event_publisher.rb', line 73 def publish_step_completed(step, **context) payload = build_step_payload(step, :completed, context) publish_event_with_logging(Tasker::Constants::StepEvents::COMPLETED, payload) do duration = context[:duration] || extract_duration_from_step(step) log_step_event(step, :completed, duration: duration, **context.slice(:operation_count, :records_processed)) end end |
#publish_step_event_for_context(step, context_hint: nil, **context) ⇒ void
This method returns an undefined value.
Automatically determine and publish the appropriate step event based on step state This method uses the step's current state to infer the most appropriate event type
585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 |
# File 'lib/tasker/concerns/event_publisher.rb', line 585 def publish_step_event_for_context(step, context_hint: nil, **context) event_type = context_hint || infer_step_event_type_from_state(step) case event_type when :started, :execution_requested publish_step_started(step, **context) when :completed, :success publish_step_completed(step, **context) when :failed, :failure, :error publish_step_failed(step, **context) when :retry, :retry_requested publish_step_retry_requested(step, **context) when :cancelled publish_step_cancelled(step, **context) else Rails.logger.warn("Unknown step event context: #{event_type} for step #{step.workflow_step_id}") end end |
#publish_step_failed(step, error: nil, **context) ⇒ void
This method returns an undefined value.
Publish step failed event Automatically resolves to StepEvents::FAILED with :failed event type Automatically extracts error information if :error is provided
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 |
# File 'lib/tasker/concerns/event_publisher.rb', line 89 def publish_step_failed(step, error: nil, **context) # Automatically extract error information into context if error context = context.merge( error_message: error., error_class: error.class.name, backtrace: error.backtrace&.first(10) ) end payload = build_step_payload(step, :failed, context) publish_event_with_logging(Tasker::Constants::StepEvents::FAILED, payload) do duration = context[:duration] || extract_duration_from_step(step) log_step_event(step, :failed, duration: duration, error: error&.) log_exception(error, context: { step_id: step.workflow_step_id, task_id: step.task.task_id }) if error end end |
#publish_step_retry_requested(step, retry_reason: 'Step execution failed', **context) ⇒ void
This method returns an undefined value.
Publish step retry requested event Automatically resolves to StepEvents::RETRY_REQUESTED with :retry event type
114 115 116 117 118 119 120 121 122 123 |
# File 'lib/tasker/concerns/event_publisher.rb', line 114 def publish_step_retry_requested(step, retry_reason: 'Step execution failed', **context) context = context.merge(retry_reason: retry_reason) payload = build_step_payload(step, :retry, context) publish_event_with_logging(Tasker::Constants::StepEvents::RETRY_REQUESTED, payload) do log_step_event(step, :retry_requested, retry_reason: retry_reason, attempt_count: step.attempts, **context.slice(:backoff_seconds, :max_retries)) end end |
#publish_step_started(step, **context) ⇒ void
This method returns an undefined value.
Publish step started event Automatically resolves to StepEvents::EXECUTION_REQUESTED with :started event type
47 48 49 50 51 52 |
# File 'lib/tasker/concerns/event_publisher.rb', line 47 def publish_step_started(step, **context) payload = build_step_payload(step, :started, context) publish_event_with_logging(Tasker::Constants::StepEvents::EXECUTION_REQUESTED, payload) do log_step_event(step, :started, **context.slice(:processing_mode, :concurrent_batch_size)) end end |
#publish_steps_execution_completed(task, processed_count:, successful_count:, **context) ⇒ void
This method returns an undefined value.
Publish steps execution completed event (batch processing) Automatically resolves to WorkflowEvents::STEPS_EXECUTION_COMPLETED
479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 |
# File 'lib/tasker/concerns/event_publisher.rb', line 479 def publish_steps_execution_completed(task, processed_count:, successful_count:, **context) context = context.merge( task_id: task.task_id, task_name: task.name, processed_count: processed_count, successful_count: successful_count ) payload = build_orchestration_payload(:steps_execution_completed, context) publish_event_with_logging(Tasker::Constants::WorkflowEvents::STEPS_EXECUTION_COMPLETED, payload) do log_orchestration_event('steps_execution_completed', :completed, task_id: task.task_id, processed_count: processed_count, successful_count: successful_count, failure_count: processed_count - successful_count) end end |
#publish_steps_execution_started(task, step_count:, processing_mode: 'concurrent', **context) ⇒ void
This method returns an undefined value.
Publish steps execution started event (batch processing) Automatically resolves to WorkflowEvents::STEPS_EXECUTION_STARTED
459 460 461 462 463 464 465 466 467 468 469 |
# File 'lib/tasker/concerns/event_publisher.rb', line 459 def publish_steps_execution_started(task, step_count:, processing_mode: 'concurrent', **context) context = context.merge( task_id: task.task_id, task_name: task.name, step_count: step_count, processing_mode: processing_mode ) payload = build_orchestration_payload(:steps_execution_started, context) publish_event(Tasker::Constants::WorkflowEvents::STEPS_EXECUTION_STARTED, payload) end |
#publish_task_completed(task, **context) ⇒ void
This method returns an undefined value.
Publish task completed event Automatically resolves to TaskEvents::COMPLETED with :completed event type
163 164 165 166 167 168 |
# File 'lib/tasker/concerns/event_publisher.rb', line 163 def publish_task_completed(task, **context) payload = build_task_payload(task, :completed, context) publish_event_with_logging(Tasker::Constants::TaskEvents::COMPLETED, payload) do log_task_event(task, :completed, **context.slice(:total_duration, :completed_steps, :total_steps)) end end |
#publish_task_enqueue(task, **context) ⇒ void
This method returns an undefined value.
Publish task enqueue event (for job scheduling observability) Automatically resolves to ObservabilityEvents::Task::ENQUEUE
536 537 538 539 540 541 542 543 544 545 546 547 |
# File 'lib/tasker/concerns/event_publisher.rb', line 536 def publish_task_enqueue(task, **context) context = context.merge( task_id: task.task_id, task_name: task.name, task_context: task.context ) payload = build_task_payload(task, :enqueue, context) publish_event_with_logging(Tasker::Constants::ObservabilityEvents::Task::ENQUEUE, payload) do log_task_event(task, :enqueued, **context.slice(:queue_name, :job_class, :priority)) end end |
#publish_task_failed(task, error_message: 'Task execution failed', error_steps: [], **context) ⇒ void
This method returns an undefined value.
Publish task failed event Automatically resolves to TaskEvents::FAILED with :failed event type
178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/tasker/concerns/event_publisher.rb', line 178 def publish_task_failed(task, error_message: 'Task execution failed', error_steps: [], **context) context = context.merge( error_message: , error_steps: error_steps ) payload = build_task_payload(task, :failed, context) publish_event_with_logging(Tasker::Constants::TaskEvents::FAILED, payload) do log_task_event(task, :failed, error: , failed_step_count: error_steps.size, **context.slice(:total_duration, :completed_steps)) end end |
#publish_task_finalization_completed(task, processed_steps_count: 0, **context) ⇒ void
This method returns an undefined value.
Publish task finalization completed event
309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 |
# File 'lib/tasker/concerns/event_publisher.rb', line 309 def publish_task_finalization_completed(task, processed_steps_count: 0, **context) context = context.merge( task_id: task.task_id, processed_steps_count: processed_steps_count, final_status: task.status ) payload = build_orchestration_payload(:task_finalization_completed, context) publish_event_with_logging(Tasker::Constants::WorkflowEvents::TASK_FINALIZATION_COMPLETED, payload) do log_orchestration_event('task_finalization', :completed, task_id: task.task_id, final_status: task.status, processed_steps_count: processed_steps_count) end end |
#publish_task_finalization_started(task, processed_steps_count: 0, **context) ⇒ void
This method returns an undefined value.
Publish task finalization started event
290 291 292 293 294 295 296 297 298 299 300 301 |
# File 'lib/tasker/concerns/event_publisher.rb', line 290 def publish_task_finalization_started(task, processed_steps_count: 0, **context) context = context.merge( task_id: task.task_id, processed_steps_count: processed_steps_count ) payload = build_orchestration_payload(:task_finalization_started, context) publish_event_with_logging(Tasker::Constants::WorkflowEvents::TASK_FINALIZATION_STARTED, payload) do log_orchestration_event('task_finalization', :started, task_id: task.task_id, processed_steps_count: processed_steps_count) end end |
#publish_task_pending_transition(task, reason: 'Task set to pending', **context) ⇒ void
This method returns an undefined value.
Publish task pending transition event (for synchronous processing) Automatically resolves to TaskEvents::INITIALIZE_REQUESTED with pending context
332 333 334 335 336 337 338 339 340 341 |
# File 'lib/tasker/concerns/event_publisher.rb', line 332 def publish_task_pending_transition(task, reason: 'Task set to pending', **context) context = context.merge( task_id: task.task_id, task_name: task.name, reason: reason ) payload = build_task_payload(task, :pending_transition, context) publish_event(Tasker::Constants::TaskEvents::INITIALIZE_REQUESTED, payload) end |
#publish_task_reenqueue_delayed(task, delay_seconds:, reason: 'Task reenqueue delayed', **context) ⇒ void
This method returns an undefined value.
Publish task reenqueue delayed event Automatically resolves to WorkflowEvents::TASK_REENQUEUE_DELAYED
433 434 435 436 437 438 439 440 441 442 443 444 445 |
# File 'lib/tasker/concerns/event_publisher.rb', line 433 def publish_task_reenqueue_delayed(task, delay_seconds:, reason: 'Task reenqueue delayed', **context) context = context.merge( task_id: task.task_id, task_name: task.name, reason: reason, delay_seconds: delay_seconds, scheduled_for: Time.current + delay_seconds.seconds, timestamp: Time.current ) payload = build_orchestration_payload(:task_reenqueue_delayed, context) publish_event(Tasker::Constants::WorkflowEvents::TASK_REENQUEUE_DELAYED, payload) end |
#publish_task_reenqueue_failed(task, reason: 'Task reenqueue failed', error: 'Unknown error', **context) ⇒ void
This method returns an undefined value.
Publish task reenqueue failed event Automatically resolves to WorkflowEvents::TASK_REENQUEUE_FAILED
412 413 414 415 416 417 418 419 420 421 422 423 |
# File 'lib/tasker/concerns/event_publisher.rb', line 412 def publish_task_reenqueue_failed(task, reason: 'Task reenqueue failed', error: 'Unknown error', **context) context = context.merge( task_id: task.task_id, task_name: task.name, reason: reason, error: error, timestamp: Time.current ) payload = build_orchestration_payload(:task_reenqueue_failed, context) publish_event(Tasker::Constants::WorkflowEvents::TASK_REENQUEUE_FAILED, payload) end |
#publish_task_reenqueue_requested(task, reason: 'Task reenqueue requested', **context) ⇒ void
This method returns an undefined value.
Publish task reenqueue requested event Automatically resolves to WorkflowEvents::TASK_REENQUEUE_REQUESTED
392 393 394 395 396 397 398 399 400 401 402 |
# File 'lib/tasker/concerns/event_publisher.rb', line 392 def publish_task_reenqueue_requested(task, reason: 'Task reenqueue requested', **context) context = context.merge( task_id: task.task_id, task_name: task.name, reason: reason, timestamp: Time.current ) payload = build_orchestration_payload(:task_reenqueue_requested, context) publish_event(Tasker::Constants::WorkflowEvents::TASK_REENQUEUE_REQUESTED, payload) end |
#publish_task_reenqueue_started(task, reason: 'Task reenqueue started', **context) ⇒ void
This method returns an undefined value.
Publish task reenqueue started event Automatically resolves to WorkflowEvents::TASK_REENQUEUE_STARTED
372 373 374 375 376 377 378 379 380 381 382 383 |
# File 'lib/tasker/concerns/event_publisher.rb', line 372 def publish_task_reenqueue_started(task, reason: 'Task reenqueue started', **context) context = context.merge( task_id: task.task_id, task_name: task.name, reason: reason, current_status: task.status, timestamp: Time.current ) payload = build_orchestration_payload(:task_reenqueue_started, context) publish_event(Tasker::Constants::WorkflowEvents::TASK_REENQUEUE_STARTED, payload) end |
#publish_task_retry_requested(task, retry_reason: 'Task retry requested', **context) ⇒ void
This method returns an undefined value.
Publish task retry requested event Automatically resolves to TaskEvents::RETRY_REQUESTED with :retry event type
200 201 202 203 204 205 206 |
# File 'lib/tasker/concerns/event_publisher.rb', line 200 def publish_task_retry_requested(task, retry_reason: 'Task retry requested', **context) context = context.merge(retry_reason: retry_reason) payload = build_task_payload(task, :retry, context) publish_event_with_logging(Tasker::Constants::TaskEvents::RETRY_REQUESTED, payload) do log_task_event(task, :retry_requested, retry_reason: retry_reason) end end |
#publish_task_started(task, **context) ⇒ void
This method returns an undefined value.
Publish task started event Automatically resolves to TaskEvents::START_REQUESTED with :started event type
150 151 152 153 154 155 |
# File 'lib/tasker/concerns/event_publisher.rb', line 150 def publish_task_started(task, **context) payload = build_task_payload(task, :started, context) publish_event_with_logging(Tasker::Constants::TaskEvents::START_REQUESTED, payload) do log_task_event(task, :started, **context.slice(:execution_mode, :priority, :step_count)) end end |
#publish_viable_steps_discovered(task_id, step_ids, processing_mode: 'concurrent', **context) ⇒ void
This method returns an undefined value.
Publish viable steps discovered event Automatically resolves to WorkflowEvents::VIABLE_STEPS_DISCOVERED
250 251 252 253 254 255 256 257 258 259 260 261 262 263 |
# File 'lib/tasker/concerns/event_publisher.rb', line 250 def publish_viable_steps_discovered(task_id, step_ids, processing_mode: 'concurrent', **context) context = context.merge( task_id: task_id, step_ids: step_ids, processing_mode: processing_mode, step_count: step_ids.size ) payload = build_orchestration_payload(:viable_steps_discovered, context) publish_event_with_logging(Tasker::Constants::WorkflowEvents::VIABLE_STEPS_DISCOVERED, payload) do log_orchestration_event('viable_steps_discovered', :discovered, task_id: task_id, step_count: step_ids.size, processing_mode: processing_mode) end end |
#publish_workflow_state_unclear(task, reason: 'Task in unclear state', **context) ⇒ void
This method returns an undefined value.
Publish workflow unclear state event (for monitoring/alerting) Automatically resolves to WorkflowEvents::TASK_STATE_UNCLEAR
350 351 352 353 354 355 356 357 358 359 |
# File 'lib/tasker/concerns/event_publisher.rb', line 350 def publish_workflow_state_unclear(task, reason: 'Task in unclear state', **context) context = context.merge( task_id: task.task_id, task_name: task.name, reason: reason ) payload = build_orchestration_payload(:task_state_unclear, context) publish_event(Tasker::Constants::WorkflowEvents::TASK_STATE_UNCLEAR, payload) end |
#publish_workflow_step_completed(task_id, step_id, **context) ⇒ void
This method returns an undefined value.
Publish workflow step completed event (orchestration layer) Automatically resolves to WorkflowEvents::STEP_COMPLETED
233 234 235 236 237 238 239 240 |
# File 'lib/tasker/concerns/event_publisher.rb', line 233 def publish_workflow_step_completed(task_id, step_id, **context) context = context.merge(task_id: task_id, step_id: step_id) payload = build_orchestration_payload(:step_completed, context) publish_event_with_logging(Tasker::Constants::WorkflowEvents::STEP_COMPLETED, payload) do log_orchestration_event('workflow_step_completed', :completed, task_id: task_id, step_id: step_id, **context) end end |
#publish_workflow_task_started(task_id, **context) ⇒ void
This method returns an undefined value.
Publish workflow task started event (orchestration layer) Automatically resolves to WorkflowEvents::TASK_STARTED
218 219 220 221 222 223 224 |
# File 'lib/tasker/concerns/event_publisher.rb', line 218 def publish_workflow_task_started(task_id, **context) context = context.merge(task_id: task_id) payload = build_orchestration_payload(:task_started, context) publish_event_with_logging(Tasker::Constants::WorkflowEvents::TASK_STARTED, payload) do log_orchestration_event('workflow_task_started', :started, task_id: task_id, **context) end end |