Class: EM::PG

Inherits:
Object
  • Object
show all
Includes:
Deferrable
Defined in:
lib/em/pg.rb

Defined Under Namespace

Classes: BadConnectionStatusError, BadPollStatusError, BadStateError, ConnectionRefusedError, DisconnectError, Error, PGError, Query, UnexpectedStateError, Watcher

Constant Summary collapse

VERSION =
'0.1.2'

Class Attribute Summary collapse

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(opts) ⇒ PG

Returns a new instance of PG.



69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/em/pg.rb', line 69

# Starts a non-blocking PostgreSQL connection and registers its socket
# with the EventMachine reactor.
#
# @param opts [Hash] connection options. +:logger+ and +:on_disconnect+
#   are extracted for this object's own use; every remaining key is
#   forwarded untouched to +::PG::Connection.connect_start+. The caller's
#   hash is not modified (a shallow copy is taken first).
def initialize(opts)
  settings = opts.dup
  @logger = settings.delete(:logger) || EM::PG.logger
  @on_disconnect = settings.delete(:on_disconnect)
  @opts = settings

  # The state and the queue must be in place before the first poll below.
  @state = :connecting
  @queue = []

  @pg = ::PG::Connection.connect_start(@opts)

  @watcher = EM.watch(@pg.socket, Watcher, self)
  @watcher.notify_readable = true

  # Kick off the handshake right away; later steps are driven via #handle.
  check_connect
end

Class Attribute Details

.logger ⇒ Object

Returns the value of attribute logger.



64
65
66
# File 'lib/em/pg.rb', line 64

# Class-level logger. The constructor falls back to this value when no
# +:logger+ option is supplied.
def logger
  @logger
end

Instance Attribute Details

#conn ⇒ Object

Returns the value of attribute conn.



68
69
70
# File 'lib/em/pg.rb', line 68

# Reader for @conn.
# NOTE(review): @conn is never assigned in the visible code (the connection
# handle is stored in @pg) — confirm whether this attribute is still used.
def conn
  @conn
end

#logger ⇒ Object

Returns the value of attribute logger.



68
69
70
# File 'lib/em/pg.rb', line 68

# Logger used by this connection: the +:logger+ option given to the
# constructor, or EM::PG.logger when that option was absent.
def logger
  @logger
end

#on_disconnect ⇒ Object

Returns the value of attribute on_disconnect.



68
69
70
# File 'lib/em/pg.rb', line 68

# Optional callable taken from the +:on_disconnect+ constructor option.
# It is invoked with no arguments by #unbind; nil when none was supplied.
def on_disconnect
  @on_disconnect
end

#opts ⇒ Object

Returns the value of attribute opts.



68
69
70
# File 'lib/em/pg.rb', line 68

# Connection options as passed to +::PG::Connection.connect_start+, i.e. the
# constructor argument with +:logger+ and +:on_disconnect+ removed.
def opts
  @opts
end

#pg ⇒ Object

Returns the value of attribute pg.



68
69
70
# File 'lib/em/pg.rb', line 68

# The underlying connection object returned by
# +::PG::Connection.connect_start+ in the constructor.
def pg
  @pg
end

#state ⇒ Object

Returns the value of attribute state.



68
69
70
# File 'lib/em/pg.rb', line 68

# Current lifecycle state as a Symbol. Values assigned in this class:
# :connecting, :connected, :waiting, :connection_refused, :closed and
# :disconnected.
def state
  @state
end

#watcher ⇒ Object

Returns the value of attribute watcher.



68
69
70
# File 'lib/em/pg.rb', line 68

# The object returned by +EM.watch+ for the connection's socket; used to
# toggle read/write notifications and to detach from the reactor.
def watcher
  @watcher
end

Instance Method Details

#add_to_queue(query) ⇒ Object



144
145
146
# File 'lib/em/pg.rb', line 144

# Appends +query+ to the end of the pending-query queue.
#
# @param query [Object] the query to run once earlier ones have finished
# @return [Array] the queue itself
def add_to_queue(query)
  @queue.push(query)
