Class: Temporalio::Cancellation

Inherits:
Object
Defined in:
lib/temporalio/cancellation.rb

Overview

Cancellation representation, often known as a “cancellation token”. This is used by clients, activities, and workflows to represent cancellation in a thread/fiber-safe way.

Constructor Details

#initialize(*parents) ⇒ Cancellation

Create a new cancellation.

A new cancellation is usually destructured on creation into two values: the cancellation itself and the proc to invoke to cancel it. For example: `cancel, cancel_proc = Temporalio::Cancellation.new`. This works via #to_ary, which returns the cancellation as the first array element and the cancel proc as the second.
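The destructuring relies on Ruby calling #to_ary implicitly during multiple assignment. The sketch below shows that mechanism in isolation. `MiniToken` is a hypothetical stand-in written for this example, not the Temporalio class, and it omits all thread-safety.

```ruby
# Hypothetical stand-in (NOT Temporalio::Cancellation) that mimics the
# #to_ary destructuring pattern. No locking, for brevity.
class MiniToken
  attr_reader :reason

  def initialize
    @canceled = false
    @reason = nil
  end

  def canceled?
    @canceled
  end

  # Ruby calls #to_ary implicitly when the object is the sole right-hand
  # side of a multiple assignment.
  def to_ary
    cancel_proc = proc do |reason: nil|
      @canceled = true
      @reason = reason
    end
    [self, cancel_proc]
  end
end

token, cancel_proc = MiniToken.new
cancel_proc.call(reason: 'shutting down')

puts token.canceled? # prints "true"
puts token.reason    # prints "shutting down"
```

Handing out only `token` to other code keeps the ability to cancel with whoever created it.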

Parameters:

  • parents (Array<Cancellation>)

    Parent cancellations to link this one to. This cancellation is canceled as soon as any parent is canceled.



# File 'lib/temporalio/cancellation.rb', line 17

def initialize(*parents)
  @canceled = false
  @canceled_reason = nil
  @canceled_mutex = Mutex.new
  @canceled_cond_var = nil
  @cancel_callbacks = []
  @shield_depth = 0
  @shield_pending_cancel = nil # When pending, set as single-reason array
  parents.each { |p| p.add_cancel_callback { on_cancel(reason: p.canceled_reason) } }
end
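The parent linking in the listing above can be sketched with a small stand-in. `LinkedToken`, `on_cancel`, and `cancel` are hypothetical names chosen for this example and only approximate the behavior described. The point is that each parent gets a callback that cancels the child with the parent's reason.

```ruby
# Hypothetical stand-in (NOT the Temporalio class) showing parent-to-child
# cancellation propagation through callbacks.
class LinkedToken
  def initialize(*parents)
    @mutex = Mutex.new
    @canceled = false
    @reason = nil
    @callbacks = []
    # Each parent cancels this token, passing along its own reason.
    parents.each { |parent| parent.on_cancel { cancel(reason: parent.reason) } }
  end

  def canceled?
    @mutex.synchronize { @canceled }
  end

  def reason
    @mutex.synchronize { @reason }
  end

  def on_cancel(&block)
    run_now = @mutex.synchronize do
      @callbacks << block
      @canceled
    end
    block.call if run_now
  end

  def cancel(reason: nil)
    to_run = @mutex.synchronize do
      next [] if @canceled # only the first cancel takes effect

      @canceled = true
      @reason = reason
      @callbacks.dup
    end
    # Callbacks run outside the lock because Ruby's Mutex is not reentrant.
    to_run.each(&:call)
  end
end

parent_a = LinkedToken.new
parent_b = LinkedToken.new
child = LinkedToken.new(parent_a, parent_b)

parent_b.cancel(reason: 'parent B stopped')

puts child.canceled?    # prints "true"
puts child.reason       # prints "parent B stopped"
puts parent_a.canceled? # prints "false"
```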

Instance Method Details

#add_cancel_callback(proc = nil) { ... } ⇒ Object

Note:

WARNING: This is an advanced API; most users should use #wait or similar instead.

Advanced call to invoke a proc or block on cancel. The callback usually needs to be quick and thread-safe since it is called in the canceler’s thread. Usually the callback will just be something like pushing on a queue or signaling a condition variable. If the cancellation is already canceled, the callback is called inline before returning.

Parameters:

  • proc (Proc, nil) (defaults to: nil)

    Proc to invoke, or nil to use block.

Yields:

  • Accepts block if not using `proc`.

Raises:

  • (ArgumentError)

    If neither a proc nor a block is given, if both are given, or if proc is not a Proc.


# File 'lib/temporalio/cancellation.rb', line 110

def add_cancel_callback(proc = nil, &block)
  raise ArgumentError, 'Must provide proc or block' unless proc || block
  raise ArgumentError, 'Cannot provide both proc and block' if proc && block
  raise ArgumentError, 'Parameter not a proc' if proc && !proc.is_a?(Proc)

  callback_to_run_immediately = @canceled_mutex.synchronize do
    callback = proc || block
    @cancel_callbacks.push(proc || block)
    break nil unless @canceled

    callback
  end
  callback_to_run_immediately&.call
  nil
end
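A callback that only pushes onto a queue is the kind of quick, thread-safe work the description recommends. The sketch below uses `CallbackToken`, a hypothetical stand-in rather than the Temporalio class, to show both a callback registered before cancel and one registered afterwards, which runs inline.

```ruby
# Hypothetical stand-in (NOT the Temporalio class) with just enough behavior
# to show the callback pattern.
class CallbackToken
  def initialize
    @mutex = Mutex.new
    @canceled = false
    @callbacks = []
  end

  def add_cancel_callback(&block)
    run_now = @mutex.synchronize do
      @callbacks << block
      @canceled
    end
    block.call if run_now # already canceled: invoke inline before returning
    nil
  end

  def cancel
    to_run = @mutex.synchronize do
      next [] if @canceled

      @canceled = true
      @callbacks.dup
    end
    to_run.each(&:call) # runs in the canceler's thread
  end
end

events = Queue.new # thread-safe, so pushing from any thread is fine
token = CallbackToken.new

token.add_cancel_callback { events << :registered_before_cancel }
token.cancel
token.add_cancel_callback { events << :registered_after_cancel }

puts events.size # prints "2"
```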

#canceled? ⇒ Boolean

Returns whether this cancellation is canceled.

Returns:

  • (Boolean)

    Whether this cancellation is canceled.



# File 'lib/temporalio/cancellation.rb', line 29

def canceled?
  @canceled_mutex.synchronize { @canceled }
end

#canceled_reason ⇒ String?

Returns the reason for cancellation. Can be nil if not canceled or if no reason was provided.

Returns:

  • (String, nil)

    Reason for cancellation. Can be nil if not canceled or no reason provided.



# File 'lib/temporalio/cancellation.rb', line 34

def canceled_reason
  @canceled_mutex.synchronize { @canceled_reason }
end

#check!(err = Error::CanceledError.new('Canceled')) ⇒ Object

Raise an error if this cancellation is canceled.

Parameters:

  • err (Exception) (defaults to: Error::CanceledError.new('Canceled'))

    Error to raise.



# File 'lib/temporalio/cancellation.rb', line 51

def check!(err = Error::CanceledError.new('Canceled'))
  raise err if canceled?
end
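A common way to use a check of this kind is to call it at safe points between units of work. The sketch below illustrates that with hypothetical stand-ins: `CanceledError` and `PollToken` are defined locally for this example and are not the Temporalio classes.

```ruby
# Hypothetical stand-ins (NOT the Temporalio classes).
class CanceledError < StandardError; end

class PollToken
  def initialize
    @mutex = Mutex.new
    @canceled = false
  end

  def cancel
    @mutex.synchronize { @canceled = true }
  end

  def canceled?
    @mutex.synchronize { @canceled }
  end

  def check!(err = CanceledError.new('Canceled'))
    raise err if canceled?
  end
end

token = PollToken.new
processed = []

outcome = begin
  (1..5).each do |i|
    token.check!            # bail out between units of work
    processed << i
    token.cancel if i == 3  # simulate an external cancel after item 3
  end
  'completed'
rescue CanceledError => e
  "stopped: #{e.message}"
end

puts processed.inspect # prints "[1, 2, 3]"
puts outcome           # prints "stopped: Canceled"
```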

#pending_canceled? ⇒ Boolean

Returns whether a cancel is pending but currently shielded.

Returns:

  • (Boolean)

    Whether a cancel is pending but currently shielded.



# File 'lib/temporalio/cancellation.rb', line 39

def pending_canceled?
  @canceled_mutex.synchronize { !@shield_pending_cancel.nil? }
end

#pending_canceled_reason ⇒ String?

Returns the reason for the pending cancellation. Can be nil if no cancel is pending or if no reason was provided.

Returns:

  • (String, nil)

    Reason for pending cancellation. Can be nil if not pending canceled or no reason provided.



# File 'lib/temporalio/cancellation.rb', line 44

def pending_canceled_reason
  @canceled_mutex.synchronize { @shield_pending_cancel&.first }
end

#shield { ... } ⇒ Object

Shield the given block from cancellation. Any cancellation that occurs while shielded code is running is recorded as “pending” and does not take effect until the block completes. If shield calls are nested, the cancellation remains “pending” until the outermost shielded block ends.

Yields:

  • Requires a block to run under shield.

Returns:

  • (Object)

    Result of the block.



# File 'lib/temporalio/cancellation.rb', line 84

def shield
  raise ArgumentError, 'Block required' unless block_given?

  @canceled_mutex.synchronize { @shield_depth += 1 }
  yield
ensure
  callbacks_to_run = @canceled_mutex.synchronize do
    @shield_depth -= 1
    if @shield_depth.zero? && @shield_pending_cancel
      reason = @shield_pending_cancel.first
      @shield_pending_cancel = nil
      prepare_cancel(reason:)
    end
  end
  callbacks_to_run&.each(&:call)
end
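The nesting behavior can be sketched with a depth counter, as in the listing above. `ShieldToken` is a hypothetical stand-in written for this example; it assumes that a cancel issued while the depth is non-zero is simply recorded as pending, and it leaves out callbacks and reasons.

```ruby
# Hypothetical stand-in (NOT the Temporalio class) showing how a cancel
# issued inside nested shields stays pending until the outermost one ends.
class ShieldToken
  def initialize
    @mutex = Mutex.new
    @canceled = false
    @depth = 0
    @pending = false
  end

  def canceled?
    @mutex.synchronize { @canceled }
  end

  def pending_canceled?
    @mutex.synchronize { @pending }
  end

  def cancel
    @mutex.synchronize do
      if @depth.positive?
        @pending = true  # shielded: defer
      else
        @canceled = true
      end
    end
  end

  def shield
    @mutex.synchronize { @depth += 1 }
    yield
  ensure
    @mutex.synchronize do
      @depth -= 1
      if @depth.zero? && @pending
        @pending = false
        @canceled = true # apply the deferred cancel
      end
    end
  end
end

token = ShieldToken.new
observed = [] # pairs of [canceled?, pending_canceled?]

token.shield do
  token.shield do
    token.cancel
    observed << [token.canceled?, token.pending_canceled?]
  end
  observed << [token.canceled?, token.pending_canceled?]
end
observed << [token.canceled?, token.pending_canceled?]

puts observed.inspect # prints "[[false, true], [false, true], [true, false]]"
```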

#to_ary ⇒ Array(Cancellation, Proc)

Returns self and a proc to call to cancel that accepts an optional string `reason` keyword argument. As a general practice, only the creator of the cancellation should control its cancellation.

Returns:

  • (Array(Cancellation, Proc))

    Self and a proc to call to cancel that accepts an optional string `reason` keyword argument. As a general practice, only the creator of the cancellation should control its cancellation.



# File 'lib/temporalio/cancellation.rb', line 58

def to_ary
  [self, proc { |reason: nil| on_cancel(reason:) }]
end

#wait ⇒ Object

Block until this cancellation is canceled. Returns immediately if it is already canceled. This is backed by a ConditionVariable.



# File 'lib/temporalio/cancellation.rb', line 63

def wait
  @canceled_mutex.synchronize do
    break if @canceled

    # Add cond var if not present
    if @canceled_cond_var.nil?
      @canceled_cond_var = ConditionVariable.new
      @cancel_callbacks.push(proc { @canceled_mutex.synchronize { @canceled_cond_var.broadcast } })
    end

    # Wait on it
    @canceled_cond_var.wait(@canceled_mutex)
  end
end
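The waiting pattern can be tried out with plain Ruby threads. `WaitToken` below is a hypothetical stand-in, not the Temporalio class. Unlike the listing above, it creates its ConditionVariable eagerly and re-checks the flag in a loop, a common guard against spurious wakeups.

```ruby
# Hypothetical stand-in (NOT the Temporalio class) showing a blocking wait
# backed by a Mutex and ConditionVariable.
class WaitToken
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @canceled = false
  end

  def cancel
    @mutex.synchronize do
      @canceled = true
      @cond.broadcast # wake every waiter
    end
  end

  def wait
    @mutex.synchronize do
      # Returns immediately if already canceled; otherwise blocks.
      @cond.wait(@mutex) until @canceled
    end
  end
end

token = WaitToken.new
log = Queue.new

waiter = Thread.new do
  token.wait
  log << :woke
end

token.cancel
waiter.join

puts log.size # prints "1"
```

Because the flag is checked under the same mutex that guards the broadcast, the waiter cannot miss the cancel regardless of which thread runs first.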