Class: Archipelago::Disco::Jockey

Inherits:
Object
  • Object
show all
Defined in:
lib/archipelago/disco.rb

Overview

The main discovery class used to both publish and lookup services.

Instance Method Summary collapse

Constructor Details

#initialize(options = {}) ⇒ Jockey

Will create a Jockey service running on :address and :port or ADDRESS and PORT if none are given.

Will the first available unicast port within :uniports or if not given UNIPORTS for receiving unicast messages.

Will have a default :lookup_timeout of LOOKUP_TIMEOUT, a default :initial_lookup_standoff of INITIAL_LOOKUP_STANDOFF and a default :validation_interval of VALIDATION_INTERVAL.

Will only cache (and validate, which saves network traffic) stuff that has been looked up before if :thrifty_caching, or THRIFTY_CACHING if not given.

Will only reply to the one that sent out the query (and therefore save lots of network traffic) if :thrifty_replying, or THRIFTY_REPLYING if not given.

Will send out a multicast when a new service is published unless :thrifty_publishing, or THRIFTY_PUBLISHING if not given.

Will reply to all queries to which it has matching local services with a unicast message if :thrifty_replying, or if not given THRIFTY_REPLYING. Otherwise will reply with multicasts.



356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
# File 'lib/archipelago/disco.rb', line 356

def initialize(options = {})
  @valid = true
  @remote_services = ServiceLocker.new
  @local_services = ServiceLocker.new
  @subscribed_services = Set.new

  @incoming = Queue.new
  @outgoing = Queue.new

  @new_service_semaphore = MonitorMixin::ConditionVariable.new(Archipelago::Current::Lock.new)

  setup(options)

  start_listener
  start_unilistener
  start_shouter
  start_picker
  start_validator(options[:validation_interval] || VALIDATION_INTERVAL)
end

Instance Method Details

#clear!Object

Clears our local and remote services.



428
429
430
431
# File 'lib/archipelago/disco.rb', line 428

def clear!
  @local_services = ServiceLocker.new
  @remote_services = ServiceLocker.new
end

#lookup(match, timeout = @lookup_timeout) ⇒ Object

Lookup any services matching match, optionally with a timeout.

Will immediately return if we know of matching and valid services, will otherwise send out regular Queries and return as soon as matching services are found, or when the timeout runs out.



460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
# File 'lib/archipelago/disco.rb', line 460

def lookup(match, timeout = @lookup_timeout)
  match[:unicast_reply] = @unicast_address
  @subscribed_services << match if @thrifty_caching
  standoff = @initial_lookup_standoff

  @outgoing << [nil, match]
  known_services = @remote_services.get_services(match).merge(@local_services.get_services(match))
  return known_services unless known_services.empty?

  @new_service_semaphore.wait(standoff)
  standoff *= 2
  
  t = Time.new
  while Time.new < t + timeout
    known_services = @remote_services.get_services(match).merge(@local_services.get_services(match))
    return known_services unless known_services.empty?

    @new_service_semaphore.wait(standoff)
    standoff *= 2

    @outgoing << [nil, match]
  end

  ServiceLocker.new
end

#publish(service) ⇒ Object

Record the given service and broadcast about it.



489
490
491
492
493
494
495
496
497
498
# File 'lib/archipelago/disco.rb', line 489

def publish(service)
  if service.valid?
    service[:published_at] = Time.now
    @local_services[service[:service_id]] = service
    @new_service_semaphore.broadcast
    unless @thrifty_publishing
      @outgoing << [nil, service]
    end
  end
end

#setup(options = {}) ⇒ Object

Sets up this instance according to the given options.



379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
# File 'lib/archipelago/disco.rb', line 379

def setup(options = {})
  @thrifty_caching = options.include?(:thrifty_caching) ? options[:thrifty_caching] : THRIFTY_CACHING
  @thrifty_replying = options.include?(:thrifty_replying) ? options[:thrifty_replying] : THRIFTY_REPLYING
  @thrifty_publishing = options.include?(:thrifty_publishing) ? options[:thrifty_publishing] : THRIFTY_PUBLISHING
  @lookup_timeout = options[:lookup_timeout] || LOOKUP_TIMEOUT
  @initial_lookup_standoff = options[:initial_lookup_standoff] || INITIAL_LOOKUP_STANDOFF

  @listener = UDPSocket.new
  @unilistener = UDPSocket.new
  
  @listener.setsockopt(Socket::IPPROTO_IP, 
                       Socket::IP_ADD_MEMBERSHIP, 
                       IPAddr.new(options[:address] || ADDRESS).hton + Socket.gethostbyname("0.0.0.0")[3])
  
  @listener.setsockopt(Socket::SOL_SOCKET, 
                       Socket::SO_REUSEADDR, 
                       true)
  begin
    @listener.setsockopt(Socket::SOL_SOCKET, 
                         Socket::SO_REUSEPORT, 
                         true)
  rescue
    # /moo
  end
  @listener.bind('', options[:port] || PORT)

  uniports = options[:uniports] || UNIPORTS
  this_port = uniports.min
  begin
    @unilistener.bind('', this_port)
  rescue Errno::EADDRINUSE => e
    if this_port < uniports.max
      this_port += 1
      retry
    else
      raise e
    end
  end
  @unicast_address = "#{HOST}:#{this_port}"
  
  @sender = UDPSocket.new
  @sender.connect(options[:address] || ADDRESS, options[:port] || PORT)

  @unisender = UDPSocket.new
end

#stop!Object

Stops all the threads in this instance.



436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
# File 'lib/archipelago/disco.rb', line 436

def stop!
  if @valid
    @valid = false
    @local_services.each do |service_id, service_description|
      self.unpublish(service_id)
    end
    @listener_thread.kill
    @unilistener_thread.kill
    @validator_thread.kill
    @picker_thread.kill
    until @outgoing.empty?
      sleep(0.01)
    end
    @shouter_thread.kill
  end
end

#unpublish(service_id) ⇒ Object

Removes the service with given service_id from the published services.



503
504
505
506
507
508
509
# File 'lib/archipelago/disco.rb', line 503

def unpublish(service_id)
  @local_services.delete(service_id)
  @new_service_semaphore.broadcast
  unless @thrifty_publishing
    @outgoing << [nil, UnPublish.new(service_id)]
  end
end