end

#check_connect ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/em/pg.rb', line 103

# Advances the non-blocking connection handshake by one step.
#
# Polls the connection with +connect_poll+ and reacts to the result:
# * POLLING_OK      - inspects the connection status and either marks the
#                     connection as established or reports a refusal;
# * POLLING_READING - nothing to do here;
# * POLLING_WRITING - asks the watcher for write-readiness notifications;
# * POLLING_FAILED  - detaches the watcher and reports a refusal.
#
# @raise [BadConnectionStatusError] when polling succeeded but the
#   connection status is neither CONNECTION_OK nor CONNECTION_BAD
# @raise [BadPollStatusError] when +connect_poll+ returns a value that is
#   none of the four polling constants
def check_connect
  poll_status = @pg.connect_poll
  case poll_status
  when ::PG::PGRES_POLLING_OK
    if pg.status == ::PG::CONNECTION_OK
      connected
    elsif pg.status == ::PG::CONNECTION_BAD
      connection_refused
    else
      raise BadConnectionStatusError.new
    end
  when ::PG::PGRES_POLLING_READING
    # Nothing to do: read notifications are switched on by the constructor.
  when ::PG::PGRES_POLLING_WRITING
    # NOTE(review): notify_writable is never switched back off in the visible
    # code — confirm the watcher does not keep firing once connected.
    @watcher.notify_writable = true
  when ::PG::PGRES_POLLING_FAILED
    @watcher.detach
    connection_refused
  else
    # Was misspelled as BadPollStatsError, which produced a NameError instead
    # of the intended error class.
    raise BadPollStatusError.new
  end
end

#close ⇒ Object



206
207
208
209
210
211
# File 'lib/em/pg.rb', line 206

# Shuts the connection down on request of the caller.
#
# Marks the state as :closed, detaches the socket watcher, finishes the
# underlying connection and fails the running query plus every queued
# query with the symbol :closed.
def close
  @state = :closed

  # Stop watching the socket before the connection itself is finished.
  @watcher.detach
  @pg.finish

  fail_queries(:closed)
end

#connected ⇒ Object



186
187
188
189
# File 'lib/em/pg.rb', line 186

# Records that the handshake finished successfully: switches the state to
# :connected and completes this object by calling +succeed+ with :connected.
def connected
  @state = :connected
  succeed(:connected)
end

#connection_refused ⇒ Object



191
192
193
194
195
# File 'lib/em/pg.rb', line 191

# Records a failed connection attempt.
#
# Switches the state to :connection_refused, logs the connection's error
# message and calls +fail+ with a ConnectionRefusedError carrying that
# message.
def connection_refused
  @state = :connection_refused
  logger.error([:connection_refused, @pg.error_message])
  fail(ConnectionRefusedError.new(message: @pg.error_message))
end

#consume_result(&clb) ⇒ Object



162
163
164
165
166
167
168
169
170
171
172
# File 'lib/em/pg.rb', line 162

# Reads whatever input is available on the connection and, if a complete
# result is ready, hands it to the given block.
#
# When the connection reports it is still busy, nothing is yielded and nil
# is returned. Any ::PG::Error raised inside the method body is wrapped in
# a PGError (under the +:original+ key) and passed to the block instead of
# propagating.
#
# @yieldparam res [Object, PGError] the last result, or the wrapped error
def consume_result(&clb)
  @pg.consume_input # may raise
  return if @pg.is_busy

  clb.call(@pg.get_last_result) # may raise
rescue ::PG::Error => err
  clb.call(PGError.new(original: err))
end

#fail_queries(exc) ⇒ Object



213
214
215
216
# File 'lib/em/pg.rb', line 213

# Fails the query currently in flight (if there is one) and then every
# query still waiting in the queue, all with the same reason.
#
# @param exc [Object] the value handed to each query's +fail+
# @return [Array] the queue
def fail_queries(exc)
  in_flight = @current_query
  in_flight.fail(exc) if in_flight

  @queue.each do |pending|
    pending.fail(exc)
  end
