Class: EM::PG
- Inherits:
-
Object
show all
- Includes:
- Deferrable
- Defined in:
- lib/em/pg.rb
Defined Under Namespace
Classes: BadConnectionStatusError, BadPollStatusError, BadStateError, ConnectionRefusedError, DisconnectError, Error, PGError, Query, UnexpectedStateError, Watcher
Constant Summary
collapse
- VERSION =
'0.1.0'
Class Attribute Summary collapse
Instance Attribute Summary collapse
Instance Method Summary
collapse
Constructor Details
#initialize(opts) ⇒ PG
Returns a new instance of PG.
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
# File 'lib/em/pg.rb', line 65
# Starts a non-blocking PostgreSQL connection and registers its socket with
# EventMachine.
#
# @param opts [Hash] connection options; everything except :logger and
#   :on_disconnect is handed to ::PG::Connection.connect_start
# @option opts [Object] :logger logger for this instance; when absent the
#   class-level logger is used
# @option opts [#call] :on_disconnect callback invoked from #unbind
def initialize(opts)
  opts = opts.dup # do not mutate the caller's hash when deleting our own keys
  # Fall back to the logger configured on this class (the `.logger` class
  # attribute) rather than on an unrelated EM::Postgres constant.
  @logger = opts.delete(:logger) || self.class.logger
  @on_disconnect = opts.delete(:on_disconnect)
  @opts = opts
  @state = :connecting
  @pg = ::PG::Connection.connect_start(@opts)
  @queue = []
  @watcher = EM.watch(@pg.socket, Watcher, self)
  @watcher.notify_readable = true
  check_connect
end
|
Class Attribute Details
.logger ⇒ Object
Returns the value of attribute logger.
60
61
62
|
# File 'lib/em/pg.rb', line 60
# Reader for the class-level logger attribute.
#
# @return [Object] the current value of @logger on the receiver
def logger
@logger
end
|
Instance Attribute Details
#conn ⇒ Object
Returns the value of attribute conn.
64
65
66
|
# File 'lib/em/pg.rb', line 64
# Reader for the @conn instance variable.
# NOTE(review): none of the methods visible here assign @conn (the connection
# object is kept in @pg) -- confirm whether this attribute is still used.
#
# @return [Object] the current value of @conn
def conn
@conn
end
|
#logger ⇒ Object
Returns the value of attribute logger.
64
65
66
|
# File 'lib/em/pg.rb', line 64
# Reader for the logger used by this instance (assigned in #initialize).
#
# @return [Object] the current value of @logger
def logger
@logger
end
|
#on_disconnect ⇒ Object
Returns the value of attribute on_disconnect.
64
65
66
|
# File 'lib/em/pg.rb', line 64
# Reader for the disconnect callback taken from the :on_disconnect option.
#
# @return [Object] the current value of @on_disconnect (nil when no callback was given)
def on_disconnect
@on_disconnect
end
|
#opts ⇒ Object
Returns the value of attribute opts.
64
65
66
|
# File 'lib/em/pg.rb', line 64
# Reader for the connection options kept after :logger and :on_disconnect
# were removed in #initialize.
#
# @return [Object] the current value of @opts
def opts
@opts
end
|
#pg ⇒ Object
Returns the value of attribute pg.
64
65
66
|
# File 'lib/em/pg.rb', line 64
# Reader for the underlying connection created by
# ::PG::Connection.connect_start in #initialize.
#
# @return [Object] the current value of @pg
def pg
@pg
end
|
#state ⇒ Object
Returns the value of attribute state.
64
65
66
|
# File 'lib/em/pg.rb', line 64
# Reader for the connection state symbol. The methods visible here assign
# :connecting, :connected, :waiting, :connection_refused, :closed and
# :disconnected.
#
# @return [Object] the current value of @state
def state
@state
end
|
#watcher ⇒ Object
Returns the value of attribute watcher.
64
65
66
|
# File 'lib/em/pg.rb', line 64
# Reader for the object returned by EM.watch for the connection socket.
#
# @return [Object] the current value of @watcher
def watcher
@watcher
end
|
Instance Method Details
#add_to_queue(query) ⇒ Object
140
141
142
|
# File 'lib/em/pg.rb', line 140
# Appends a query to the end of the pending queue.
#
# @param query [Object] the query to enqueue
# @return [Array] the queue itself
def add_to_queue(query)
  @queue.push(query)
end
|
#check_connect ⇒ Object
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
# File 'lib/em/pg.rb', line 99
# Advances the non-blocking connection handshake by one step.
#
# Polls @pg.connect_poll and reacts to the result:
# - POLLING_OK: reports success, or a refusal when the connection status is bad
# - POLLING_READING: nothing to do in this method
# - POLLING_WRITING: asks the watcher for writable notifications
# - POLLING_FAILED: detaches the watcher and reports the refusal
#
# @raise [BadConnectionStatusError] when polling finished but the connection
#   status is neither CONNECTION_OK nor CONNECTION_BAD
# @raise [BadPollStatusError] when connect_poll returns an unknown value
def check_connect
  case @pg.connect_poll
  when ::PG::PGRES_POLLING_OK
    case @pg.status
    when ::PG::CONNECTION_OK
      connected
    when ::PG::CONNECTION_BAD
      connection_refused
    else
      raise BadConnectionStatusError
    end
  when ::PG::PGRES_POLLING_READING
    # No action required for this poll result.
  when ::PG::PGRES_POLLING_WRITING
    @watcher.notify_writable = true
  when ::PG::PGRES_POLLING_FAILED
    @watcher.detach
    connection_refused
  else
    # The class is BadPollStatusError; the former misspelling raised NameError.
    raise BadPollStatusError
  end
