Class: Bricolage::StreamingLoad::ObjectBuffer
- Inherits:
-
Object
- Object
- Bricolage::StreamingLoad::ObjectBuffer
- Includes:
- SQLUtils
- Defined in:
- lib/bricolage/streamingload/objectbuffer.rb
Constant Summary collapse
- TASK_GENERATION_TIME_LIMIT =
30 (seconds)
Instance Method Summary collapse
-
#flush_table_force(table_name) ⇒ Object
Flushes all objects of the specified table immediately, with no additional conditions, to create a “table checkpoint”.
-
#flush_tasks ⇒ Object
Flushes multiple tables periodically.
-
#flush_tasks_force ⇒ Object
Flushes all objects of all tables immediately with no additional conditions, to create “stream checkpoint”.
-
#initialize(control_data_source:, logger:) ⇒ ObjectBuffer
constructor
A new instance of ObjectBuffer.
- #put(obj) ⇒ Object
Constructor Details
#initialize(control_data_source:, logger:) ⇒ ObjectBuffer
Returns a new instance of ObjectBuffer.
46 47 48 49 50 |
# File 'lib/bricolage/streamingload/objectbuffer.rb', line 46 def initialize(control_data_source:, logger:) @ctl_ds = control_data_source @logger = logger @task_generation_time_limit = TASK_GENERATION_TIME_LIMIT end |
Instance Method Details
#flush_table_force(table_name) ⇒ Object
Flushes all objects of the specified table immediately, with no additional conditions, to create a “table checkpoint”.
105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 |
# File 'lib/bricolage/streamingload/objectbuffer.rb', line 105 def flush_table_force(table_name) task_ids = [] @ctl_ds.open {|conn| conn.transaction {|txn| # update_task_object_mappings may not consume all saved objects # (e.g. there are too many objects for one table), we must create # tasks repeatedly until there are no unassigned objects. until (ids = insert_table_task_force(conn, table_name)).empty? update_task_object_mappings(conn, ids) log_mapped_object_num(conn, ids) task_ids.concat ids end } } return task_ids.map {|id| LoadTask.create(task_id: id) } end |
#flush_tasks ⇒ Object
Flushes multiple tables periodically.
68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
# File 'lib/bricolage/streamingload/objectbuffer.rb', line 68 def flush_tasks task_ids = [] warn_slow_task_generation { @ctl_ds.open {|conn| conn.transaction {|txn| task_ids = insert_tasks(conn) unless task_ids.empty? update_task_object_mappings(conn, task_ids) log_mapped_object_num(conn, task_ids) end } } } return task_ids.map {|id| LoadTask.create(task_id: id) } end |
#flush_tasks_force ⇒ Object
Flushes all objects of all tables immediately with no additional conditions, to create “stream checkpoint”.
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
# File 'lib/bricolage/streamingload/objectbuffer.rb', line 86 def flush_tasks_force task_ids = [] @ctl_ds.open {|conn| conn.transaction {|txn| # update_task_object_mappings may not consume all saved objects # (e.g. there are too many objects for one table), we must create # tasks repeatedly until there are no unassigned objects. until (ids = insert_tasks_force(conn)).empty? update_task_object_mappings(conn, ids) log_mapped_object_num(conn, ids) task_ids.concat ids end } } return task_ids.map {|id| LoadTask.create(task_id: id) } end |
#put(obj) ⇒ Object
52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/bricolage/streamingload/objectbuffer.rb', line 52 def put(obj) @ctl_ds.open {|conn| suppress_sql_logging { conn.transaction { object_id = insert_object(conn, obj) if object_id insert_task_objects(conn, object_id) else insert_dup_object(conn, obj) end } } } end |