Class: GraphAgent::Checkpoint::InMemorySaver

Inherits:
BaseSaver
  • Object
show all
Defined in:
lib/graph_agent/checkpoint/in_memory_saver.rb

Instance Method Summary

Methods inherited from BaseSaver

#get

Constructor Details

#initialize ⇒ InMemorySaver

Returns a new instance of InMemorySaver.



8
9
10
11
12
13
# File 'lib/graph_agent/checkpoint/in_memory_saver.rb', line 8

# Builds an empty saver backed by three auto-vivifying in-memory tables.
def initialize
  super

  # thread_id => checkpoint_ns => { checkpoint_id => saved record }
  @storage = Hash.new do |by_thread, thread_id|
    by_thread[thread_id] = Hash.new { |by_ns, ns| by_ns[ns] = {} }
  end

  # [thread_id, checkpoint_ns, checkpoint_id] => { [task_id, idx] => write record }
  @writes = Hash.new { |table, write_key| table[write_key] = {} }

  # thread_id => checkpoint_ns => checkpoint ids in the order they were put
  @order = Hash.new do |by_thread, thread_id|
    by_thread[thread_id] = Hash.new { |by_ns, ns| by_ns[ns] = [] }
  end
end

Instance Method Details

#delete_thread(thread_id) ⇒ Object



89
90
91
92
93
# File 'lib/graph_agent/checkpoint/in_memory_saver.rb', line 89

# Removes everything stored for one thread: its checkpoints, their ordering,
# and any pending writes whose key starts with that thread id.
#
# @param thread_id [Object] identifier of the thread to erase
# @return [Hash] the writes table after pruning
def delete_thread(thread_id)
  [@storage, @order].each { |table| table.delete(thread_id) }

  # Write keys are [thread_id, checkpoint_ns, checkpoint_id] arrays.
  @writes.delete_if do |write_key, _entries|
    write_key[0] == thread_id
  end
end

#get_tuple(config) ⇒ Object



15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/graph_agent/checkpoint/in_memory_saver.rb', line 15

# Looks up a single checkpoint and wraps it with build_tuple.
#
# When config carries a :checkpoint_id, that exact checkpoint is returned;
# otherwise the most recently put checkpoint of the thread/namespace is used.
#
# Lookups use fetch instead of [] so that querying an unknown thread or
# namespace does not trigger the tables' default procs and leave empty
# entries behind.
#
# @param config [Hash] expects :thread_id and optionally :checkpoint_ns
#   (defaults to "") and :checkpoint_id under the :configurable key
# @return [Object, nil] the result of build_tuple, or nil when nothing matches
def get_tuple(config)
  thread_id = config.dig(:configurable, :thread_id)
  checkpoint_ns = config.dig(:configurable, :checkpoint_ns) || ""
  checkpoint_id = config.dig(:configurable, :checkpoint_id)

  # fetch with an explicit default never runs the Hash default proc.
  checkpoints = @storage.fetch(thread_id, nil)&.fetch(checkpoint_ns, nil)
  return nil if checkpoints.nil? || checkpoints.empty?

  if checkpoint_id
    saved = checkpoints[checkpoint_id]
    return nil unless saved

    # An explicit lookup hands the caller's config through unchanged.
    build_tuple(thread_id, checkpoint_ns, checkpoint_id, saved, config)
  else
    checkpoint_id = @order.fetch(thread_id, nil)&.fetch(checkpoint_ns, nil)&.last
    return nil unless checkpoint_id

    saved = checkpoints[checkpoint_id]
    # The latest lookup reports which checkpoint id was actually resolved.
    result_config = {
      configurable: {
        thread_id: thread_id,
        checkpoint_ns: checkpoint_ns,
        checkpoint_id: checkpoint_id
      }
    }
    build_tuple(thread_id, checkpoint_ns, checkpoint_id, saved, result_config)
  end
end

#list(config, filter: nil, before: nil, limit: nil) ⇒ Object



44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/graph_agent/checkpoint/in_memory_saver.rb', line 44

