70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
# File 'lib/masamune/transform/postgres/stage_fact.rb', line 70
# Builds the JOIN descriptors used to resolve the referenced tables of +source+.
#
# Columns returned by +shared_columns(source)+ that carry a reference are grouped
# by that reference; every group contributes one entry keyed by
# +join_alias(reference)+.
#
# @param source [Object] handed to +shared_columns+; it must also respond to
#   +time_key+ when a reference is of type +:two+ or +:four+
# @return [Hash] join alias => OpenStruct with +type+ ('INNER' or 'LEFT') and
#   +conditions+ (Array of SQL predicate strings), limited to the aliases
#   returned by the topological sort of the cross-reference dependencies
def join_conditions(source)
  join_columns = shared_columns(source).values.flatten
  join_columns = join_columns.select(&:reference)
  join_columns = join_columns.group_by(&:reference)

  dependencies = Masamune::TopologicalHash.new
  # Lazily create one descriptor per alias; joins are INNER unless changed below.
  conditions = Hash.new { |h, k| h[k] = OpenStruct.new(type: 'INNER', conditions: []) }

  join_columns.each do |reference, columns|
    reference_name = join_alias(reference)

    columns.each do |column|
      next if column.degenerate?

      dependencies[reference_name] ||= []
      cross_references = cross_references(column)
      coalesce_values = []
      if cross_references.any?
        # This join reads columns of the cross-referenced aliases, so record
        # them as dependencies for the final topological sort.
        dependencies[reference_name] += cross_references.map { |cross_reference, _| join_alias(cross_reference) }
        # Pushed as a nested array; Array#join flattens it when the COALESCE
        # argument list is rendered below.
        coalesce_values << cross_references.map { |cross_reference, cross_column| cross_column.qualified_name(cross_reference.label) }
      end

      if column.reference
        column.reference.auto_surrogate_keys.each do |auto_surrogate_key|
          next unless auto_surrogate_key.default
          conditions[reference_name].conditions << "#{auto_surrogate_key.qualified_name(reference.label)} = #{auto_surrogate_key.default}"
        end
      end

      # The adjacent column may be absent. Guard it in both branches so a
      # missing adjacent column falls through to the plain equality join
      # instead of raising NoMethodError on nil.
      adjacent = column.adjacent
      if column.reference && !column.reference.default.nil? && adjacent && adjacent.natural_key
        coalesce_values << column.reference.default(adjacent)
      elsif adjacent && !adjacent.default.nil?
        coalesce_values << adjacent.sql_value(adjacent.default)
      end

      condition =
        if coalesce_values.any?
          "#{column.foreign_key_name} = COALESCE(#{column.qualified_name}, #{coalesce_values.join(', ')})"
        else
          "#{column.foreign_key_name} = #{column.qualified_name}"
        end
      conditions[reference_name].conditions << condition
    end

    if reference.type == :two || reference.type == :four
      # Match the row whose [start_key, end_key] range contains the source
      # timestamp (an open end_key counts as 'INFINITY'), or the row with
      # version_key = 1 when the timestamp precedes its start_key.
      join_key_a = "TO_TIMESTAMP(#{source.time_key.qualified_name}) BETWEEN #{reference.start_key.qualified_name(reference.label)} AND COALESCE(#{reference.end_key.qualified_name(reference.label)}, 'INFINITY')"
      join_key_b = "TO_TIMESTAMP(#{source.time_key.qualified_name}) < #{reference.start_key.qualified_name(reference.label)} AND #{reference.version_key.qualified_name(reference.label)} = 1"
      conditions[reference_name].conditions << "((#{join_key_a}) OR (#{join_key_b}))"
    end

    conditions[reference_name].type = 'LEFT' if reference.unknown
    conditions[reference_name].conditions.uniq!
  end

  # Only aliases registered in +dependencies+ survive, in tsort order.
  conditions.slice(*dependencies.tsort)
end
|