Class: PgVersions::Connection::Subscription
- Inherits:
-
Object
- Object
- PgVersions::Connection::Subscription
- Defined in:
- lib/pg_versions/pg_versions.rb
Instance Method Summary
- #bump(*channels, notify: false) ⇒ Object
- #drop ⇒ Object
- #each(new_already_known_versions = {}) ⇒ Object
-
#initialize(connection_thread) ⇒ Subscription
constructor
A new instance of Subscription.
- #notify(versions) ⇒ Object
- #read(*channels, notify: false) ⇒ Object
- #subscribe(channels, known: {}) ⇒ Object
- #unsubscribe(*channels) ⇒ Object
- #update_already_known_versions(new_already_known_versions) ⇒ Object
-
#wait(new_already_known_versions = {}) ⇒ Object
TODO: make this resumable after a forced exception.
Constructor Details
#initialize(connection_thread) ⇒ Subscription
Returns a new instance of Subscription.
329 330 331 332 333 334 |
# File 'lib/pg_versions/pg_versions.rb', line 329

# Builds an empty subscription attached to the given connection thread.
#
# @param connection_thread [Object] stored as-is; the other methods of this
#   class send it +request+ / +request_nonblock+ messages
def initialize(connection_thread)
  # Queue of pending version hashes; #notify appends to it.
  @notifications = Queue.new
  # Reading an unknown channel stores and returns a fresh empty array.
  @already_known_versions = Hash.new { |known, channel| known[channel] = [] }
  @connection_thread = connection_thread
  # channel => subscription count; channels that are not present read as 0.
  @channels = Hash.new(0)
end
Instance Method Details
#bump(*channels, notify: false) ⇒ Object
369 370 371 372 373 374 |
# File 'lib/pg_versions/pg_versions.rb', line 369

# Sends a +:bump+ request for the given channels through the connection
# thread and returns whatever that request returns.
#
# @param channels [Array] channels to bump; when none are given, every key
#   of +@channels+ is used instead
# @param notify [Boolean] when falsy (the default) the returned versions are
#   passed to +update_already_known_versions+ before returning; when truthy
#   that step is skipped
# @return [Object] the result of the +:bump+ request
def bump(*channels, notify: false)
  targets = channels.empty? ? @channels.keys : channels
  bumped = @connection_thread.request(:bump, targets)
  update_already_known_versions(bumped) unless notify
  bumped
end
#drop ⇒ Object
409 410 411 412 413 |
# File 'lib/pg_versions/pg_versions.rb', line 409

# Shuts this subscription down: enqueues a +nil+ notification and asks the
# connection thread, without blocking, to unsubscribe every channel that is
# currently a key of +@channels+.
#
# @return [Object] whatever +request_nonblock+ returns
def drop
  # A nil entry in the queue is the termination marker.
  @notifications.push(nil)
  subscribed = @channels.keys
  @connection_thread.request_nonblock(:unsubscribe, self, subscribed)
  #TODO: what to do if this object gets used after drop?
end
#each(new_already_known_versions = {}) ⇒ Object
396 397 398 399 400 401 |
# File 'lib/pg_versions/pg_versions.rb', line 396

# Yields every notification returned by +wait+ until +wait+ returns a falsy
# value.
#
# When called without a block, an Enumerator over the same sequence is
# returned instead. Nothing is read from the subscription until that
# enumerator is iterated, so no notification is consumed and then lost to a
# LocalJumpError from +yield+.
#
# @param new_already_known_versions [Hash] passed to
#   +update_already_known_versions+ before the first +wait+
# @yieldparam notification [Object] each truthy value returned by +wait+
# @return [nil, Enumerator] +nil+ once the loop ends, or an Enumerator when
#   no block is given
def each(new_already_known_versions = {})
  return enum_for(:each, new_already_known_versions) unless block_given?

  update_already_known_versions(new_already_known_versions)
  while (notification = wait())
    yield notification
  end
end
#notify(versions) ⇒ Object
404 405 406 |
# File 'lib/pg_versions/pg_versions.rb', line 404

# Appends a set of versions to this subscription's notification queue.
#
# @param versions [Object] the entry to enqueue
# @return [Queue] the notification queue itself
def notify(versions)
  @notifications.push(versions)
end
#read(*channels, notify: false) ⇒ Object
361 362 363 364 365 366 |
# File 'lib/pg_versions/pg_versions.rb', line 361

