Class: Bricolage::StreamingLoad::ObjectBuffer

Inherits:
Object
  • Object
show all
Includes:
SQLUtils
Defined in:
lib/bricolage/streamingload/objectbuffer.rb

Constant Summary collapse

TASK_GENERATION_TIME_LIMIT = 30 (sec)

Instance Method Summary collapse

Constructor Details

#initialize(control_data_source:, logger:) ⇒ ObjectBuffer

Returns a new instance of ObjectBuffer.



46
47
48
49
50
# File 'lib/bricolage/streamingload/objectbuffer.rb', line 46

# Sets up a new ObjectBuffer.
#
# control_data_source:: kept as @ctl_ds; the flush/put methods of this
#                       class call #open on it to obtain a connection.
# logger::              kept as @logger.
def initialize(control_data_source:, logger:)
  @logger = logger
  @ctl_ds = control_data_source
  # Starts out at the class-level default, TASK_GENERATION_TIME_LIMIT.
  @task_generation_time_limit = TASK_GENERATION_TIME_LIMIT
end

Instance Method Details

#flush_table_force(table_name) ⇒ Object

Flushes all objects of the specified table immediately, with no additional conditions, to create a “table checkpoint”.



105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# File 'lib/bricolage/streamingload/objectbuffer.rb', line 105

# Force-flushes the buffered objects of a single table.
#
# Inside one transaction on the control data source, tasks are generated
# for +table_name+ over and over until insert_table_task_force returns an
# empty list of task IDs.
#
# table_name:: forwarded unchanged to insert_table_task_force.
#
# Returns an Array holding one LoadTask.create(task_id: ...) result per
# generated task ID, in generation order.
def flush_table_force(table_name)
  generated_ids = []
  @ctl_ds.open do |conn|
    conn.transaction do
      # One round of update_task_object_mappings may leave some saved
      # objects unassigned (e.g. too many objects for one table), so keep
      # generating tasks until a round produces no new task IDs.
      batch = insert_table_task_force(conn, table_name)
      until batch.empty?
        update_task_object_mappings(conn, batch)
        log_mapped_object_num(conn, batch)
        generated_ids.concat(batch)
        batch = insert_table_task_force(conn, table_name)
      end
    end
  end
  generated_ids.map { |task_id| LoadTask.create(task_id: task_id) }
end

#flush_tasks ⇒ Object

Flushes multiple tables periodically



68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/bricolage/streamingload/objectbuffer.rb', line 68

# Regular (non-forced) flush: runs a single round of task generation.
#
# The whole round is wrapped in warn_slow_task_generation and executes in
# one transaction on the control data source. Object-to-task mappings are
# updated and logged only when insert_tasks actually returned task IDs.
#
# Returns an Array holding one LoadTask.create(task_id: ...) result per
# generated task ID (empty when no task was generated).
def flush_tasks
  # Declared out here so the value assigned inside the nested blocks is
  # still visible when building the return value.
  new_task_ids = []
  warn_slow_task_generation do
    @ctl_ds.open do |conn|
      conn.transaction do
        new_task_ids = insert_tasks(conn)
        next if new_task_ids.empty?

        update_task_object_mappings(conn, new_task_ids)
        log_mapped_object_num(conn, new_task_ids)
      end
    end
  end
  new_task_ids.map { |task_id| LoadTask.create(task_id: task_id) }
end

#flush_tasks_force ⇒ Object

Flushes all objects of all tables immediately, with no additional conditions, to create a “stream checkpoint”.



86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/bricolage/streamingload/objectbuffer.rb', line 86

# Force-flushes buffered objects without restricting to one table.
#
# Inside one transaction on the control data source, tasks are generated
# over and over until insert_tasks_force returns an empty list of task IDs.
#
# Returns an Array holding one LoadTask.create(task_id: ...) result per
# generated task ID, in generation order.
def flush_tasks_force
  generated_ids = []
  @ctl_ds.open do |conn|
    conn.transaction do
      # One round of update_task_object_mappings may leave some saved
      # objects unassigned (e.g. too many objects for one table), so keep
      # generating tasks until a round produces no new task IDs.
      batch = insert_tasks_force(conn)
      until batch.empty?
        update_task_object_mappings(conn, batch)
        log_mapped_object_num(conn, batch)
        generated_ids.concat(batch)
        batch = insert_tasks_force(conn)
      end
    end
  end
  generated_ids.map { |task_id| LoadTask.create(task_id: task_id) }
end

#put(obj) ⇒ Object



52
53
54
55
56
57
58
59
60
61
62
63
64
65
# File 'lib/bricolage/streamingload/objectbuffer.rb', line 52

# Stores one object in the buffer.
#
# Runs in a single transaction on the control data source, wrapped in
# suppress_sql_logging. When insert_object yields a truthy ID, that ID is
# handed to insert_task_objects; otherwise the object is passed to
# insert_dup_object instead.
# NOTE(review): a falsy ID presumably means the object was already
# registered -- confirm against insert_object.
#
# obj:: forwarded unchanged to insert_object / insert_dup_object.
def put(obj)
  @ctl_ds.open do |conn|
    suppress_sql_logging do
      conn.transaction do
        # Named new_id rather than object_id to avoid shadowing Object#object_id.
        new_id = insert_object(conn, obj)
        new_id ? insert_task_objects(conn, new_id) : insert_dup_object(conn, obj)
      end
    end
  end
end