Class: PgVersions::Connection::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/pg_versions/pg_versions.rb

Instance Method Summary collapse

Constructor Details

#initialize(connection_thread, batch_delay) ⇒ Subscription

Returns a new instance of Subscription.



333
334
335
336
337
338
339
# File 'lib/pg_versions/pg_versions.rb', line 333

def initialize(connection_thread, batch_delay)
	@connection_thread = connection_thread
	@batch_delay = batch_delay
	@notifications = Queue.new
	@already_known_versions = Hash.new { |h,k| h[k] = [] }
	@channels = Hash.new(0)
end

Instance Method Details

#bump(*channels, notify: false) ⇒ Object



374
375
376
377
378
379
# File 'lib/pg_versions/pg_versions.rb', line 374

def bump(*channels, notify: false)
	channels = @channels.keys  if channels.size == 0
	versions = @connection_thread.request(:bump, channels)
	update_already_known_versions(versions)  if not notify
	versions
end

#dropObject



420
421
422
423
424
# File 'lib/pg_versions/pg_versions.rb', line 420

def drop
	@notifications << nil
	@connection_thread.request_nonblock(:unsubscribe, self, @channels.keys)
	#TODO: what to do if this object gets used after drop?
end

#each(new_already_known_versions = {}, batch_delay: nil) ⇒ Object



407
408
409
410
411
412
# File 'lib/pg_versions/pg_versions.rb', line 407

def each(new_already_known_versions = {}, batch_delay: nil)
	update_already_known_versions(new_already_known_versions)
	while notification = wait(batch_delay: batch_delay)
		yield notification
	end
end

#notify(versions) ⇒ Object



415
416
417
# File 'lib/pg_versions/pg_versions.rb', line 415

def notify(versions)
	@notifications << versions
end

#read(*channels, notify: false) ⇒ Object



366
367
368
369
370
371
# File 'lib/pg_versions/pg_versions.rb', line 366

def read(*channels, notify: false)
	channels = @channels.keys  if channels.size == 0
	versions = @connection_thread.request(:read, channels)
	update_already_known_versions(versions)  if not notify
	versions
end

#subscribe(channels, known: {}) ⇒ Object



342
343
344
345
346
347
348
349
350
351
# File 'lib/pg_versions/pg_versions.rb', line 342

def subscribe(channels, known: {})
	update_already_known_versions(known)
	channels = [channels].flatten
	channels.select! { |channel|
		(@channels[channel] += 1) == 1
	}
	if channels.size > 0
		@connection_thread.request(:subscribe, self, channels)
	end
end

#unsubscribe(*channels) ⇒ Object



354
355
356
357
358
359
360
361
362
363
# File 'lib/pg_versions/pg_versions.rb', line 354

def unsubscribe(*channels)
	channels = [channels].flatten
	channels.select! { |channel|
		@channels[channel] -= 1
		raise "Trying to unsubscribe from channel (%p) more times than it was subscribed to"%[channel]  if @channels[channel] < 0
		@channels.delete(channel)  if @channels[channel] == 0
		not @channels.has_key?(channel)
	}
	@connection_thread.request(:unsubscribe, self, channels)
end

#update_already_known_versions(new_already_known_versions) ⇒ Object



427
428
429
430
431
# File 'lib/pg_versions/pg_versions.rb', line 427

def update_already_known_versions(new_already_known_versions)
	new_already_known_versions.each { |channel, version|
		@already_known_versions[channel] = version  if (version <=> @already_known_versions[channel]) == 1
	}
end

#wait(new_already_known_versions = {}, batch_delay: nil) ⇒ Object

TODO: make this resume-able after forced exception



383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
# File 'lib/pg_versions/pg_versions.rb', line 383

def wait(new_already_known_versions = {}, batch_delay: nil)
	batch_delay = @batch_delay  if batch_delay.nil?
	update_already_known_versions(new_already_known_versions)
	loop {
		events = [@notifications.shift]
		sleep batch_delay  if batch_delay
		events << @notifications.shift  while not @notifications.empty?
		changed_versions = {}
		events.each { |versions|
			return nil  if not versions #termination
			versions.each { |channel, version|
				if (@already_known_versions[channel] <=> version) == -1
					@already_known_versions[channel] = version
					changed_versions[channel] = version
				end
			}
		}
		if changed_versions.size > 0
			return Notification.new(changed_versions, @already_known_versions.dup)
		end
	}
end