Class: WCC::Contentful::Store::PostgresStore
Overview
Implements the store interface where all Contentful entries are stored in a JSONB table.
Defined Under Namespace
Classes: Query
Constant Summary
collapse
- @@schema_mutex = Mutex.new
This is intentionally a class variable so that all subclasses share the same mutex.
Constants included
from Interface
Interface::INTERFACE_METHODS
Instance Attribute Summary collapse
#_instrumentation
Class Method Summary
collapse
Instance Method Summary
collapse
#_instrumentation_event_prefix, instrument
Methods inherited from Base
#ensure_hash, #execute, #find_by, #index, #index?
Methods included from Interface
#find_by, #index, #index?
Constructor Details
#initialize(configuration = nil, connection_options = nil, pool_options = nil) ⇒ PostgresStore
Returns a new instance of PostgresStore.
20
21
22
23
24
25
26
27
28
|
# File 'lib/wcc/contentful/store/postgres_store.rb', line 20
# Creates a store backed by a pool of Postgres connections.
#
# @param configuration [Object, nil] passed unchanged to the superclass initializer
# @param connection_options [Hash, nil] handed to build_connection_pool;
#   falls back to { dbname: 'postgres' } when nil
# @param pool_options [Hash, nil] handed to build_connection_pool; falls back to {} when nil
def initialize(configuration = nil, connection_options = nil, pool_options = nil)
  super(configuration)

  @schema_ensured = false
  @connection_pool = PostgresStore.build_connection_pool(
    connection_options || { dbname: 'postgres' },
    pool_options || {}
  )

  # @dirty is consulted by exec_query and raised by set; @mutex guards the
  # view refresh that exec_query performs while @dirty is true.
  @dirty = Concurrent::AtomicBoolean.new
  @mutex = Mutex.new
end
|
Instance Attribute Details
#connection_pool ⇒ Object
17
18
19
|
# File 'lib/wcc/contentful/store/postgres_store.rb', line 17
# Reader for the connection pool assigned in #initialize (the value returned
# by PostgresStore.build_connection_pool).
#
# @return [Object] the current value of @connection_pool
def connection_pool
@connection_pool
end
|
#logger ⇒ Object
18
19
20
|
# File 'lib/wcc/contentful/store/postgres_store.rb', line 18
# Reader for the logger used by #exec_query for debug output.
# NOTE(review): nothing in the visible code assigns @logger, and #exec_query
# calls it with safe navigation, so this presumably may be nil - confirm.
#
# @return [Object, nil] the current value of @logger
def logger
@logger
end
|
Class Method Details
.build_connection_pool(connection_options, pool_options) ⇒ Object
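Builds the connection pool; every new connection has the schema ensured and the prepared statements registered. (Source annotation: rubocop:disable Style/ClassVars)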
326
327
328
329
330
331
332
333
334
335
336
337
|
# File 'lib/wcc/contentful/store/postgres_store.rb', line 326
# Builds a ConnectionPool whose factory block opens a PG connection, makes
# sure the schema is present, and registers the prepared statements on it.
#
# @param connection_options [Hash] forwarded to PG.connect
# @param pool_options [Hash] forwarded to ConnectionPool.new
# @return [ConnectionPool]
def build_connection_pool(connection_options, pool_options)
  ConnectionPool.new(pool_options) do
    connection = PG.connect(connection_options)

    unless schema_ensured?(connection)
      # The schema is checked again once the shared mutex is held, so a
      # connection that waited on the lock skips ensure_schema if the schema
      # is up to date by then.
      @@schema_mutex.synchronize do
        ensure_schema(connection) unless schema_ensured?(connection)
      end
    end

    prepare_statements(connection)
    connection
  end
end
|
.ensure_schema(conn) ⇒ Object
350
351
352
353
354
355
356
357
358
359
360
361
362
363
|
# File 'lib/wcc/contentful/store/postgres_store.rb', line 350
# Applies every schema script from version 1 through EXPECTED_VERSION that is
# not yet listed in the wcc_contentful_schema_version table.
#
# A missing version table (PG::UndefinedTable) is treated as "nothing applied
# yet", so every script is run.
#
# @param conn [PG::Connection] connection used for the lookup and for running the scripts
def ensure_schema(conn)
  applied_rows =
    begin
      conn.exec('SELECT version FROM wcc_contentful_schema_version ' \
                'ORDER BY version DESC')
    rescue PG::UndefinedTable
      []
    end
  # Versions are compared as strings on both sides.
  applied_versions = applied_rows.map { |row| row['version'].to_s }

  1.upto(EXPECTED_VERSION) do |version_num|
    next if applied_versions.include?(version_num.to_s)

    script_path = File.join(__dir__, "postgres_store/schema_#{version_num}.sql")
    conn.exec(File.read(script_path))
  end
end
|
.prepare_statements(conn) ⇒ Object
314
315
316
317
318
319
320
321
|
# File 'lib/wcc/contentful/store/postgres_store.rb', line 314
# Registers the named prepared statements that the store's instance methods
# execute through exec_prepared.
#
# @param conn [PG::Connection] connection on which to prepare the statements
# @return [Object] whatever conn.prepare returns for the last statement
def prepare_statements(conn)
  statements = {
    'upsert_entry' => 'SELECT * FROM fn_contentful_upsert_entry($1,$2,$3)',
    'select_entry' => 'SELECT * FROM contentful_raw WHERE id = $1',
    'select_ids' => 'SELECT id FROM contentful_raw',
    'delete_by_id' => 'DELETE FROM contentful_raw WHERE id = $1 RETURNING *',
    'refresh_views_concurrently' =>
      'REFRESH MATERIALIZED VIEW CONCURRENTLY contentful_raw_includes_ids_jointable'
  }

  # Hash iteration follows insertion order, so statements are prepared in the
  # order listed above.
  statements.map { |name, sql| conn.prepare(name, sql) }.last
end
|
.schema_ensured?(conn) ⇒ Boolean
339
340
341
342
343
344
345
346
347
348
|
# File 'lib/wcc/contentful/store/postgres_store.rb', line 339
# Reports whether the highest version recorded in wcc_contentful_schema_version
# is at least EXPECTED_VERSION.
#
# @param conn [PG::Connection] connection used for the lookup
# @return [Boolean] false when the table is missing or holds no rows
def schema_ensured?(conn)
  newest = conn.exec('SELECT version FROM wcc_contentful_schema_version ' \
                     'ORDER BY version DESC LIMIT 1')

  newest.num_tuples.positive? && newest[0]['version'].to_i >= EXPECTED_VERSION
rescue PG::UndefinedTable
  false
end
|
Instance Method Details
#delete(key) ⇒ Object
76
77
78
79
80
81
82
83
84
85
|
# File 'lib/wcc/contentful/store/postgres_store.rb', line 76
# Deletes the row stored under +key+ via the prepared 'delete_by_id' statement.
#
# @param key [String] id of the entry to remove
# @return [Object, nil] JSON-parsed value of column index 1 of the deleted row,
#   or nil when no row was deleted
def delete(key)
  deleted = _instrument('delete_by_id', key: key) do
    @connection_pool.with do |conn|
      conn.exec_prepared('delete_by_id', [key])
    end
  end

  # presumably column 1 holds the entry's JSON document - verify against the table definition
  JSON.parse(deleted.getvalue(0, 1)) unless deleted.num_tuples == 0
end
|
#exec_query(statement, params = []) ⇒ Object
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
# File 'lib/wcc/contentful/store/postgres_store.rb', line 108
# Runs an arbitrary SQL statement on a pooled connection, first refreshing the
# materialized views if the store has been marked dirty.
#
# @param statement [String] SQL text passed to conn.exec
# @param params [Array] bind parameters passed to conn.exec
# @return [Object] whatever conn.exec returns
def exec_query(statement, params = [])
  if @dirty.true?
    @mutex.synchronize do
      # Checked again after acquiring the lock so that callers who waited on
      # the mutex skip the refresh once another caller has cleared the flag.
      next unless @dirty.true?

      _instrument('refresh_views') do
        @connection_pool.with do |conn|
          conn.exec_prepared('refresh_views_concurrently')
        end
      end
      @dirty.make_false
    end
  end

  logger&.debug("[PostgresStore] #{statement} #{params.inspect}")

  _instrument('exec') do
    @connection_pool.with { |conn| conn.exec(statement, params) }
  end
end
|
#find(key, **_options) ⇒ Object
87
88
89
90
91
92
93
94
95
96
97
|
# File 'lib/wcc/contentful/store/postgres_store.rb', line 87
# Looks up a single entry by id via the prepared 'select_entry' statement.
#
# @param key [String] id of the entry to fetch
# @param _options [Hash] accepted but not used here
# @return [Object, nil] JSON-parsed value of column index 1 of the matching row;
#   nil when no row matches or when PG::ConnectionBad is raised
def find(key, **_options)
  found = _instrument('select_entry', key: key) do
    @connection_pool.with do |conn|
      conn.exec_prepared('select_entry', [key])
    end
  end

  JSON.parse(found.getvalue(0, 1)) unless found.num_tuples == 0
rescue PG::ConnectionBad
  # Connection failures are reported to the caller as "not found".
  nil
end
|
#find_all(content_type:, options: nil) ⇒ Object
99
100
101
102
103
104
105
106
|
# File 'lib/wcc/contentful/store/postgres_store.rb', line 99
# Builds a Query over this store for the given content type. Nothing in this
# method touches the database; it only constructs the Query object.
#
# @param content_type [String] content type passed to the Query
# @param options [Hash, nil] passed through to the Query unchanged
# @return [Query]
def find_all(content_type:, options: nil)
  query_args = {
    content_type: content_type,
    options: options,
    configuration: @configuration
  }

  Query.new(self, **query_args)
end
|
#keys ⇒ Object
63
64
65
66
67
68
69
70
71
72
73
74
|
# File 'lib/wcc/contentful/store/postgres_store.rb', line 63
# Lists the ids of all stored entries via the prepared 'select_ids' statement.
#
# @return [Array<String>] the 'id' value of every row with surrounding
#   whitespace stripped; an empty array when PG::ConnectionBad is raised
def keys
  result =
    _instrument 'select_ids' do
      @connection_pool.with { |conn| conn.exec_prepared('select_ids') }
    end

  # presumably ids can come back padded (e.g. a fixed-width column), hence the strip - confirm
  result.map { |row| row['id'].strip }
rescue PG::ConnectionBad
  # Connection failures are reported to the caller as "no keys".
  []
end
|
#set(key, value) ⇒ Object
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
# File 'lib/wcc/contentful/store/postgres_store.rb', line 30
# Upserts +value+ under +key+ via the prepared 'upsert_entry' statement and
# marks the store dirty when views_need_update? says the views are affected.
#
# @param key [String] id of the entry
# @param value [Hash] entry to store; checked with ensure_hash and serialized with to_json
# @return [Object, nil] JSON-parsed value of column 0 of the first returned row
#   (used here as the previous value); nil when no row or a NULL comes back
def set(key, value)
  ensure_hash value

  upserted = _instrument('upsert_entry') do
    @connection_pool.with do |conn|
      # NOTE(review): the original call was written quote_array((value)); the
      # doubled parentheses suggest a helper call may have been dropped around
      # +value+ - confirm what the third statement parameter should receive.
      conn.exec_prepared('upsert_entry', [key, value.to_json, quote_array(value)])
    end
  end

  previous_value = nil
  unless upserted.num_tuples == 0
    raw_previous = upserted.getvalue(0, 0)
    previous_value = JSON.parse(raw_previous) if raw_previous
  end

  # AtomicBoolean#make_true reports whether the flag actually changed, so
  # 'mark_dirty' is instrumented only on the clean -> dirty transition.
  if views_need_update?(value, previous_value) && @dirty.make_true
    _instrument 'mark_dirty'
  end

  previous_value
end
|