Class: Archipelago::Disco::Jockey
- Inherits: Object
  - Object
    - Archipelago::Disco::Jockey
- Defined in: lib/archipelago/disco.rb
Overview
The main discovery class used to both publish and lookup services.
Instance Method Summary
- #clear! ⇒ Object
  Clears our local and remote services.
- #initialize(options = {}) ⇒ Jockey (constructor)
  Will create a Jockey service running on :address and :port, or ADDRESS and PORT if none are given.
- #lookup(match, options = {}) ⇒ Object
  Lookup any services matching match, optionally with a :timeout and optionally :silent.
- #publish(service) ⇒ Object
  Record the given service and broadcast about it.
- #setup(options = {}) ⇒ Object
  Sets up this instance according to the given options.
- #stop! ⇒ Object
  Stops all the threads and closes all sockets in this instance.
- #subscribe(event_type, match, identity, &block) ⇒ Object
  Will listen for event_types matching the Query match and do block.call with the matching Record.
- #unpublish(service_id) ⇒ Object
  Removes the service with given service_id from the published services.
- #unsubscribe(event_type, match, identity) ⇒ Object
  Will stop listening for event_type and match with identity.
- #validate! ⇒ Object
  Validate all our known services.
Constructor Details
#initialize(options = {}) ⇒ Jockey
Will create a Jockey service running on :address and :port or ADDRESS and PORT if none are given.
Will use the first available unicast port within :uniports, or UNIPORTS if not given, for receiving unicast messages.
Will have a default :lookup_timeout of LOOKUP_TIMEOUT, a default :initial_lookup_standoff of INITIAL_LOOKUP_STANDOFF and a default :validation_interval of VALIDATION_INTERVAL.
Will only cache (and validate, which saves network traffic) stuff that has been looked up before if :thrifty_caching, or THRIFTY_CACHING if not given.
Will send out a multicast when a new service is published unless :thrifty_publishing, or THRIFTY_PUBLISHING if not given.
Will reply to all queries for which it has matching local services with a unicast message sent only to the querier (which saves a lot of network traffic) if :thrifty_replying, or THRIFTY_REPLYING if not given. Otherwise will reply with multicasts.
    # File 'lib/archipelago/disco.rb', line 493
    def initialize(options = {})
      @valid = true
      @remote_services = ServiceLocker.new(:jockey => self)
      @local_services = ServiceLocker.new(:jockey => self)
      @subscribed_services = Set.new
      @incoming = Queue.new
      @outgoing = Queue.new

      @new_service_semaphore = MonitorMixin::ConditionVariable.new(Archipelago::Current::Lock.new)
      @service_change_subscribers_by_event_type = {:found => {}, :lost => {}}

      @validation_interval = options[:validation_interval] || VALIDATION_INTERVAL

      setup(options)

      start!
    end
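The option defaults described for the constructor follow two patterns in the listings on this page: `||` for values such as :lookup_timeout, and `include?` for the :thrifty_* flags. The following is a minimal sketch of why the two differ, not the library's own code. The module name `DefaultsSketch`, the method `resolve` and both constant values are assumptions made for illustration; the real values of the constants are not shown in this documentation.

```ruby
# Illustrative stand-in, not part of Archipelago.
module DefaultsSketch
  # Hypothetical default values, chosen only for this sketch.
  LOOKUP_TIMEOUT = 10
  THRIFTY_REPLYING = true

  # Resolve options using the same two patterns as the listings:
  # - `||` falls back to the default when the value is nil (or false),
  # - `include?` falls back only when the key is absent, so an explicit
  #   `false` from the caller is respected.
  def self.resolve(options = {})
    {
      :lookup_timeout => options[:lookup_timeout] || LOOKUP_TIMEOUT,
      :thrifty_replying => options.include?(:thrifty_replying) ? options[:thrifty_replying] : THRIFTY_REPLYING
    }
  end
end
```

With these assumed defaults, `DefaultsSketch.resolve(:thrifty_replying => false)` keeps the flag switched off. Had the flag been resolved with `||`, the explicit `false` would have been replaced by the default.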
Instance Method Details
#clear! ⇒ Object
Clears our local and remote services.
    # File 'lib/archipelago/disco.rb', line 586
    def clear!
      @local_services = ServiceLocker.new(:jockey => self)
      @remote_services = ServiceLocker.new(:jockey => self)
    end
#lookup(match, options = {}) ⇒ Object
Lookup any services matching match, optionally with a :timeout and optionally :silent.
Will immediately return if we know of matching and valid services, will otherwise send out regular Queries and return as soon as matching services are found, or when the timeout runs out.
    # File 'lib/archipelago/disco.rb', line 630
    def lookup(match, options = {})
      timeout = options[:timeout] || @lookup_timeout
      silent = options[:silent] || false

      match[:unicast_reply] = @unicast_address

      @subscribed_services << match if @thrifty_caching

      standoff = @initial_lookup_standoff

      @outgoing << [nil, match] unless silent
      known_services = @remote_services.get_services(match).merge(@local_services.get_services(match))
      return known_services if timeout == 0 || !known_services.empty?

      t = Time.new
      @new_service_semaphore.wait([standoff, timeout].min)
      standoff *= 2

      while Time.new < t + timeout
        known_services = @remote_services.get_services(match).merge(@local_services.get_services(match))
        return known_services unless known_services.empty?

        wait_time = [standoff, (t + timeout) - Time.new].min
        @new_service_semaphore.wait(wait_time)
        standoff *= 2

        @outgoing << [nil, match] unless silent
      end

      return ServiceLocker.new
    end
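The waiting pattern in the listing above is a doubling standoff capped by the remaining timeout. The sketch below computes the sequence of waits that pattern would produce under two simplifying assumptions: no matching service ever turns up, and every wait lasts its full duration (nothing wakes the waiter early). The function name `lookup_wait_schedule` is hypothetical and not part of Archipelago.

```ruby
# Returns the list of wait durations (in seconds) that a lookup would go
# through before giving up, assuming no service is found and every wait
# runs to completion.
def lookup_wait_schedule(initial_standoff, timeout)
  raise ArgumentError, "initial_standoff must be positive" unless initial_standoff > 0
  # A timeout of 0 returns before any waiting happens, as in the listing.
  return [] if timeout <= 0

  timeout = timeout.to_f
  standoff = initial_standoff.to_f
  waits = []
  elapsed = 0.0

  # The first wait happens before the loop, capped by the whole timeout.
  first = [standoff, timeout].min
  waits << first
  elapsed += first
  standoff *= 2

  # Later waits double each time and are capped by the time remaining.
  while elapsed < timeout
    wait = [standoff, timeout - elapsed].min
    waits << wait
    elapsed += wait
    standoff *= 2
  end

  waits
end
```

For example, `lookup_wait_schedule(0.5, 5)` returns `[0.5, 1.0, 2.0, 1.5]`: the last wait is cut short so the total never exceeds the timeout. In the listing, a query is sent before the first wait and again after each wait inside the loop unless :silent is set, so the query rate drops as the standoff grows.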
#publish(service) ⇒ Object
Record the given service and broadcast about it.
    # File 'lib/archipelago/disco.rb', line 663
    def publish(service)
      if service.valid?
        service[:published_at] = Time.now
        @local_services[service[:service_id]] = service
        @new_service_semaphore.broadcast
        unless @thrifty_publishing
          @outgoing << [nil, service]
        end
      end
    end
#setup(options = {}) ⇒ Object
Sets up this instance according to the given options.
    # File 'lib/archipelago/disco.rb', line 535
    def setup(options = {})
      @thrifty_caching = options.include?(:thrifty_caching) ? options[:thrifty_caching] : THRIFTY_CACHING
      @thrifty_replying = options.include?(:thrifty_replying) ? options[:thrifty_replying] : THRIFTY_REPLYING
      @thrifty_publishing = options.include?(:thrifty_publishing) ? options[:thrifty_publishing] : THRIFTY_PUBLISHING
      @lookup_timeout = options[:lookup_timeout] || LOOKUP_TIMEOUT
      @initial_lookup_standoff = options[:initial_lookup_standoff] || INITIAL_LOOKUP_STANDOFF

      @listener = UDPSocket.new
      @unilistener = UDPSocket.new

      @listener.setsockopt(Socket::IPPROTO_IP,
                           Socket::IP_ADD_MEMBERSHIP,
                           IPAddr.new(options[:address] || ADDRESS).hton + Socket.gethostbyname("0.0.0.0")[3])

      @listener.setsockopt(Socket::SOL_SOCKET,
                           Socket::SO_REUSEADDR,
                           true)
      begin
        @listener.setsockopt(Socket::SOL_SOCKET,
                             Socket::SO_REUSEPORT,
                             true)
      rescue
        # /moo
      end
      @listener.bind('', options[:port] || PORT)

      uniports = options[:uniports] || UNIPORTS
      this_port = uniports.min
      begin
        @unilistener.bind('', this_port)
      rescue Errno::EADDRINUSE => e
        if this_port < uniports.max
          this_port += 1
          retry
        else
          raise e
        end
      end
      @unicast_address = "#{HOST}:#{this_port}"

      @sender = UDPSocket.new
      @multiaddress = options[:address] || ADDRESS
      @multiport = options[:port] || PORT
      @sender.connect(@multiaddress, @multiport)

      @unisender = UDPSocket.new
    end
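The unicast port selection in the listing above (try the lowest port, move up by one on Errno::EADDRINUSE, give up after the highest) can be isolated into a small helper. This is a sketch of that pattern only; the helper name `bind_first_available` and the `host` parameter are assumptions for illustration and do not exist in Archipelago, which binds to `''` directly.

```ruby
require 'socket'

# Bind `socket` to the first free port in `ports` (a Range or Array of
# integers) on `host`. Returns the port that was bound. If every port in
# the range is taken, the last Errno::EADDRINUSE is re-raised.
def bind_first_available(socket, ports, host = '127.0.0.1')
  port = ports.min
  begin
    socket.bind(host, port)
  rescue Errno::EADDRINUSE
    # Out of candidates: let the caller see the original error.
    raise if port >= ports.max
    port += 1
    retry
  end
  port
end
```

A caller would pass a fresh `UDPSocket.new` and a range such as the one given in :uniports, then build its reply address from the returned port, much as the listing does with `@unicast_address`.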
#stop! ⇒ Object
Stops all the threads and closes all sockets in this instance.
    # File 'lib/archipelago/disco.rb', line 594
    def stop!
      if @valid
        @valid = false
        @local_services.each do |service_id, service_description|
          self.unpublish(service_id)
        end

        @listener_thread.kill
        @unilistener_thread.kill

        @listener.close
        @unilistener.close

        while !@incoming.empty?
          sleep 0.1
        end
        @picker_thread.kill

        while !@outgoing.empty?
          sleep 0.1
        end
        @shouter_thread.kill

        @sender.close
        @unisender.close

        @validator_thread.kill
      end
    end
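The shutdown order in the listing above stops the producers first, then polls each queue until it is empty before killing the thread that consumes it. The sketch below shows that drain-then-kill step on its own. The helper name `drain_and_kill` and the `poll_interval` parameter are hypothetical, and the final `join` is an addition of this sketch rather than something the listing does.

```ruby
# Poll `queue` until it is empty, then kill the `worker` thread that
# consumes it and wait for the thread to finish dying.
#
# Note: an empty queue only means every item has been taken off the
# queue. The worker may still be in the middle of handling the last item
# when it is killed.
def drain_and_kill(queue, worker, poll_interval = 0.01)
  sleep poll_interval until queue.empty?
  worker.kill
  worker.join
end
```

This only terminates if nothing keeps adding to the queue, which is presumably why the listing kills the listener threads and closes their sockets before draining @incoming.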
#subscribe(event_type, match, identity, &block) ⇒ Object
Will listen for event_types matching the Query match and do block.call with the matching Record.
identity is used to determine what subscription to remove when doing unsubscribe.
Recognized event_types: :found, :lost
    # File 'lib/archipelago/disco.rb', line 521
    def subscribe(event_type, match, identity, &block)
      @service_change_subscribers_by_event_type[event_type][[match, identity]] = block
    end
#unpublish(service_id) ⇒ Object
Removes the service with given service_id from the published services.
    # File 'lib/archipelago/disco.rb', line 677
    def unpublish(service_id)
      @local_services.delete(service_id)
      @new_service_semaphore.broadcast
      unless @thrifty_publishing
        @outgoing << [nil, UnPublish.new(service_id)]
      end
    end
#unsubscribe(event_type, match, identity) ⇒ Object
Will stop listening for event_type and match with identity.
    # File 'lib/archipelago/disco.rb', line 528
    def unsubscribe(event_type, match, identity)
      @service_change_subscribers_by_event_type[event_type].delete([match, identity])
    end
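The subscribe and unsubscribe listings store each block in a per-event-type hash keyed by the pair [match, identity]. The stand-in class below mirrors that structure to show why identity matters: two subscribers can use the same match and still be removed independently. `SubscriptionRegistry` and its `notify` method are hypothetical, and the equality-based matching in `notify` is an assumption, since this page does not show how a Query is matched against a Record.

```ruby
# Illustrative stand-in for the subscriber bookkeeping, not Archipelago code.
class SubscriptionRegistry
  def initialize
    # Same shape as @service_change_subscribers_by_event_type in the constructor.
    @subscribers_by_event_type = {:found => {}, :lost => {}}
  end

  # Store the block under the composite key [match, identity].
  def subscribe(event_type, match, identity, &block)
    @subscribers_by_event_type[event_type][[match, identity]] = block
  end

  # Remove exactly the subscription stored under [match, identity].
  def unsubscribe(event_type, match, identity)
    @subscribers_by_event_type[event_type].delete([match, identity])
  end

  # Hypothetical dispatch: call every block whose match is a subset of the
  # record. The real matching rules are not shown in this documentation.
  def notify(event_type, record)
    @subscribers_by_event_type[event_type].each do |(match, _identity), block|
      block.call(record) if match.all? { |key, value| record[key] == value }
    end
  end
end
```

Subscribing twice with the same match and the same identity overwrites the earlier block, because both calls produce the same hash key. Passing an event type other than :found or :lost fails, because no inner hash exists for it.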
#validate! ⇒ Object
Validate all our known services.
    # File 'lib/archipelago/disco.rb', line 688
    def validate!
      @local_services.validate!
      @remote_services.validate!
    end