# Collects checkpoints for one thread (when config is given) or for every
# stored thread (when config is nil).
#
# Each namespace of each selected thread is handed to _list_ns together with
# the shared results array; _list_ns (defined elsewhere) is expected to
# append matching entries to it.
#
# The thread lookup uses fetch so that listing an unknown thread does not
# trigger @storage's default proc and leave an empty entry behind.
#
# @param config [Hash, nil] when present, :thread_id is read from under :configurable
# @param filter [Object, nil] forwarded to _list_ns
# @param before [Object, nil] forwarded to _list_ns
# @param limit [Object, nil] forwarded to _list_ns
# @return [Array] the accumulated results
def list(config, filter: nil, before: nil, limit: nil)
  thread_ids = config ? [config.dig(:configurable, :thread_id)] : @storage.keys
  results = []

  thread_ids.each do |thread_id|
    # fetch with an explicit default never runs the Hash default proc.
    namespaces = @storage.fetch(thread_id, nil)
    next unless namespaces

    namespaces.each do |checkpoint_ns, checkpoints|
      _list_ns(thread_id, checkpoint_ns, checkpoints, results, filter: filter, before: before, limit: limit)
    end
  end

  results
end

#put(config, checkpoint, metadata, new_versions) ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/graph_agent/checkpoint/in_memory_saver.rb', line 57

# Stores a checkpoint for the thread/namespace named in config and records it
# as the newest one for that namespace.
#
# @param config [Hash] expects :thread_id and optionally :checkpoint_ns
#   (defaults to "") under the :configurable key; its :checkpoint_id, if any,
#   is saved as the new checkpoint's parent
# @param checkpoint [Hash] checkpoint payload; its :id is used as the
#   checkpoint id, or a random UUID when absent
# @param metadata [Object] stored alongside the checkpoint as-is
# @param new_versions [Object] accepted for interface compatibility; not used
#   by this implementation
# @return [Hash] a config whose :configurable section identifies the stored checkpoint
def put(config, checkpoint, metadata, new_versions)
  thread_id = config.dig(:configurable, :thread_id)
  checkpoint_ns = config.dig(:configurable, :checkpoint_ns) || ""
  checkpoint_id = checkpoint[:id] || SecureRandom.uuid

  @storage[thread_id][checkpoint_ns][checkpoint_id] = {
    checkpoint: checkpoint,
    metadata: metadata,
    # The checkpoint id carried by the incoming config becomes the parent link.
    parent_checkpoint_id: config.dig(:configurable, :checkpoint_id)
  }
  @order[thread_id][checkpoint_ns] << checkpoint_id

  {
    configurable: {
      thread_id: thread_id,
      checkpoint_ns: checkpoint_ns,
      checkpoint_id: checkpoint_id
    }
  }
end

#put_writes(config, writes, task_id) ⇒ Object



78
79
80
81
82
83
84
85
86
87
# File 'lib/graph_agent/checkpoint/in_memory_saver.rb', line 78

# Records the pending writes of one task against the checkpoint named in config.
#
# Each write is stored under [task_id, position], so repeating a call for the
# same task and checkpoint overwrites the entries at the same positions.
#
# @param config [Hash] :thread_id, :checkpoint_ns (defaults to "") and
#   :checkpoint_id are read from under the :configurable key
# @param writes [Enumerable] (channel, value) pairs
# @param task_id [Object] identifier of the task that produced the writes
# @return [Enumerable] the writes argument, as returned by each_with_index
def put_writes(config, writes, task_id)
  thread_id, namespace, checkpoint_id =
    %i[thread_id checkpoint_ns checkpoint_id].map { |field| config.dig(:configurable, field) }
  write_key = [thread_id, namespace || "", checkpoint_id]

  writes.each_with_index do |(channel, value), position|
    record = { task_id: task_id, channel: channel, value: value }
    # The bucket is looked up inside the loop so an empty writes list creates nothing.
    @writes[write_key][[task_id, position]] = record
  end
end