Module: Tasker::Concerns::EventPublisher

Overview

EventPublisher provides a clean interface for publishing events.

This concern provides domain-specific event publishing methods that automatically build standardized payloads and resolve event constants. The API is designed for maximum clarity and minimum cognitive overhead.

Most publishing methods also write a structured log entry for production observability.

Usage:

  include Tasker::Concerns::EventPublisher

  # Step events - method name determines event type automatically
  publish_step_completed(step, operation_count: 42)
  publish_step_failed(step, error: exception)
  publish_step_started(step)

  # Task events - clean and obvious
  publish_task_started(task)
  publish_task_completed(task, total_duration: 120.5)
  publish_task_failed(task, error_message: "Payment failed")


Instance Method Details

#infer_step_event_type_from_state(step) ⇒ Symbol

Infer the step event type from the step's current status. Unrecognized statuses fall back to :started.

Parameters:

  • step (WorkflowStep)

    The step whose status is inspected

Returns:

  • (Symbol)

    The inferred event type



# File 'lib/tasker/concerns/event_publisher.rb', line 608

def infer_step_event_type_from_state(step)
  case step.status
  when Tasker::Constants::WorkflowStepStatuses::IN_PROGRESS
    :started
  when Tasker::Constants::WorkflowStepStatuses::COMPLETE
    :completed
  when Tasker::Constants::WorkflowStepStatuses::ERROR
    :failed
  when Tasker::Constants::WorkflowStepStatuses::CANCELLED
    :cancelled
  else
    :started # Default fallback
  end
end
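
The mapping above can be exercised in isolation. This is a minimal sketch, not Tasker's own code: StepStatuses and FakeStep are hypothetical stand-ins for Tasker::Constants::WorkflowStepStatuses and a real WorkflowStep, and the string values given to the constants are assumptions made for illustration.

```ruby
# Hypothetical stand-ins; the string values are assumed, not taken from Tasker.
module StepStatuses
  IN_PROGRESS = 'in_progress'
  COMPLETE    = 'complete'
  ERROR       = 'error'
  CANCELLED   = 'cancelled'
end

FakeStep = Struct.new(:status)

# Same case/when shape as the method above, using the stand-in constants.
def infer_event_type(step)
  case step.status
  when StepStatuses::IN_PROGRESS then :started
  when StepStatuses::COMPLETE    then :completed
  when StepStatuses::ERROR       then :failed
  when StepStatuses::CANCELLED   then :cancelled
  else :started # any unrecognized status falls back to :started
  end
end

p infer_event_type(FakeStep.new('complete'))
p infer_event_type(FakeStep.new('pending'))
```

Because of the fallback branch, a step in a status the method does not recognize is reported as :started rather than raising an error.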

#publish_custom_event(event_name, payload = {}) ⇒ void

This method returns an undefined value.

Publish a custom event with standard metadata. Assumes the event is already registered.

Parameters:

  • event_name (String)

    Event name

  • payload (Hash) (defaults to: {})

    Event payload



# File 'lib/tasker/concerns/event_publisher.rb', line 559

def publish_custom_event(event_name, payload = {})
  # Add standard metadata
  enhanced_payload = payload.merge(
    event_type: 'custom',
    timestamp: Time.current
  )

  publish_event_with_logging(event_name, enhanced_payload) do
    log_structured(:info, 'Custom event published',
                   event_name: event_name,
                   event_type: 'custom',
                   **payload.slice(:task_id, :step_id, :operation, :context))
  end
end
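
The effect of the merge and slice calls above can be seen with plain hashes. This is a minimal sketch under stated assumptions: enhance_custom_payload is a hypothetical helper, and Time.now stands in for Time.current so the example runs without ActiveSupport.

```ruby
# Hypothetical helper mirroring the merge step; Time.now replaces Time.current.
def enhance_custom_payload(payload, now: Time.now)
  # Hash#merge gives precedence to its argument, so these two keys
  # replace any :event_type or :timestamp supplied by the caller.
  payload.merge(event_type: 'custom', timestamp: now)
end

caller_payload = { task_id: 'task-1', event_type: 'billing', amount: 10 }
enhanced = enhance_custom_payload(caller_payload)

p enhanced[:event_type]
p enhanced[:amount]

# Only these keys are forwarded to the structured log line; :amount is not.
p caller_payload.slice(:task_id, :step_id, :operation, :context)
```

A caller that sets its own :event_type or :timestamp will have those values replaced in the published payload, while all other keys are kept.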

#publish_no_viable_steps(task_id, reason: 'No steps ready for execution', **context) ⇒ void

This method returns an undefined value.

Publish no viable steps event. Automatically resolves to WorkflowEvents::NO_VIABLE_STEPS.

Parameters:

  • task_id (String)

    The task ID

  • reason (String) (defaults to: 'No steps ready for execution')

    The reason why no steps are viable

  • context (Hash)

    Additional orchestration context



# File 'lib/tasker/concerns/event_publisher.rb', line 272

def publish_no_viable_steps(task_id, reason: 'No steps ready for execution', **context)
  context = context.merge(task_id: task_id, reason: reason)
  payload = build_orchestration_payload(:no_viable_steps, context)
  publish_event_with_logging(Tasker::Constants::WorkflowEvents::NO_VIABLE_STEPS, payload) do
    log_orchestration_event('no_viable_steps', :detected, task_id: task_id, reason: reason)
  end
end

#publish_step_backoff(step, backoff_seconds:, backoff_type: 'exponential', **context) ⇒ void

This method returns an undefined value.

Publish step backoff event (for retry/rate limiting scenarios). Automatically resolves to ObservabilityEvents::Step::BACKOFF.

Parameters:

  • step (WorkflowStep)

    The step being backed off

  • backoff_seconds (Float)

    Number of seconds to wait

  • backoff_type (String) (defaults to: 'exponential')

    Type of backoff (server_requested/exponential)

  • context (Hash)

    Additional backoff context



# File 'lib/tasker/concerns/event_publisher.rb', line 509

def publish_step_backoff(step, backoff_seconds:, backoff_type: 'exponential', **context)
  context = context.merge(
    step_id: step.workflow_step_id,
    step_name: step.name,
    backoff_seconds: backoff_seconds,
    backoff_type: backoff_type
  )

  payload = build_step_payload(step, :backoff, context)
  publish_event_with_logging(Tasker::Constants::ObservabilityEvents::Step::BACKOFF, payload) do
    log_step_event(step, :backoff,
                   backoff_seconds: backoff_seconds,
                   backoff_type: backoff_type,
                   **context.slice(:retry_attempt, :max_retries))
  end
end

#publish_step_before_handle(step, **context) ⇒ void

This method returns an undefined value.

Publish step before handle event. Automatically resolves to StepEvents::BEFORE_HANDLE with :before_handle event type.

Parameters:

  • step (WorkflowStep)

    The step about to be handled

  • context (Hash)

    Additional context to merge into payload



# File 'lib/tasker/concerns/event_publisher.rb', line 60

def publish_step_before_handle(step, **context)
  payload = build_step_payload(step, :before_handle, context)
  publish_event_with_logging(Tasker::Constants::StepEvents::BEFORE_HANDLE, payload) do
    log_step_event(step, :before_handle, **context.slice(:handler_class, :dependencies_met))
  end
end

#publish_step_cancelled(step, cancellation_reason: 'Step cancelled', **context) ⇒ void

This method returns an undefined value.

Publish step cancelled event. Automatically resolves to StepEvents::CANCELLED with :cancelled event type.

Parameters:

  • step (WorkflowStep)

    The step being cancelled

  • cancellation_reason (String) (defaults to: 'Step cancelled')

    The reason for cancellation

  • context (Hash)

    Additional context to merge into payload



# File 'lib/tasker/concerns/event_publisher.rb', line 132

def publish_step_cancelled(step, cancellation_reason: 'Step cancelled', **context)
  context = context.merge(cancellation_reason: cancellation_reason)
  payload = build_step_payload(step, :cancelled, context)
  publish_event_with_logging(Tasker::Constants::StepEvents::CANCELLED, payload) do
    log_step_event(step, :cancelled, cancellation_reason: cancellation_reason)
  end
end

#publish_step_completed(step, **context) ⇒ void

This method returns an undefined value.

Publish step completed event. Automatically resolves to StepEvents::COMPLETED with :completed event type.

Parameters:

  • step (WorkflowStep)

    The step that completed

  • context (Hash)

    Additional context to merge into payload



# File 'lib/tasker/concerns/event_publisher.rb', line 73

def publish_step_completed(step, **context)
  payload = build_step_payload(step, :completed, context)
  publish_event_with_logging(Tasker::Constants::StepEvents::COMPLETED, payload) do
    duration = context[:duration] || extract_duration_from_step(step)
    log_step_event(step, :completed, duration: duration, **context.slice(:operation_count, :records_processed))
  end
end

#publish_step_event_for_context(step, context_hint: nil, **context) ⇒ void

This method returns an undefined value.

Automatically determine and publish the appropriate step event. If context_hint is given, it selects the event type; otherwise the type is inferred from the step's current state. An unrecognized event type is logged as a warning and nothing is published.

Parameters:

  • step (WorkflowStep)

    The step object

  • context_hint (Symbol, nil) (defaults to: nil)

    Optional event type hint; when present, it takes precedence over the type inferred from the step state

  • context (Hash)

    Additional context to merge into payload



# File 'lib/tasker/concerns/event_publisher.rb', line 585

def publish_step_event_for_context(step, context_hint: nil, **context)
  event_type = context_hint || infer_step_event_type_from_state(step)

  case event_type
  when :started, :execution_requested
    publish_step_started(step, **context)
  when :completed, :success
    publish_step_completed(step, **context)
  when :failed, :failure, :error
    publish_step_failed(step, **context)
  when :retry, :retry_requested
    publish_step_retry_requested(step, **context)
  when :cancelled
    publish_step_cancelled(step, **context)
  else
    Rails.logger.warn("Unknown step event context: #{event_type} for step #{step.workflow_step_id}")
  end
end
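
The dispatch above can also be read as a lookup table. The sketch below is an illustrative alternative, not the library's implementation: STEP_EVENT_DISPATCH and publisher_for are hypothetical names, and the table only returns the name of the publisher method that the case statement would call.

```ruby
# Hypothetical table equivalent to the case/when branches above.
STEP_EVENT_DISPATCH = {
  started: :publish_step_started,
  execution_requested: :publish_step_started,
  completed: :publish_step_completed,
  success: :publish_step_completed,
  failed: :publish_step_failed,
  failure: :publish_step_failed,
  error: :publish_step_failed,
  retry: :publish_step_retry_requested,
  retry_requested: :publish_step_retry_requested,
  cancelled: :publish_step_cancelled
}.freeze

# The hint wins when present; otherwise the inferred type is used.
# Returns nil for an event type with no matching publisher.
def publisher_for(context_hint, inferred_type)
  STEP_EVENT_DISPATCH[context_hint || inferred_type]
end

p publisher_for(:success, :started)
p publisher_for(nil, :failed)
p publisher_for(:paused, :started)
```

The last call returns nil, which corresponds to the else branch above where a warning is logged and no event is published.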

#publish_step_failed(step, error: nil, **context) ⇒ void

This method returns an undefined value.

Publish step failed event. Automatically resolves to StepEvents::FAILED with :failed event type. Automatically extracts error information if :error is provided.

Parameters:

  • step (WorkflowStep)

    The step that failed

  • error (Exception, nil) (defaults to: nil)

    The exception that caused the failure

  • context (Hash)

    Additional context to merge into payload



# File 'lib/tasker/concerns/event_publisher.rb', line 89

def publish_step_failed(step, error: nil, **context)
  # Automatically extract error information into context
  if error
    context = context.merge(
      error_message: error.message,
      error_class: error.class.name,
      backtrace: error.backtrace&.first(10)
    )
  end

  payload = build_step_payload(step, :failed, context)
  publish_event_with_logging(Tasker::Constants::StepEvents::FAILED, payload) do
    duration = context[:duration] || extract_duration_from_step(step)
    log_step_event(step, :failed, duration: duration, error: error&.message)
    log_exception(error, context: { step_id: step.workflow_step_id, task_id: step.task.task_id }) if error
  end
end
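
The error-extraction step at the top of the method can be tried on its own. This is a minimal sketch: error_context is a hypothetical helper name, and it reproduces only the context merge, not the publishing or logging.

```ruby
# Hypothetical helper reproducing the error-to-context merge shown above.
def error_context(error, context = {})
  return context unless error

  context.merge(
    error_message: error.message,
    error_class: error.class.name,
    # backtrace is nil for an exception that was never raised,
    # hence the safe-navigation operator.
    backtrace: error.backtrace&.first(10)
  )
end

begin
  raise ArgumentError, 'bad input'
rescue ArgumentError => e
  raised = error_context(e, attempt: 2)
end

unraised = error_context(RuntimeError.new('never raised'))

p raised[:error_class]
p raised[:backtrace].size <= 10
p unraised[:backtrace]
```

Capping the backtrace at ten frames keeps the payload small, and passing no error leaves the context untouched.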

#publish_step_retry_requested(step, retry_reason: 'Step execution failed', **context) ⇒ void

This method returns an undefined value.

Publish step retry requested event. Automatically resolves to StepEvents::RETRY_REQUESTED with :retry event type.

Parameters:

  • step (WorkflowStep)

    The step being retried

  • retry_reason (String) (defaults to: 'Step execution failed')

    The reason for the retry

  • context (Hash)

    Additional context to merge into payload



# File 'lib/tasker/concerns/event_publisher.rb', line 114

def publish_step_retry_requested(step, retry_reason: 'Step execution failed', **context)
  context = context.merge(retry_reason: retry_reason)
  payload = build_step_payload(step, :retry, context)
  publish_event_with_logging(Tasker::Constants::StepEvents::RETRY_REQUESTED, payload) do
    log_step_event(step, :retry_requested,
                   retry_reason: retry_reason,
                   attempt_count: step.attempts,
                   **context.slice(:backoff_seconds, :max_retries))
  end
end

#publish_step_started(step, **context) ⇒ void

This method returns an undefined value.

Publish step started event. Automatically resolves to StepEvents::EXECUTION_REQUESTED with :started event type.

Parameters:

  • step (WorkflowStep)

    The step being started

  • context (Hash)

    Additional context to merge into payload



# File 'lib/tasker/concerns/event_publisher.rb', line 47

def publish_step_started(step, **context)
  payload = build_step_payload(step, :started, context)
  publish_event_with_logging(Tasker::Constants::StepEvents::EXECUTION_REQUESTED, payload) do
    log_step_event(step, :started, **context.slice(:processing_mode, :concurrent_batch_size))
  end
end

#publish_steps_execution_completed(task, processed_count:, successful_count:, **context) ⇒ void

This method returns an undefined value.

Publish steps execution completed event (batch processing). Automatically resolves to WorkflowEvents::STEPS_EXECUTION_COMPLETED.

Parameters:

  • task (Task)

    The task whose steps were executed

  • processed_count (Integer)

    Number of steps processed

  • successful_count (Integer)

    Number of steps that succeeded

  • context (Hash)

    Additional execution context



# File 'lib/tasker/concerns/event_publisher.rb', line 479

def publish_steps_execution_completed(task, processed_count:, successful_count:, **context)
  context = context.merge(
    task_id: task.task_id,
    task_name: task.name,
    processed_count: processed_count,
    successful_count: successful_count
  )

  payload = build_orchestration_payload(:steps_execution_completed, context)
  publish_event_with_logging(Tasker::Constants::WorkflowEvents::STEPS_EXECUTION_COMPLETED, payload) do
    log_orchestration_event('steps_execution_completed', :completed,
                            task_id: task.task_id,
                            processed_count: processed_count,
                            successful_count: successful_count,
                            failure_count: processed_count - successful_count)
  end
end
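
The logged failure_count is derived rather than passed in. The sketch below shows that arithmetic with a hypothetical helper, batch_log_fields, which builds only the fields handed to the log call above.

```ruby
# Hypothetical helper building the same log fields as the block above.
def batch_log_fields(task_id:, processed_count:, successful_count:)
  {
    task_id: task_id,
    processed_count: processed_count,
    successful_count: successful_count,
    # Derived value: every processed step that did not succeed.
    failure_count: processed_count - successful_count
  }
end

p batch_log_fields(task_id: 'task-1', processed_count: 5, successful_count: 3)
```

Note that failure_count appears only in the log fields; the published payload carries processed_count and successful_count.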

#publish_steps_execution_started(task, step_count:, processing_mode: 'concurrent', **context) ⇒ void

This method returns an undefined value.

Publish steps execution started event (batch processing). Automatically resolves to WorkflowEvents::STEPS_EXECUTION_STARTED.

Parameters:

  • task (Task)

    The task whose steps are being executed

  • step_count (Integer)

    Number of steps being executed

  • processing_mode (String) (defaults to: 'concurrent')

    The processing mode (concurrent/sequential)

  • context (Hash)

    Additional execution context



# File 'lib/tasker/concerns/event_publisher.rb', line 459

def publish_steps_execution_started(task, step_count:, processing_mode: 'concurrent', **context)
  context = context.merge(
    task_id: task.task_id,
    task_name: task.name,
    step_count: step_count,
    processing_mode: processing_mode
  )

  payload = build_orchestration_payload(:steps_execution_started, context)
  publish_event(Tasker::Constants::WorkflowEvents::STEPS_EXECUTION_STARTED, payload)
end

#publish_task_completed(task, **context) ⇒ void

This method returns an undefined value.

Publish task completed event. Automatically resolves to TaskEvents::COMPLETED with :completed event type.

Parameters:

  • task (Task)

    The task that completed

  • context (Hash)

    Additional context to merge into payload



# File 'lib/tasker/concerns/event_publisher.rb', line 163

def publish_task_completed(task, **context)
  payload = build_task_payload(task, :completed, context)
  publish_event_with_logging(Tasker::Constants::TaskEvents::COMPLETED, payload) do
    log_task_event(task, :completed, **context.slice(:total_duration, :completed_steps, :total_steps))
  end
end

#publish_task_enqueue(task, **context) ⇒ void

This method returns an undefined value.

Publish task enqueue event (for job scheduling observability). Automatically resolves to ObservabilityEvents::Task::ENQUEUE.

Parameters:

  • task (Task)

    The task being enqueued

  • context (Hash)

    Additional enqueue context



# File 'lib/tasker/concerns/event_publisher.rb', line 536

def publish_task_enqueue(task, **context)
  context = context.merge(
    task_id: task.task_id,
    task_name: task.name,
    task_context: task.context
  )

  payload = build_task_payload(task, :enqueue, context)
  publish_event_with_logging(Tasker::Constants::ObservabilityEvents::Task::ENQUEUE, payload) do
    log_task_event(task, :enqueued, **context.slice(:queue_name, :job_class, :priority))
  end
end

#publish_task_failed(task, error_message: 'Task execution failed', error_steps: [], **context) ⇒ void

This method returns an undefined value.

Publish task failed event. Automatically resolves to TaskEvents::FAILED with :failed event type.

Parameters:

  • task (Task)

    The task that failed

  • error_message (String) (defaults to: 'Task execution failed')

    The error message

  • error_steps (Array) (defaults to: [])

    Array of failed step information

  • context (Hash)

    Additional context to merge into payload



# File 'lib/tasker/concerns/event_publisher.rb', line 178

def publish_task_failed(task, error_message: 'Task execution failed', error_steps: [], **context)
  context = context.merge(
    error_message: error_message,
    error_steps: error_steps
  )

  payload = build_task_payload(task, :failed, context)
  publish_event_with_logging(Tasker::Constants::TaskEvents::FAILED, payload) do
    log_task_event(task, :failed,
                   error: error_message,
                   failed_step_count: error_steps.size,
                   **context.slice(:total_duration, :completed_steps))
  end
end

#publish_task_finalization_completed(task, processed_steps_count: 0, **context) ⇒ void

This method returns an undefined value.

Publish task finalization completed event

Parameters:

  • task (Task)

    The task that was finalized

  • processed_steps_count (Integer) (defaults to: 0)

    Number of steps processed

  • context (Hash)

    Additional context



# File 'lib/tasker/concerns/event_publisher.rb', line 309

def publish_task_finalization_completed(task, processed_steps_count: 0, **context)
  context = context.merge(
    task_id: task.task_id,
    processed_steps_count: processed_steps_count,
    final_status: task.status
  )

  payload = build_orchestration_payload(:task_finalization_completed, context)
  publish_event_with_logging(Tasker::Constants::WorkflowEvents::TASK_FINALIZATION_COMPLETED, payload) do
    log_orchestration_event('task_finalization', :completed,
                            task_id: task.task_id,
                            final_status: task.status,
                            processed_steps_count: processed_steps_count)
  end
end

#publish_task_finalization_started(task, processed_steps_count: 0, **context) ⇒ void

This method returns an undefined value.

Publish task finalization started event

Parameters:

  • task (Task)

    The task being finalized

  • processed_steps_count (Integer) (defaults to: 0)

    Number of steps processed

  • context (Hash)

    Additional context



# File 'lib/tasker/concerns/event_publisher.rb', line 290

def publish_task_finalization_started(task, processed_steps_count: 0, **context)
  context = context.merge(
    task_id: task.task_id,
    processed_steps_count: processed_steps_count
  )

  payload = build_orchestration_payload(:task_finalization_started, context)
  publish_event_with_logging(Tasker::Constants::WorkflowEvents::TASK_FINALIZATION_STARTED, payload) do
    log_orchestration_event('task_finalization', :started,
                            task_id: task.task_id, processed_steps_count: processed_steps_count)
  end
end

#publish_task_pending_transition(task, reason: 'Task set to pending', **context) ⇒ void

This method returns an undefined value.

Publish task pending transition event (for synchronous processing). Automatically resolves to TaskEvents::INITIALIZE_REQUESTED with pending context.

Parameters:

  • task (Task)

    The task being set to pending

  • reason (String) (defaults to: 'Task set to pending')

    The reason for setting to pending

  • context (Hash)

    Additional pending context



# File 'lib/tasker/concerns/event_publisher.rb', line 332

def publish_task_pending_transition(task, reason: 'Task set to pending', **context)
  context = context.merge(
    task_id: task.task_id,
    task_name: task.name,
    reason: reason
  )

  payload = build_task_payload(task, :pending_transition, context)
  publish_event(Tasker::Constants::TaskEvents::INITIALIZE_REQUESTED, payload)
end

#publish_task_reenqueue_delayed(task, delay_seconds:, reason: 'Task reenqueue delayed', **context) ⇒ void

This method returns an undefined value.

Publish task reenqueue delayed event. Automatically resolves to WorkflowEvents::TASK_REENQUEUE_DELAYED.

Parameters:

  • task (Task)

    The task being delayed for reenqueue

  • delay_seconds (Integer)

    Number of seconds to delay

  • reason (String) (defaults to: 'Task reenqueue delayed')

    The reason for delayed reenqueue

  • context (Hash)

    Additional reenqueue context



# File 'lib/tasker/concerns/event_publisher.rb', line 433

def publish_task_reenqueue_delayed(task, delay_seconds:, reason: 'Task reenqueue delayed', **context)
  context = context.merge(
    task_id: task.task_id,
    task_name: task.name,
    reason: reason,
    delay_seconds: delay_seconds,
    scheduled_for: Time.current + delay_seconds.seconds,
    timestamp: Time.current
  )

  payload = build_orchestration_payload(:task_reenqueue_delayed, context)
  publish_event(Tasker::Constants::WorkflowEvents::TASK_REENQUEUE_DELAYED, payload)
end
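
The scheduling fields in the context can be reproduced without Rails. This is a minimal sketch under stated assumptions: reenqueue_delay_context is a hypothetical helper, Time.now replaces Time.current, and plain integer addition replaces delay_seconds.seconds. Unlike the method above, which calls Time.current twice, the sketch reads the clock once so both fields share the same base time.

```ruby
# Hypothetical helper; Time.now and integer seconds stand in for the
# ActiveSupport calls (Time.current, Integer#seconds) used above.
def reenqueue_delay_context(delay_seconds, now: Time.now)
  {
    delay_seconds: delay_seconds,
    scheduled_for: now + delay_seconds, # Time + Integer adds seconds
    timestamp: now
  }
end

ctx = reenqueue_delay_context(30)
p ctx[:scheduled_for] - ctx[:timestamp]
```

Subtracting two Time values yields the difference in seconds as a Float, so the printed value equals the requested delay.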

#publish_task_reenqueue_failed(task, reason: 'Task reenqueue failed', error: 'Unknown error', **context) ⇒ void

This method returns an undefined value.

Publish task reenqueue failed event. Automatically resolves to WorkflowEvents::TASK_REENQUEUE_FAILED.

Parameters:

  • task (Task)

    The task that failed to reenqueue

  • reason (String) (defaults to: 'Task reenqueue failed')

    The reason for reenqueue attempt

  • error (String) (defaults to: 'Unknown error')

    The error message

  • context (Hash)

    Additional reenqueue context



# File 'lib/tasker/concerns/event_publisher.rb', line 412

def publish_task_reenqueue_failed(task, reason: 'Task reenqueue failed', error: 'Unknown error', **context)
  context = context.merge(
    task_id: task.task_id,
    task_name: task.name,
    reason: reason,
    error: error,
    timestamp: Time.current
  )

  payload = build_orchestration_payload(:task_reenqueue_failed, context)
  publish_event(Tasker::Constants::WorkflowEvents::TASK_REENQUEUE_FAILED, payload)
end

#publish_task_reenqueue_requested(task, reason: 'Task reenqueue requested', **context) ⇒ void

This method returns an undefined value.

Publish task reenqueue requested event. Automatically resolves to WorkflowEvents::TASK_REENQUEUE_REQUESTED.

Parameters:

  • task (Task)

    The task reenqueue was requested for

  • reason (String) (defaults to: 'Task reenqueue requested')

    The reason for reenqueue

  • context (Hash)

    Additional reenqueue context



# File 'lib/tasker/concerns/event_publisher.rb', line 392

def publish_task_reenqueue_requested(task, reason: 'Task reenqueue requested', **context)
  context = context.merge(
    task_id: task.task_id,
    task_name: task.name,
    reason: reason,
    timestamp: Time.current
  )

  payload = build_orchestration_payload(:task_reenqueue_requested, context)
  publish_event(Tasker::Constants::WorkflowEvents::TASK_REENQUEUE_REQUESTED, payload)
end

#publish_task_reenqueue_started(task, reason: 'Task reenqueue started', **context) ⇒ void

This method returns an undefined value.

Publish task reenqueue started event. Automatically resolves to WorkflowEvents::TASK_REENQUEUE_STARTED.

Parameters:

  • task (Task)

    The task being reenqueued

  • reason (String) (defaults to: 'Task reenqueue started')

    The reason for reenqueue

  • context (Hash)

    Additional reenqueue context



# File 'lib/tasker/concerns/event_publisher.rb', line 372

def publish_task_reenqueue_started(task, reason: 'Task reenqueue started', **context)
  context = context.merge(
    task_id: task.task_id,
    task_name: task.name,
    reason: reason,
    current_status: task.status,
    timestamp: Time.current
  )

  payload = build_orchestration_payload(:task_reenqueue_started, context)
  publish_event(Tasker::Constants::WorkflowEvents::TASK_REENQUEUE_STARTED, payload)
end

#publish_task_retry_requested(task, retry_reason: 'Task retry requested', **context) ⇒ void

This method returns an undefined value.

Publish task retry requested event. Automatically resolves to TaskEvents::RETRY_REQUESTED with :retry event type.

Parameters:

  • task (Task)

    The task being retried

  • retry_reason (String) (defaults to: 'Task retry requested')

    The reason for the retry

  • context (Hash)

    Additional context to merge into payload



# File 'lib/tasker/concerns/event_publisher.rb', line 200

def publish_task_retry_requested(task, retry_reason: 'Task retry requested', **context)
  context = context.merge(retry_reason: retry_reason)
  payload = build_task_payload(task, :retry, context)
  publish_event_with_logging(Tasker::Constants::TaskEvents::RETRY_REQUESTED, payload) do
    log_task_event(task, :retry_requested, retry_reason: retry_reason)
  end
end

#publish_task_started(task, **context) ⇒ void

This method returns an undefined value.

Publish task started event. Automatically resolves to TaskEvents::START_REQUESTED with :started event type.

Parameters:

  • task (Task)

    The task being started

  • context (Hash)

    Additional context to merge into payload



# File 'lib/tasker/concerns/event_publisher.rb', line 150

def publish_task_started(task, **context)
  payload = build_task_payload(task, :started, context)
  publish_event_with_logging(Tasker::Constants::TaskEvents::START_REQUESTED, payload) do
    log_task_event(task, :started, **context.slice(:execution_mode, :priority, :step_count))
  end
end

#publish_viable_steps_discovered(task_id, step_ids, processing_mode: 'concurrent', **context) ⇒ void

This method returns an undefined value.

Publish viable steps discovered event. Automatically resolves to WorkflowEvents::VIABLE_STEPS_DISCOVERED.

Parameters:

  • task_id (String)

    The task ID

  • step_ids (Array<String>)

    Array of step IDs that are viable

  • processing_mode (String) (defaults to: 'concurrent')

    The processing mode ('concurrent' or 'sequential')

  • context (Hash)

    Additional orchestration context



# File 'lib/tasker/concerns/event_publisher.rb', line 250

def publish_viable_steps_discovered(task_id, step_ids, processing_mode: 'concurrent', **context)
  context = context.merge(
    task_id: task_id,
    step_ids: step_ids,
    processing_mode: processing_mode,
    step_count: step_ids.size
  )

  payload = build_orchestration_payload(:viable_steps_discovered, context)
  publish_event_with_logging(Tasker::Constants::WorkflowEvents::VIABLE_STEPS_DISCOVERED, payload) do
    log_orchestration_event('viable_steps_discovered', :discovered,
                            task_id: task_id, step_count: step_ids.size, processing_mode: processing_mode)
  end
end

#publish_workflow_state_unclear(task, reason: 'Task in unclear state', **context) ⇒ void

This method returns an undefined value.

Publish workflow unclear state event (for monitoring/alerting). Automatically resolves to WorkflowEvents::TASK_STATE_UNCLEAR.

Parameters:

  • task (Task)

    The task in unclear state

  • reason (String) (defaults to: 'Task in unclear state')

    The reason the state is unclear

  • context (Hash)

    Additional unclear state context



# File 'lib/tasker/concerns/event_publisher.rb', line 350

def publish_workflow_state_unclear(task, reason: 'Task in unclear state', **context)
  context = context.merge(
    task_id: task.task_id,
    task_name: task.name,
    reason: reason
  )

  payload = build_orchestration_payload(:task_state_unclear, context)
  publish_event(Tasker::Constants::WorkflowEvents::TASK_STATE_UNCLEAR, payload)
end

#publish_workflow_step_completed(task_id, step_id, **context) ⇒ void

This method returns an undefined value.

Publish workflow step completed event (orchestration layer). Automatically resolves to WorkflowEvents::STEP_COMPLETED.

Parameters:

  • task_id (String)

    The task ID

  • step_id (String)

    The step ID

  • context (Hash)

    Additional orchestration context



# File 'lib/tasker/concerns/event_publisher.rb', line 233

def publish_workflow_step_completed(task_id, step_id, **context)
  context = context.merge(task_id: task_id, step_id: step_id)
  payload = build_orchestration_payload(:step_completed, context)
  publish_event_with_logging(Tasker::Constants::WorkflowEvents::STEP_COMPLETED, payload) do
    log_orchestration_event('workflow_step_completed', :completed,
                            task_id: task_id, step_id: step_id, **context)
  end
end

#publish_workflow_task_started(task_id, **context) ⇒ void

This method returns an undefined value.

Publish workflow task started event (orchestration layer). Automatically resolves to WorkflowEvents::TASK_STARTED.

Parameters:

  • task_id (String)

    The task ID

  • context (Hash)

    Additional orchestration context



# File 'lib/tasker/concerns/event_publisher.rb', line 218

def publish_workflow_task_started(task_id, **context)
  context = context.merge(task_id: task_id)
  payload = build_orchestration_payload(:task_started, context)
  publish_event_with_logging(Tasker::Constants::WorkflowEvents::TASK_STARTED, payload) do
    log_orchestration_event('workflow_task_started', :started, task_id: task_id, **context)
  end
end