Class: Temporalio::Activity::Context

Inherits:
Object
Defined in:
lib/temporalio/activity/context.rb

Overview

This class provides methods that can be called from activity classes.

Constructor Details

#initialize(info, heartbeat_proc, interceptors, shielded: false) ⇒ Context

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

Returns a new instance of Context.



# File 'lib/temporalio/activity/context.rb', line 9

def initialize(info, heartbeat_proc, interceptors, shielded: false)
  @thread = Thread.current
  @info = info
  @heartbeat_proc = heartbeat_proc
  @interceptors = interceptors
  @pending_cancellation = nil
  @shielded = shielded
  @mutex = Mutex.new
end

Instance Method Details

#cancel(reason, by_request: false) ⇒ Object

This method is part of a private API. You should avoid using this method if possible, as it may be removed or be changed in the future.

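Cancel the running activity by raising a Temporalio::Error::ActivityCancelled, built from the given reason, in the activity's thread. If a shield block currently holds the lock, or the whole activity is shielded, the error is only recorded as a pending cancellation and nothing is raised at that point.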

Parameters:

  • reason (String)

    Reason for cancellation.

  • by_request (Boolean) (defaults to: false)

    Whether the cancellation was requested by the server.



# File 'lib/temporalio/activity/context.rb', line 83

def cancel(reason, by_request: false)
  error = Temporalio::Error::ActivityCancelled.new(reason, by_request)
  @pending_cancellation = error

  locked = mutex.try_lock
  # @shielded inside the lock means the whole activity is shielded
  if locked && !@shielded
    thread.raise(error)
  end
ensure
  # only release the lock if we locked it
  mutex.unlock if locked
end
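
The non-blocking behaviour of `cancel` rests on `Mutex#try_lock`, which returns `false` immediately when another thread owns the lock instead of waiting for it. The following is a minimal sketch of that behaviour in isolation, using only the Ruby core library; the `holder` thread and the two queues are illustrative stand-ins for an activity sitting inside a shielded block, not part of the SDK.

```ruby
mutex = Mutex.new
holder_ready = Queue.new
release = Queue.new

# A thread standing in for an activity that is inside a shielded block.
holder = Thread.new do
  mutex.synchronize do
    holder_ready << true
    release.pop # stay inside the critical section until told to leave
  end
end

holder_ready.pop                    # wait until the holder owns the lock
locked_while_held = mutex.try_lock  # returns false without blocking

release << true
holder.join

locked_when_free = mutex.try_lock   # returns true, nobody owns the lock now
mutex.unlock if locked_when_free    # only release a lock we actually took
```

This mirrors the `mutex.unlock if locked` guard in `cancel`: unlocking a mutex the caller does not own would raise a `ThreadError`.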

#cancelled? ⇒ Boolean

Whether a cancellation was ever requested on this activity.

Returns:

  • (Boolean)

    true if the activity has had a cancellation request, false otherwise.



# File 'lib/temporalio/activity/context.rb', line 73

def cancelled?
  !!@pending_cancellation
end

#heartbeat(*details) ⇒ Object

Send a heartbeat for the current activity.

Parameters:

  • details (Array<any>)

    Data to store with the heartbeat.



# File 'lib/temporalio/activity/context.rb', line 29

def heartbeat(*details)
  interceptors.invoke(:heartbeat, *details) do |*d|
    @heartbeat_proc.call(*d)
  end
end
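
To illustrate how the heartbeat details travel through `interceptors.invoke` before reaching `@heartbeat_proc`, here is a rough sketch. `SketchChain` and `TaggingInterceptor` are hypothetical stand-ins written for this example; the SDK's real interceptor chain and interceptor signatures may differ.

```ruby
# Hypothetical stand-in for the interceptor chain; not the SDK's class.
class SketchChain
  def initialize(interceptors)
    @interceptors = interceptors
  end

  # Wraps the block with every interceptor that responds to `method`,
  # so the first interceptor runs first and the block runs last.
  def invoke(method, *args, &block)
    chain = @interceptors.reverse.reduce(block) do |nxt, interceptor|
      if interceptor.respond_to?(method)
        ->(*a) { interceptor.public_send(method, *a) { |*b| nxt.call(*b) } }
      else
        nxt
      end
    end
    chain.call(*args)
  end
end

# Hypothetical interceptor that appends a marker to the heartbeat details.
class TaggingInterceptor
  def heartbeat(*details)
    yield(*details, :tagged)
  end
end

recorded = []
heartbeat_proc = ->(*details) { recorded << details }
interceptors = SketchChain.new([TaggingInterceptor.new])

# Same call shape as Context#heartbeat above.
interceptors.invoke(:heartbeat, 1, 'step') { |*d| heartbeat_proc.call(*d) }
```

After the call, `recorded` holds a single entry containing the original details followed by the marker the interceptor added.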

#info ⇒ Temporalio::Activity::Info

Information about the running activity.



# File 'lib/temporalio/activity/context.rb', line 22

def info
  interceptors.invoke(:activity_info) { @info }
end

#shield { ... } ⇒ Object

Protect part of an activity's implementation from cancellation.

Activity cancellations are implemented using `Thread#raise`, which can unsafely terminate your implementation. To disable this behaviour, wrap critical parts of your business logic in this method.

To shield a whole activity, consider using Temporalio::Activity.shielded!.

A cancellation requested while inside a shielded block will not interrupt execution; instead, an Error::CancelledError is raised right after the block has finished.

Yields:

  • Block to be protected from cancellations.



# File 'lib/temporalio/activity/context.rb', line 47

def shield(&block)
  # The whole activity is shielded, called from a nested shield
  #   or while handling a CancelledError (in a rescue or ensure blocks)
  return block.call if @shielded || cancelled?

  if Thread.current != thread
    # TODO: Use logger instead when implemented
    warn "Activity shielding is not intended to be used outside of activity's thread."
    return block.call
  end

  mutex.synchronize do
    @shielded = true
    result = block.call
    # RBS: StandardError fallback is only needed to satisfy steep - https://github.com/soutaro/steep/issues/477
    raise @pending_cancellation || StandardError if cancelled?

    result
  ensure # runs while still holding the lock
    @shielded = false
  end
end
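
The interplay between `shield` and `cancel` can be tried out without the SDK. The sketch below is a simplified, hypothetical miniature of the class above: `MiniContext` and `CancelledSketch` are names invented for this example, and interceptors, `by_request`, and the off-thread warning are left out. It shows a cancellation that arrives during a shielded block being deferred until the block has finished.

```ruby
# Hypothetical error class standing in for the SDK's cancellation error.
class CancelledSketch < StandardError; end

# Hypothetical miniature of Context; must be created on the activity's thread.
class MiniContext
  def initialize
    @thread = Thread.current
    @mutex = Mutex.new
    @shielded = false
    @pending = nil
  end

  def cancelled?
    !!@pending
  end

  def cancel(reason)
    error = CancelledSketch.new(reason)
    @pending = error
    locked = @mutex.try_lock
    # Interrupt only when no shield block currently holds the lock.
    @thread.raise(error) if locked && !@shielded
  ensure
    @mutex.unlock if locked
  end

  def shield
    return yield if @shielded || cancelled?

    @mutex.synchronize do
      @shielded = true
      result = yield
      raise @pending if cancelled? # deliver the deferred cancellation
      result
    ensure
      @shielded = false
    end
  end
end

events = []
started = Queue.new
finish = Queue.new
context = nil

activity = Thread.new do
  context = MiniContext.new
  begin
    context.shield do
      started << true
      finish.pop # wait here while the cancellation arrives
      events << :critical_section_done
    end
    events << :after_shield
  rescue CancelledSketch => e
    events << [:cancelled, e.message]
  end
end

started.pop                       # the activity is now inside the shield
context.cancel('stop requested')  # recorded as pending, nothing is raised yet
finish << true
activity.join
```

The critical section runs to completion, the code after the shielded block is skipped, and the rescue clause sees the cancellation with its original reason.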