Class: Temporal::Workflow::StateManager

Inherits:
Object
  • Object
show all
Includes:
Concerns::Payloads
Defined in:
lib/temporal/workflow/state_manager.rb

Defined Under Namespace

Classes: UnsupportedEvent, UnsupportedMarkerType

Constant Summary collapse

SIDE_EFFECT_MARKER =
'SIDE_EFFECT'.freeze
RELEASE_MARKER =
'RELEASE'.freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Concerns::Payloads

#from_details_payloads, #from_payload, #from_payloads, #from_result_payloads, #from_signal_payloads, #to_details_payloads, #to_payload, #to_payloads, #to_result_payloads, #to_signal_payloads

Constructor Details

#initialize(dispatcher) ⇒ StateManager

Returns a new instance of StateManager.



23
24
25
26
27
28
29
30
31
32
33
# File 'lib/temporal/workflow/state_manager.rb', line 23

def initialize(dispatcher)
  @dispatcher = dispatcher
  @commands = []
  @marker_ids = Set.new
  @releases = {}
  @side_effects = []
  @command_tracker = Hash.new { |hash, key| hash[key] = CommandStateMachine.new }
  @last_event_id = 0
  @local_time = nil
  @replay = false
end

Instance Attribute Details

#commandsObject (readonly)

Returns the value of attribute commands.



21
22
23
# File 'lib/temporal/workflow/state_manager.rb', line 21

def commands
  @commands
end

#local_timeObject (readonly)

Returns the value of attribute local_time.



21
22
23
# File 'lib/temporal/workflow/state_manager.rb', line 21

def local_time
  @local_time
end

Instance Method Details

#apply(history_window) ⇒ Object



75
76
77
78
79
80
81
82
83
84
85
86
87
88
# File 'lib/temporal/workflow/state_manager.rb', line 75

def apply(history_window)
  @replay = history_window.replay?
  @local_time = history_window.local_time
  @last_event_id = history_window.last_event_id

  # handle markers first since their data is needed for processing events
  history_window.markers.each do |event|
    apply_event(event)
  end

  history_window.events.each do |event|
    apply_event(event)
  end
end

#next_side_effectObject



71
72
73
# File 'lib/temporal/workflow/state_manager.rb', line 71

def next_side_effect
  side_effects.shift
end

#release?(release_name) ⇒ Boolean

Returns:

  • (Boolean)


65
66
67
68
69
# File 'lib/temporal/workflow/state_manager.rb', line 65

def release?(release_name)
  track_release(release_name) unless releases.key?(release_name)

  releases[release_name]
end

#replay?Boolean

Returns:

  • (Boolean)


35
36
37
# File 'lib/temporal/workflow/state_manager.rb', line 35

def replay?
  @replay
end

#schedule(command) ⇒ Object



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# File 'lib/temporal/workflow/state_manager.rb', line 39

def schedule(command)
  # Fast-forward event IDs to skip all the markers (version markers can
  # be removed, so we can't rely on them being scheduled during a replay)
  command_id = next_event_id
  while marker_ids.include?(command_id) do
    command_id = next_event_id
  end

  cancelation_id =
    case command
    when Command::ScheduleActivity
      command.activity_id ||= command_id
    when Command::StartChildWorkflow
      command.workflow_id ||= command_id
    when Command::StartTimer
      command.timer_id ||= command_id
    end

  state_machine = command_tracker[command_id]
  state_machine.requested if state_machine.state == CommandStateMachine::NEW_STATE

  commands << [command_id, command]

  return [event_target_from(command_id, command), cancelation_id]
end