Class: EM::PG
- Inherits:
-
Object
show all
- Includes:
- Deferrable
- Defined in:
- lib/em/pg.rb
Defined Under Namespace
Classes: BadConnectionStatusError, BadPollStatusError, BadStateError, ConnectionRefusedError, DisconnectError, Error, PGError, Query, UnexpectedStateError, Watcher
Constant Summary
collapse
- VERSION =
'0.1.2'
Class Attribute Summary collapse
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(opts) ⇒ PG
Returns a new instance of PG.
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
# File 'lib/em/pg.rb', line 69
def initialize(opts)
opts = opts.dup
@logger = opts.delete(:logger) || EM::PG.logger
@on_disconnect = opts.delete(:on_disconnect)
@opts = opts
@state = :connecting
@pg = ::PG::Connection.connect_start(@opts)
@queue = []
@watcher = EM.watch(@pg.socket, Watcher, self)
@watcher.notify_readable = true
check_connect
end
|
Class Attribute Details
.logger ⇒ Object
Returns the value of attribute logger.
64
65
66
|
# File 'lib/em/pg.rb', line 64
def logger
@logger
end
|
Instance Attribute Details
#conn ⇒ Object
Returns the value of attribute conn.
68
69
70
|
# File 'lib/em/pg.rb', line 68
def conn
@conn
end
|
#logger ⇒ Object
Returns the value of attribute logger.
68
69
70
|
# File 'lib/em/pg.rb', line 68
def logger
@logger
end
|
#on_disconnect ⇒ Object
Returns the value of attribute on_disconnect.
68
69
70
|
# File 'lib/em/pg.rb', line 68
def on_disconnect
@on_disconnect
end
|
#opts ⇒ Object
Returns the value of attribute opts.
68
69
70
|
# File 'lib/em/pg.rb', line 68
def opts
@opts
end
|
#pg ⇒ Object
Returns the value of attribute pg.
68
69
70
|
# File 'lib/em/pg.rb', line 68
def pg
@pg
end
|
#state ⇒ Object
Returns the value of attribute state.
68
69
70
|
# File 'lib/em/pg.rb', line 68
def state
@state
end
|
#watcher ⇒ Object
Returns the value of attribute watcher.
68
69
70
|
# File 'lib/em/pg.rb', line 68
def watcher
@watcher
end
|
Instance Method Details
#add_to_queue(query) ⇒ Object
144
145
146
|
# File 'lib/em/pg.rb', line 144
def add_to_queue(query)
@queue << query
end
|
#check_connect ⇒ Object
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
# File 'lib/em/pg.rb', line 103
def check_connect
status = @pg.connect_poll
case status
when ::PG::PGRES_POLLING_OK
if pg.status == ::PG::CONNECTION_OK
connected
elsif pg.status == ::PG::CONNECTION_BAD
connection_refused
else
raise BadConnectionStatusError.new
end
when ::PG::PGRES_POLLING_READING
when ::PG::PGRES_POLLING_WRITING
@watcher.notify_writable = true
when ::PG::PGRES_POLLING_FAILED
@watcher.detach
connection_refused
else
raise BadPollStatsError.new
end
end
|
#close ⇒ Object
206
207
208
209
210
211
|
# File 'lib/em/pg.rb', line 206
def close
@state = :closed
@watcher.detach
@pg.finish
fail_queries :closed
end
|
#connected ⇒ Object
186
187
188
189
|
# File 'lib/em/pg.rb', line 186
def connected
@state = :connected
succeed :connected
end
|
#connection_refused ⇒ Object
191
192
193
194
195
|
# File 'lib/em/pg.rb', line 191
def connection_refused
@state = :connection_refused
logger.error [:connection_refused, @pg.error_message]
fail ConnectionRefusedError.new(message: @pg.error_message)
end
|
#consume_result(&clb) ⇒ Object
162
163
164
165
166
167
168
169
170
171
172
|
# File 'lib/em/pg.rb', line 162
def consume_result(&clb)
begin
@pg.consume_input if @pg.is_busy
else
clb.call @pg.get_last_result end
rescue ::PG::Error => e
clb.call PGError.new(original: e)
end
end
|
#fail_queries(exc) ⇒ Object
213
214
215
216
|
# File 'lib/em/pg.rb', line 213
def fail_queries(exc)
@current_query.fail exc if @current_query
@queue.each { |q| q.fail exc }
end
|
#handle ⇒ Object
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
# File 'lib/em/pg.rb', line 84
def handle
case @state
when :connecting
check_connect
when :waiting
consume_result do |res|
result_for_query res
end
else consume_result do |res|
if res.is_a? Exception
unbind res
else
error "Result in unexpected state #{@state}: #{res.inspect}"
end
end
end
end
|
#make_query(m, *args) ⇒ Object
131
132
133
134
135
136
137
138
139
140
141
142
|
# File 'lib/em/pg.rb', line 131
def make_query(m, *args)
q = Query.new m, args
case @state
when :waiting
add_to_queue q
when :connected
run_query! q
else
q.fail BadStateError.new(state: @state)
end
q
end
|
#result_for_query(res) ⇒ Object
174
175
176
177
178
179
180
181
182
183
184
|
# File 'lib/em/pg.rb', line 174
def result_for_query(res)
@state = :connected
q = @current_query
@current_query = nil
if res.is_a? Exception
q.fail res
else
q.succeed res
end
try_next_from_queue
end
|
#run_query!(q) ⇒ Object
148
149
150
151
152
153
|
# File 'lib/em/pg.rb', line 148
def run_query!(q)
@current_query = q
@state = :waiting
debug(["EM::PG", q.method, q.args])
@pg.send(q.method, *q.args)
end
|
#try_next_from_queue ⇒ Object
155
156
157
158
159
160
|
# File 'lib/em/pg.rb', line 155
def try_next_from_queue
q = @queue.shift
if q
run_query! q
end
end
|
#unbind(reason = nil) ⇒ Object
197
198
199
200
201
202
203
204
|
# File 'lib/em/pg.rb', line 197
def unbind(reason = nil)
return if @state == :disconnected
logger.error [:disconnected, reason]
@state = :disconnected
@watcher.detach
@on_disconnect.call if @on_disconnect
fail_queries DisconnectError.new
end
|