end
|
#close ⇒ Object
202
203
204
205
206
207
|
# File 'lib/em/pg.rb', line 202
# Closes the connection on request.
#
# In order: marks the state as :closed, detaches the watcher, finishes the
# underlying @pg connection, then fails the current and queued queries with
# the symbol :closed.
def close
@state = :closed
@watcher.detach
@pg.finish
fail_queries :closed
end
|
#connected ⇒ Object
182
183
184
185
|
# File 'lib/em/pg.rb', line 182
# Records that the connection is established and resolves this object
# successfully with :connected.
def connected
  @state = :connected
  succeed(:connected)
end
|
#connection_refused ⇒ Object
187
188
189
190
191
|
# File 'lib/em/pg.rb', line 187
# Records a refused connection: updates the state, logs the connection's
# error message and fails this object with a ConnectionRefusedError that
# carries the same message.
def connection_refused
  @state = :connection_refused
  reason = @pg.error_message
  logger.error([:connection_refused, reason])
  fail(ConnectionRefusedError.new(message: reason))
end
|
#consume_result(&clb) ⇒ Object
158
159
160
161
162
163
164
165
166
167
168
|
# File 'lib/em/pg.rb', line 158
# Reads pending input from the connection and, once a complete result is
# available, hands it to the given block.
#
# The block is called with the last result when the connection is no longer
# busy, or with a PGError wrapping the original ::PG::Error when one is
# raised. While the connection is still busy the block is not called.
#
# @yieldparam res [Object] the last result, or a PGError
def consume_result(&clb)
  @pg.consume_input
  clb.call @pg.get_last_result unless @pg.is_busy
rescue ::PG::Error => e
  clb.call PGError.new(original: e)
end
|
#fail_queries(exc) ⇒ Object
209
210
211
212
|
# File 'lib/em/pg.rb', line 209
# Fails the in-flight query (if there is one) and every queued query with
# the given reason.
#
# @param exc [Object] value passed to each query's #fail
# @return [Array] the queue that was iterated
def fail_queries(exc)
  in_flight = @current_query
  in_flight.fail(exc) if in_flight
  @queue.each { |pending| pending.fail(exc) }
end
|
#handle ⇒ Object
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
# File 'lib/em/pg.rb', line 80
# Dispatches on the current state:
# - :connecting continues the connection handshake
# - :waiting consumes the result and hands it to the current query
# - any other state consumes the result; an Exception leads to #unbind,
#   anything else is reported through #error as unexpected
def handle
  case @state
  when :connecting then check_connect
  when :waiting then consume_result { |res| result_for_query res }
  else
    consume_result do |res|
      if res.is_a?(Exception)
        unbind res
      else
        error "Result in unexpected state #{@state}: #{res.inspect}"
      end
    end
  end
end
|
#make_query(m, *args) ⇒ Object
127
128
129
130
131
132
133
134
135
136
137
138
|
# File 'lib/em/pg.rb', line 127
# Builds a Query for the given method name and arguments and routes it by
# the current state: queued while another query is in flight (:waiting), run
# immediately when :connected, otherwise failed with a BadStateError.
#
# @param m [Object] name of the connection method to invoke
# @param args [Array] arguments for that method
# @return [Query] the query that was created
def make_query(m, *args)
  Query.new(m, args).tap do |query|
    if @state == :waiting
      add_to_queue(query)
    elsif @state == :connected
      run_query!(query)
    else
      query.fail(BadStateError.new(state: @state))
    end
  end
end
|
#result_for_query(res) ⇒ Object
170
171
172
173
174
175
176
177
178
179
180
|
# File 'lib/em/pg.rb', line 170
# Completes the in-flight query with the given result and starts the next
# queued query, if any.
#
# The state goes back to :connected and the current query slot is cleared
# before the query is resolved: failed when the result is an Exception,
# succeeded otherwise.
#
# @param res [Object] the result or an Exception
def result_for_query(res)
  finished, @current_query = @current_query, nil
  @state = :connected
  res.is_a?(Exception) ? finished.fail(res) : finished.succeed(res)
  try_next_from_queue
end
|
#run_query!(q) ⇒ Object
144
145
146
147
148
149
|
# File 'lib/em/pg.rb', line 144
# Makes the given query the in-flight one, switches to :waiting, emits a
# debug entry and invokes the query's method on the connection with its
# arguments.
#
# @param q [Object] query responding to #method and #args
# @return [Object] whatever the invoked connection method returns
def run_query!(q)
  @current_query = q
  @state = :waiting
  name = q.method
  params = q.args
  debug(["EM::PG", name, params])
  @pg.send(name, *params)
end
|
#try_next_from_queue ⇒ Object
151
152
153
154
155
156
|
# File 'lib/em/pg.rb', line 151
# Takes the oldest queued query, if there is one, and runs it.
#
# @return [Object, nil] the result of #run_query!, or nil when the queue is empty
def try_next_from_queue
  upcoming = @queue.shift
  run_query!(upcoming) if upcoming
end
|
#unbind(reason = nil) ⇒ Object
193
194
195
196
197
198
199
200
|
# File 'lib/em/pg.rb', line 193
# Handles loss of the connection.
#
# Does nothing when the state is already :disconnected. Otherwise, in order:
# logs the reason, marks the state as :disconnected, detaches the watcher,
# invokes the on_disconnect callback when one was given, and fails the
# current and queued queries with a DisconnectError.
#
# @param reason [Object, nil] what caused the disconnect; only used for logging
def unbind(reason = nil)
# Guard against running the teardown twice.
return if @state == :disconnected
logger.error [:disconnected, reason]
@state = :disconnected
@watcher.detach
@on_disconnect.call if @on_disconnect
fail_queries DisconnectError.new
end
|