end

#handle ⇒ Object



84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/em/pg.rb', line 84

# Reacts to socket activity according to the current state.
#
# * :connecting - continues the connection handshake;
# * :waiting    - consumes the result and delivers it to the running query;
# * any other   - still consumes input, since it may be a close message:
#                 an exception tears the connection down via #unbind,
#                 anything else is reported as an unexpected result.
#
# NOTE(review): +error+ is not defined in the visible part of the class —
# confirm it is provided elsewhere (other methods use +logger.error+).
def handle
  case @state
  when :connecting then check_connect
  when :waiting then consume_result { |res| result_for_query(res) }
  else
    consume_result do |res|
      next unbind(res) if res.is_a?(Exception)

      error("Result in unexpected state #{@state}: #{res.inspect}")
    end
  end
end

#make_query(m, *args) ⇒ Object



131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/em/pg.rb', line 131

# Builds a Query for connection method +m+ and dispatches it according to
# the current state.
#
# * :waiting   - another query is running, so the new one is queued;
# * :connected - the query is started immediately;
# * otherwise  - the query is failed with a BadStateError naming the state.
#
# @param m [Symbol] name of the connection method to invoke
# @param args [Array] arguments for that method
# @return [Query] the query object, in every case
def make_query(m, *args)
  query = Query.new(m, args)

  if @state == :waiting
    add_to_queue(query)
  elsif @state == :connected
    run_query!(query)
  else
    query.fail(BadStateError.new(state: @state))
  end

  query
end

#result_for_query(res) ⇒ Object



174
175
176
177
178
179
180
181
182
183
184
# File 'lib/em/pg.rb', line 174

# Delivers +res+ to the query that was in flight and moves on to the next
# queued query.
#
# The state goes back to :connected and the current-query slot is cleared
# before the query is notified. An Exception result fails the query; any
# other value completes it successfully.
#
# @param res [Object, Exception] the outcome of the running query
def result_for_query(res)
  @state = :connected
  finished, @current_query = @current_query, nil

  res.is_a?(Exception) ? finished.fail(res) : finished.succeed(res)

  try_next_from_queue
end

#run_query!(q) ⇒ Object



148
149
150
151
152
153
# File 'lib/em/pg.rb', line 148

# Starts +q+ on the connection right now.
#
# Remembers it as the current query, switches the state to :waiting, emits
# a debug entry and invokes the query's method on the connection with the
# query's arguments.
#
# NOTE(review): +debug+ is not defined in the visible part of the class —
# confirm it is provided elsewhere.
#
# @param q [Query] the query to execute
# @return [Object] whatever the invoked connection method returns
def run_query!(q)
  @current_query = q
  @state = :waiting

  name = q.method
  params = q.args
  debug(["EM::PG", name, params])
  @pg.send(name, *params)
end

#try_next_from_queue ⇒ Object



155
156
157
158
159
160
# File 'lib/em/pg.rb', line 155

# Takes the oldest queued query, if there is one, and starts it.
#
# @return [Object, nil] the result of starting the query, or nil when the
#   queue was empty
def try_next_from_queue
  upcoming = @queue.shift
  run_query!(upcoming) if upcoming
end

#unbind(reason = nil) ⇒ Object



197
198
199
200
201
202
203
204
# File 'lib/em/pg.rb', line 197

# Handles loss of the connection.
#
# Does nothing when the state is already :disconnected. Otherwise logs the
# reason, switches the state to :disconnected, detaches the socket watcher,
# invokes the +on_disconnect+ callback when one is set, and fails the
# running and queued queries with a DisconnectError.
#
# @param reason [Object, nil] what caused the disconnect; used for logging
def unbind(reason = nil)
  return if @state == :disconnected

  logger.error([:disconnected, reason])
  @state = :disconnected
  @watcher.detach

  callback = @on_disconnect
  callback.call if callback

  fail_queries(DisconnectError.new)
end