Class: ActiveRecord::FutureResult
- Inherits:
-
Object
- Object
- ActiveRecord::FutureResult
show all
- Defined in:
- lib/active_record/future_result.rb
Overview
Defined Under Namespace
Classes: Complete, EventBuffer, SelectAll
Constant Summary
collapse
- Canceled =
Class.new(ActiveRecordError)
Instance Attribute Summary collapse
Class Method Summary
collapse
Instance Method Summary
collapse
Constructor Details
#initialize(pool, *args, **kwargs) ⇒ FutureResult
Returns a new instance of FutureResult.
66
67
68
69
70
71
72
73
74
75
76
77
78
79
|
# File 'lib/active_record/future_result.rb', line 66
def initialize(pool, *args, **kwargs)
@mutex = Mutex.new
@session = nil
@pool = pool
@args = args
@kwargs = kwargs
@pending = true
@error = nil
@result = nil
@instrumenter = ActiveSupport::Notifications.instrumenter
@event_buffer = nil
end
|
Instance Attribute Details
#lock_wait ⇒ Object
Returns the value of attribute lock_wait.
64
65
66
|
# File 'lib/active_record/future_result.rb', line 64
def lock_wait
@lock_wait
end
|
Class Method Details
.wrap(result) ⇒ Object
53
54
55
56
57
58
59
60
|
# File 'lib/active_record/future_result.rb', line 53
def self.wrap(result)
case result
when self, Complete
result
else
Complete.new(result)
end
end
|
Instance Method Details
#cancel ⇒ Object
94
95
96
97
98
|
# File 'lib/active_record/future_result.rb', line 94
def cancel
@pending = false
@error = Canceled
self
end
|
#canceled? ⇒ Boolean
139
140
141
|
# File 'lib/active_record/future_result.rb', line 139
def canceled?
@session && !@session.active?
end
|
#execute!(connection) ⇒ Object
90
91
92
|
# File 'lib/active_record/future_result.rb', line 90
def execute!(connection)
execute_query(connection)
end
|
#execute_or_skip ⇒ Object
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
# File 'lib/active_record/future_result.rb', line 100
def execute_or_skip
return unless pending?
@session.synchronize do
return unless pending?
@pool.with_connection do |connection|
return unless @mutex.try_lock
begin
if pending?
@event_buffer = EventBuffer.new(self, @instrumenter)
connection.with_instrumenter(@event_buffer) do
execute_query(connection, async: true)
end
end
ensure
@mutex.unlock
end
end
end
end
|
#pending? ⇒ Boolean
135
136
137
|
# File 'lib/active_record/future_result.rb', line 135
def pending?
@pending && (!@session || @session.active?)
end
|
#result ⇒ Object
122
123
124
125
126
127
128
129
130
131
132
133
|
# File 'lib/active_record/future_result.rb', line 122
def result
execute_or_wait
@event_buffer&.flush
if canceled?
raise Canceled
elsif @error
raise @error
else
@result
end
end
|
#schedule!(session) ⇒ Object
85
86
87
88
|
# File 'lib/active_record/future_result.rb', line 85
def schedule!(session)
@session = session
@pool.schedule_query(self)
end
|
#then(&block) ⇒ Object
81
82
83
|
# File 'lib/active_record/future_result.rb', line 81
def then(&block)
Promise.new(self, block)
end
|