# Sends a +:read+ request for the given channels through the connection
# thread and returns whatever that request returns.
#
# @param channels [Array] channels to read; when none are given, every key
#   of +@channels+ is used instead
# @param notify [Boolean] when falsy (the default) the returned versions are
#   passed to +update_already_known_versions+ before returning; when truthy
#   that step is skipped
# @return [Object] the result of the +:read+ request
def read(*channels, notify: false)
  targets = channels.empty? ? @channels.keys : channels
  current = @connection_thread.request(:read, targets)
  update_already_known_versions(current) unless notify
  current
end
#subscribe(channels, known: {}) ⇒ Object
337 338 339 340 341 342 343 344 345 346 |
# File 'lib/pg_versions/pg_versions.rb', line 337

# Increments the subscription count of each given channel and sends a
# +:subscribe+ request for those channels whose count just became 1.
#
# @param channels [Object, Array] one channel or a (possibly nested) list of
#   channels
# @param known [Hash] passed to +update_already_known_versions+ first
# @return [Object, nil] the result of the +:subscribe+ request, or +nil+
#   when no channel went from 0 to 1 and no request was sent
def subscribe(channels, known: {})
  update_already_known_versions(known)

  newly_subscribed = [channels].flatten.select do |channel|
    @channels[channel] += 1
    @channels[channel] == 1
  end
  return if newly_subscribed.empty?

  @connection_thread.request(:subscribe, self, newly_subscribed)
end
#unsubscribe(*channels) ⇒ Object
349 350 351 352 353 354 355 356 357 358 |
# File 'lib/pg_versions/pg_versions.rb', line 349

# Decrements the subscription count of each given channel and sends an
# +:unsubscribe+ request for the channels whose count reached zero.
#
# All counts are validated before anything is changed. If any channel is
# named more often than it is currently subscribed, the error is raised with
# +@channels+ untouched, so no negative count is left behind and no channel
# is dropped locally without the connection thread being told.
#
# @param channels [Array] channels (possibly nested lists) to unsubscribe;
#   naming a channel several times decrements it several times
# @return [Object, nil] the result of the +:unsubscribe+ request, or +nil+
#   when no channel reached zero and no request was sent
# @raise [RuntimeError] when a channel is unsubscribed more times than it
#   was subscribed
def unsubscribe(*channels)
  requested = channels.flatten.each_with_object(Hash.new(0)) do |channel, counts|
    counts[channel] += 1
  end

  # Validate first; fetch avoids the Hash.new(0) default being written back.
  requested.each do |channel, count|
    next if count <= @channels.fetch(channel, 0)

    raise "Trying to unsubscribe from channel (%p) more times than it was subscribed to"%[channel]
  end

  released = []
  requested.each do |channel, count|
    remaining = @channels[channel] - count
    if remaining > 0
      @channels[channel] = remaining
    else
      @channels.delete(channel)
      released << channel
    end
  end

  # Like #subscribe, only contact the connection thread when there is work.
  @connection_thread.request(:unsubscribe, self, released) if released.size > 0
end
#update_already_known_versions(new_already_known_versions) ⇒ Object
416 417 418 419 420 |
# File 'lib/pg_versions/pg_versions.rb', line 416

# Raises the recorded version of each given channel to the supplied one, but
# only where +<=>+ reports the supplied version as greater than the recorded
# one. Equal, smaller or incomparable versions are ignored.
#
# @param new_already_known_versions [Hash] channel => version pairs
# @return [Object] the argument itself (the result of +each+)
def update_already_known_versions(new_already_known_versions)
  new_already_known_versions.each do |channel, version|
    recorded = @already_known_versions[channel]
    next unless (version <=> recorded) == 1

    @already_known_versions[channel] = version
  end
end
#wait(new_already_known_versions = {}) ⇒ Object
TODO: make this resumable after a forced exception.
378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 |
# File 'lib/pg_versions/pg_versions.rb', line 378

# Blocks on the notification queue until an entry contains at least one
# channel whose version is newer than the recorded one, then returns a
# Notification for it. Entries that bring nothing new are skipped.
#
# @param new_already_known_versions [Hash] passed to
#   +update_already_known_versions+ before waiting
# @return [Notification, nil] a Notification built from the changed
#   channel => version pairs and a shallow copy of all recorded versions, or
#   +nil+ when a falsy entry is taken from the queue
def wait(new_already_known_versions = {})
  update_already_known_versions(new_already_known_versions)

  loop do
    batch = @notifications.shift
    return nil unless batch #termination

    # Keep only channels that moved forward, recording each new version.
    fresh = batch.to_a.each_with_object({}) do |(channel, version), changed|
      next unless (@already_known_versions[channel] <=> version) == -1

      @already_known_versions[channel] = version
      changed[channel] = version
    end
    next if fresh.empty?

    return Notification.new(fresh, @already_known_versions.dup)
  end
end