Class: Messaging::Adapters::Postgres::CategoryWithPartitions

Inherits:
Category
  • Object
show all
Defined in:
lib/messaging/adapters/postgres/category_with_partitions.rb

Instance Method Summary collapse

Methods inherited from Category

#messages, table_name_for

Instance Method Details

#add_partition(date:) ⇒ Object



5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# File 'lib/messaging/adapters/postgres/category_with_partitions.rb', line 5

# Creates the partition that covers the given date, unless a partition with
# that name is already attached to the table.
#
# @param date [Date] a day that should be covered by the partition
# @return [nil] when the partition already exists
def add_partition(date:)
  range_start, range_end = partition_range_for(date: date)
  partition = partition_name_for(date: range_start)
  return if partition_exists?(partition)

  statement = <<~SQL
    CREATE TABLE IF NOT EXISTS messaging.#{partition}
    PARTITION OF messaging.#{table_name} FOR VALUES FROM ('#{range_start}') TO ('#{range_end}')
  SQL

  # The lock is requested with the partition name as key before the DDL runs,
  # both inside the same transaction.
  SerializedMessage.transaction do
    AdvisoryTransactionLock.call(key: partition)
    SerializedMessage.connection.execute(statement)
  end
end

#add_partitions(start_date:, days: 1) ⇒ Object

Creates one partition per day for a run of consecutive days, beginning at start_date

Parameters:

  • start_date (Date)

    from which date to create partitions

  • days (Integer) (defaults to: 1)

    how many days worth of partitions to create



26
27
28
29
30
31
32
33
# File 'lib/messaging/adapters/postgres/category_with_partitions.rb', line 26

# Creates one partition per day for a run of consecutive days.
#
# @param start_date [Date] first day to create a partition for (converted with +to_date+)
# @param days [Integer] how many consecutive days to cover, counting +start_date+ itself
def add_partitions(start_date:, days: 1)
  first_day = start_date.to_date
  # The range is inclusive, hence the "- 1".
  last_day = first_day + (days - 1)

  (first_day..last_day).each { |day| add_partition(date: day) }
end

#drop_partition(partition_name) ⇒ Object

Removes a partition together with all the messages it contains

Parameters:

  • partition_name (String)

    the name of the partition to drop



38
39
40
41
42
# File 'lib/messaging/adapters/postgres/category_with_partitions.rb', line 38

# Drops a single partition table.
#
# The name is interpolated into a DROP TABLE statement, so it is only accepted
# when the entire string has the form "<table_name>_YYYY_MM_DD". Anything else
# is silently ignored.
#
# @param partition_name [String] the name of the partition to drop
# @return [nil] when the name does not have the expected form
def drop_partition(partition_name)
  # \A and \z anchor the whole string; ^ and $ would match per line and let a
  # multi-line value with one valid-looking line through to the SQL below.
  # The table name is escaped so it is matched literally.
  pattern = /\A#{Regexp.escape(table_name)}_\d{4}_\d{2}_\d{2}\z/
  return unless partition_name.match?(pattern)

  SerializedMessage.connection.execute "drop TABLE messaging.#{partition_name}"
end

#drop_partitions_older_than(date) ⇒ Object

Removes all partitions older than the given date

Parameters:

  • date (Date)

    the cutoff date



47
48
49
50
51
52
# File 'lib/messaging/adapters/postgres/category_with_partitions.rb', line 47

# Drops every partition whose name sorts before the partition name for the
# given date. The comparison is a plain string comparison of partition names.
#
# @param date [Date] the cutoff date; the partition for this date itself is kept
def drop_partitions_older_than(date)
  cutoff_name = partition_name_for(date: date)
  stale = partitions.select { |partition| partition < cutoff_name }
  stale.each { |partition| drop_partition(partition) }
end

#inspect ⇒ Object



83
84
85
# File 'lib/messaging/adapters/postgres/category_with_partitions.rb', line 83

# Short human-readable representation showing only the category name.
#
# @return [String]
def inspect
  format('#<CategoryWithPartitions: %s>', name)
end

#partition_exists?(partition_name) ⇒ Boolean

Returns:

  • (Boolean)


64
65
66
# File 'lib/messaging/adapters/postgres/category_with_partitions.rb', line 64

# Tells whether a partition with the given name is among the names returned
# by #partitions.
#
# @param partition_name [String] the partition name to look for
# @return [Boolean]
def partition_exists?(partition_name)
  existing = partitions
  existing.include?(partition_name)
end

#partition_name_for(date:) ⇒ Object



54
55
56
# File 'lib/messaging/adapters/postgres/category_with_partitions.rb', line 54

# Builds the partition name for a date: "<name>_YYYY_MM_DD", with month and
# day zero-padded to two digits.
#
# @param date [#year, #month, #day] the day the partition covers
# @return [String]
def partition_name_for(date:)
  # The name is passed as an argument rather than interpolated into the format
  # string, so a "%" in the name is not interpreted as a format directive.
  format('%s_%d_%02d_%02d', name, date.year, date.month, date.day)
end

#partition_range_for(date:) ⇒ Object



58
59
60
61
62
# File 'lib/messaging/adapters/postgres/category_with_partitions.rb', line 58

# Computes the time range covered by the partition for a date: from the
# beginning of that day up to the same point one day later.
#
# @param date [#to_time] the day the partition covers
# @return [Array] a two-element array +[from, to]+
def partition_range_for(date:)
  day_start = date.to_time.beginning_of_day
  [day_start, day_start + 1.day]
end

#partitions ⇒ Object



68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/messaging/adapters/postgres/category_with_partitions.rb', line 68

# Lists the names of the child tables (relkind 'r') that inherit from this
# category's table in the "messaging" schema, ordered by name.
#
# @return [Array] whatever +select_values+ returns for the single +name+ column
def partitions
  query = <<~SQL
    SELECT child.relname AS name
    FROM pg_inherits
    JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
    JOIN pg_class child  ON pg_inherits.inhrelid   = child.oid
    JOIN pg_namespace ON pg_namespace.oid  = parent.relnamespace
    WHERE pg_namespace.nspname = 'messaging'
    AND parent.relname = '#{table_name}'
    AND child.relkind = 'r'
    ORDER BY name
  SQL

  SerializedMessage.connection.select_values(query)
end