Module: Grumlin::Repository::InstanceMethods

Extended by:
Forwardable
Includes:
Expressions
Defined in:
lib/grumlin/repository/instance_methods.rb

Overview

rubocop:disable Metrics/ModuleLength

Constant Summary collapse

UPSERT_RETRY_PARAMS =
{
  on: [Grumlin::AlreadyExistsError, Grumlin::ConcurrentModificationError],
  sleep_method: ->(n) { Async::Task.current.sleep(n) },
  tries: 5,
  sleep: ->(n) { (n**2) + 1 + rand }
}.freeze
DEFAULT_ERROR_HANDLING_STRATEGY =
Grumlin::Repository::ErrorHandlingStrategy.new(mode: :retry, **UPSERT_RETRY_PARAMS)

Instance Method Summary collapse

Instance Method Details

#add_edge(label, id = nil, from:, to:, start: g, **properties) ⇒ Object



68
69
70
71
72
73
74
# File 'lib/grumlin/repository/instance_methods.rb', line 68

def add_edge(label, id = nil, from:, to:, start: g, **properties)
  id ||= properties[T.id]
  properties = properties.except(T.label)
  properties[T.id] = id

  start.addE(label).from(__.V(from)).to(__.V(to)).props(**properties).next
end

#add_vertex(label, id = nil, start: g, **properties) ⇒ Object



59
60
61
62
63
64
65
66
# File 'lib/grumlin/repository/instance_methods.rb', line 59

def add_vertex(label, id = nil, start: g, **properties)
  id ||= properties[T.id]
  properties = properties.except(T.id)

  t = start.addV(label)
  t = t.props(T.id => id) unless id.nil?
  t.props(**properties).next
end

#drop_edge(id = nil, from: nil, to: nil, label: nil, start: g) ⇒ Object

rubocop:disable Metrics/AbcSize

Raises:

  • (ArgumentError)


50
51
52
53
54
55
56
57
# File 'lib/grumlin/repository/instance_methods.rb', line 50

def drop_edge(id = nil, from: nil, to: nil, label: nil, start: g) # rubocop:disable Metrics/AbcSize
  raise ArgumentError, "either id or from:, to: and label: must be passed" if [id, from, to, label].all?(&:nil?)
  return start.E(id).drop.iterate unless id.nil?

  raise ArgumentError, "from:, to: and label: must be passed" if [from, to, label].any?(&:nil?)

  start.V(from).outE(label).where(__.inV.hasId(to)).limit(1).drop.iterate
end

#drop_in_batches(traversal, batch_size: 10_000) ⇒ Object

rubocop:disable Metrics/AbcSize



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/grumlin/repository/instance_methods.rb', line 28

def drop_in_batches(traversal, batch_size: 10_000) # rubocop:disable Metrics/AbcSize
  total_count = traversal.count.next

  batches = (total_count / batch_size) + 1

  Console.logger.info(self) do
    "drop_in_batches: total_count: #{total_count}, batch_size: #{batch_size}, batches: #{batches}"
  end

  batches.times do |batch|
    Console.logger.info(self) { "drop_in_batches: deleting batch #{batch + 1}/#{batches}..." }
    traversal.limit(batch_size).drop.iterate
    Console.logger.info(self) { "drop_in_batches: batch #{batch + 1}/#{batches} deleted" }
  end

  return if traversal.count.next.zero?

  drop_in_batches(traversal, batch_size:)

  Console.logger.info(self) { "drop_in_batches: finished." }
end

#drop_vertex(id, start: g) ⇒ Object



24
25
26
# File 'lib/grumlin/repository/instance_methods.rb', line 24

def drop_vertex(id, start: g)
  start.V(id).drop.iterate
end

#gObject



20
21
22
# File 'lib/grumlin/repository/instance_methods.rb', line 20

def g
  shortcuts.g(middlewares: self.class.middlewares)
end

#upsert_edge(label, from:, to:, create_properties: {}, update_properties: {}, on_failure: :retry, start: g, **params) ⇒ Object

Only from and to are used to find the existing edge, if one wants to assign an id to a created edge, it must be passed as T.id in create_properties.



101
102
103
104
105
106
107
# File 'lib/grumlin/repository/instance_methods.rb', line 101

def upsert_edge(label, from:, to:, create_properties: {}, update_properties: {}, # rubocop:disable Metrics/ParameterLists
                on_failure: :retry, start: g, **params)
  with_upsert_error_handling(on_failure, params) do
    create_properties, update_properties = cleanup_properties(create_properties, update_properties, T.label)
    start.upsertE(label, from, to, create_properties, update_properties).id.next
  end
end

#upsert_edges(edges, batch_size: 100, on_failure: :retry, start: g, **params) ⇒ Object

edges:

[“label”, “from”, “to”, :properties, properties]

params can override Retryable config from UPSERT_RETRY_PARAMS



112
113
114
115
116
117
118
119
120
121
122
# File 'lib/grumlin/repository/instance_methods.rb', line 112

def upsert_edges(edges, batch_size: 100, on_failure: :retry, start: g, **params)
  edges.each_slice(batch_size) do |slice|
    with_upsert_error_handling(on_failure, params) do
      slice.reduce(start) do |t, (label, from, to, create_properties, update_properties)|
        create_properties, update_properties = cleanup_properties(create_properties, update_properties, T.label)

        t.upsertE(label, from, to, create_properties, update_properties)
      end.id.iterate
    end
  end
end

#upsert_vertex(label, id, create_properties: {}, update_properties: {}, on_failure: :retry, start: g, **params) ⇒ Object

rubocop:disable Metrics/ParameterLists



76
77
78
79
80
81
82
# File 'lib/grumlin/repository/instance_methods.rb', line 76

def upsert_vertex(label, id, create_properties: {}, update_properties: {}, on_failure: :retry, start: g, **params) # rubocop:disable Metrics/ParameterLists
  with_upsert_error_handling(on_failure, params) do
    create_properties, update_properties = cleanup_properties(create_properties, update_properties)

    start.upsertV(label, id, create_properties, update_properties).id.next
  end
end

#upsert_vertices(vertices, batch_size: 100, on_failure: :retry, start: g, **params) ⇒ Object

vertices:

[“label”, “id”, :properties, properties]

params can override Retryable config from UPSERT_RETRY_PARAMS



87
88
89
90
91
92
93
94
95
96
97
# File 'lib/grumlin/repository/instance_methods.rb', line 87

def upsert_vertices(vertices, batch_size: 100, on_failure: :retry, start: g, **params)
  vertices.each_slice(batch_size) do |slice|
    with_upsert_error_handling(on_failure, params) do
      slice.reduce(start) do |t, (label, id, create_properties, update_properties)|
        create_properties, update_properties = cleanup_properties(create_properties, update_properties)

        t.upsertV(label, id, create_properties, update_properties)
      end.id.iterate
    end
  end
end