Class: PgVersions::Connection::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/pg_versions/pg_versions.rb

Instance Method Summary collapse

Constructor Details

#initialize(connection_thread) ⇒ Subscription

Returns a new instance of Subscription.



329
330
331
332
333
334
# File 'lib/pg_versions/pg_versions.rb', line 329

def initialize(connection_thread)
  @connection_thread = connection_thread
  @notifications = Queue.new
  @already_known_versions = Hash.new { |h,k| h[k] = [] }
  @channels = Hash.new(0)
end

Instance Method Details

#bump(*channels, notify: false) ⇒ Object



369
370
371
372
373
374
# File 'lib/pg_versions/pg_versions.rb', line 369

def bump(*channels, notify: false)
  channels = @channels.keys  if channels.size == 0
  versions = @connection_thread.request(:bump, channels)
  update_already_known_versions(versions)  if not notify
  versions
end

#dropObject



409
410
411
412
413
# File 'lib/pg_versions/pg_versions.rb', line 409

def drop
  @notifications << nil
  @connection_thread.request_nonblock(:unsubscribe, self, @channels.keys)
  #TODO: what to do if this object gets used after drop?
end

#each(new_already_known_versions = {}) ⇒ Object



396
397
398
399
400
401
# File 'lib/pg_versions/pg_versions.rb', line 396

def each(new_already_known_versions = {})
  update_already_known_versions(new_already_known_versions)
  while notification = wait()
    yield notification
  end
end

#notify(versions) ⇒ Object



404
405
406
# File 'lib/pg_versions/pg_versions.rb', line 404

def notify(versions)
  @notifications << versions
end

#read(*channels, notify: false) ⇒ Object



361
362
363
364
365
366
# File 'lib/pg_versions/pg_versions.rb', line 361

def read(*channels, notify: false)
  channels = @channels.keys  if channels.size == 0
  versions = @connection_thread.request(:read, channels)
  update_already_known_versions(versions)  if not notify
  versions
end

#subscribe(channels, known: {}) ⇒ Object



337
338
339
340
341
342
343
344
345
346
# File 'lib/pg_versions/pg_versions.rb', line 337

def subscribe(channels, known: {})
  update_already_known_versions(known)
  channels = [channels].flatten
  channels.select! { |channel|
    (@channels[channel] += 1) == 1
  }
  if channels.size > 0
    @connection_thread.request(:subscribe, self, channels)
  end
end

#unsubscribe(*channels) ⇒ Object



349
350
351
352
353
354
355
356
357
358
# File 'lib/pg_versions/pg_versions.rb', line 349

def unsubscribe(*channels)
  channels = [channels].flatten
  channels.select! { |channel|
    @channels[channel] -= 1
    raise "Trying to unsubscribe from channel (%p) more times than it was subscribed to"%[channel]  if @channels[channel] < 0
    @channels.delete(channel)  if @channels[channel] == 0
    not @channels.has_key?(channel)
  }
  @connection_thread.request(:unsubscribe, self, channels)
end

#update_already_known_versions(new_already_known_versions) ⇒ Object



416
417
418
419
420
# File 'lib/pg_versions/pg_versions.rb', line 416

def update_already_known_versions(new_already_known_versions)
  new_already_known_versions.each { |channel, version|
    @already_known_versions[channel] = version  if (version <=> @already_known_versions[channel]) == 1
  }
end

#wait(new_already_known_versions = {}) ⇒ Object

TODO: make this resume-able after forced exception



378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
# File 'lib/pg_versions/pg_versions.rb', line 378

def wait(new_already_known_versions = {})
  update_already_known_versions(new_already_known_versions)
  loop {
    versions = @notifications.shift
    return nil  if not versions #termination
    changed_versions = versions.to_a.map { |channel, version|
      if (@already_known_versions[channel] <=> version) == -1
        @already_known_versions[channel] = version
        [channel, version]
      end
    }.compact.to_h
    if changed_versions.size > 0
      return Notification.new(changed_versions, @already_known_versions.dup)
    end
  }
end