Class: Tem::Mr::Search::Server
- Inherits: Object
- Defined in: lib/tem_mr_search/server.rb
Overview
Map-Reduce RPC server.
The RPC server models the data provider in the secure Map-Reduce proof of concept.
The server accepts database queries expressed as Map-Reduce computations, each enclosed in a SECpack, so that the data provider cannot learn the contents of the query. Each Map-Reduce computation has a single result: the ID of the record that answers the query, together with that record's score.
The server also accepts direct queries for single database records, so clients can retrieve the record whose ID they learn from the Map-Reduce result.
The db_dump and shutdown requests are for demonstration and testing purposes, and would not be exposed in production servers.
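The two-step flow described above (search first, then fetch by ID) can be sketched with plain Ruby hashes. This is only an illustration under stated assumptions: `FakeSearchServer` is a hypothetical in-memory stand-in, the `:scorer` key replaces the real `:map_reduce` and `:root_tems` payload, and the `{ :id, :score }` result shape is inferred from the overview rather than taken from `MapReduceExecutor`.

```ruby
# Hypothetical in-memory stand-in for the server; it mimics only the
# :search and :fetch request types and does no secure Map-Reduce work.
class FakeSearchServer
  def initialize(records)
    @records = records  # Hash of id => record
  end

  def process_request(request)
    case request[:type]
    when :search
      # Assumption: the result carries the winning record's ID and score.
      # The "query" here is a plain Ruby lambda instead of a SECpack.
      id, record = @records.max_by { |_id, rec| request[:scorer].call(rec) }
      { :id => id, :score => request[:scorer].call(record) }
    when :fetch
      @records[request[:id]] || :not_found
    else
      :unknown
    end
  end
end

records = {
  1 => { :name => 'alpha', :price => 30 },
  2 => { :name => 'beta',  :price => 10 },
  3 => { :name => 'gamma', :price => 20 }
}
server = FakeSearchServer.new records

# Step 1: the search yields only an ID and a score.
cheapest = lambda { |rec| -rec[:price] }
result = server.process_request(:type => :search, :scorer => cheapest)

# Step 2: a direct fetch turns the ID into the full record.
record = server.process_request(:type => :fetch, :id => result[:id])
```

Keeping the search result down to an ID and a score is what makes the second, direct fetch necessary.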
Constant Summary
- DEFAULT_PORT = 9052
- OP = Zerg::Support::Protocols::ObjectProtocol
- OPAdapter = Zerg::Support::Sockets::ProtocolAdapter.adapter_module OP
Class Method Summary
- .tems_from_cluster_file(cluster_file) ⇒ Object
  Creates sessions to all the TEMs in a cluster.
Instance Method Summary
- #initialize(db_file, cluster_file, port) ⇒ Server (constructor)
  Creates a new Map-Reduce server (master).
- #process_request(request) ⇒ Object
  Computes the response of a single request.
- #refresh_tems! ⇒ Object
  Reinitializes the TEM cluster connections.
- #serve_loop ⇒ Object
  Runs the server's accept loop, handling one request per connection until a shutdown request arrives.
Constructor Details
#initialize(db_file, cluster_file, port) ⇒ Server
Creates a new Map-Reduce server (master).
# File 'lib/tem_mr_search/server.rb', line 36

def initialize(db_file, cluster_file, port)
  @logger = Logger.new STDERR
  @db = Db.new db_file
  @cluster_file = cluster_file
  @tems = []
  refresh_tems!
  @port = port || DEFAULT_PORT
  @listen_socket = Zerg::Support::SocketFactory.socket :in_port => @port,
      :reuse_addr => true, :no_delay => true
end
Class Method Details
.tems_from_cluster_file(cluster_file) ⇒ Object
Creates sessions to all the TEMs in a cluster.
# File 'lib/tem_mr_search/server.rb', line 106

def self.tems_from_cluster_file(cluster_file)
  cluster_hosts = File.open(cluster_file, 'r') { |f| YAML.load f }
  cluster_configs = cluster_hosts.map { |host|
    Tem::MultiProxy::Client.query_tems host
  }.flatten
  cluster_configs.reject { |config| config.nil? }.map do |config|
    Tem::Session.new Smartcard::Iso::AutoConfigurator.try_transport(config)
  end
end
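The source above suggests that the cluster file is a YAML document that loads to an array of hosts, and that each host may report several TEM configurations, some of them nil. A minimal sketch of that load, flatten, and filter pipeline follows. The host names, the configuration hashes, and the `query` lambda are made up for illustration; the lambda stands in for `Tem::MultiProxy::Client.query_tems`, whose real return format is not shown on this page.

```ruby
require 'tempfile'
require 'yaml'

# Hypothetical query results: an array of per-TEM configurations for each
# host, with nil marking a slot that produced no usable configuration.
FAKE_TEMS = {
  'host-a:9001' => [{ :port => 1 }, nil],
  'host-b:9001' => [{ :port => 2 }, { :port => 3 }]
}

# Loads the host list, queries every host, and keeps the non-nil configs.
def configs_from_cluster_file(cluster_file, query)
  hosts = File.open(cluster_file, 'r') { |f| YAML.load f }
  hosts.map { |host| query.call host }.flatten.compact
end

# Write a throwaway cluster file containing a YAML array of hosts.
cluster_file = Tempfile.new(['cluster', '.yml'])
cluster_file.write ['host-a:9001', 'host-b:9001'].to_yaml
cluster_file.close

query = lambda { |host| FAKE_TEMS.fetch host }
configs = configs_from_cluster_file(cluster_file.path, query)
cluster_file.unlink
```

In the real method, each surviving configuration is then wrapped in a `Tem::Session`; the sketch stops before that step because it needs actual TEM hardware or a proxy.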
Instance Method Details
#process_request(request) ⇒ Object
Computes the response of a single request.
# File 'lib/tem_mr_search/server.rb', line 83

def process_request(request)
  case request[:type]
  when :search
    refresh_tems!
    job = MapReduceJob.new request[:map_reduce]
    root_tems = request[:root_tems]
    executor = MapReduceExecutor.new job, @db, @tems, root_tems
    executor.execute
  when :fetch
    @db.item_by_id(request[:id]) || :not_found
  when :get_tem
    tem_id = rand @tems.length
    { :id => tem_id, :ecert => @tems[tem_id].endorsement_cert.to_pem }
  when :shutdown
    :shutdown
  when :db_dump
    (0...@db.length).map { |i| @db.item(i) }
  else
    :unknown
  end
end
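A response is therefore either data or a sentinel symbol: `:not_found` and `:unknown` come from `process_request`, and `:error` is substituted by `serve_loop` when processing raises. A client might separate the two cases along the lines of the sketch below. `RequestFailed`, `SENTINELS`, and `unwrap_response` are hypothetical names introduced here and are not part of tem_mr_search.

```ruby
# Hypothetical client-side error type for failed requests.
class RequestFailed < StandardError; end

# Sentinel symbols that signal a failure, with a human-readable reason.
SENTINELS = {
  :not_found => 'no record has the requested ID',
  :unknown   => 'the server did not recognize the request type',
  :error     => 'the server raised an exception while processing'
}

# Returns data responses unchanged and raises on failure sentinels.
# :shutdown is an acknowledgement rather than a failure, so it passes through.
def unwrap_response(response)
  return response unless SENTINELS.key?(response)
  raise RequestFailed, SENTINELS[response]
end
```

For example, `unwrap_response(:not_found)` raises `RequestFailed`, while `unwrap_response(:shutdown)` returns `:shutdown`.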
#refresh_tems! ⇒ Object
Reinitializes the TEM cluster connections.
Call this reasonably often so that the server picks up cluster configuration changes. process_request already calls it before every :search request.
# File 'lib/tem_mr_search/server.rb', line 52

def refresh_tems!
  @tems.each { |tem| tem.disconnect }
  @tems = Server.tems_from_cluster_file @cluster_file
end
#serve_loop ⇒ Object
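Runs the server's accept loop. Each connection carries exactly one request and one response; the loop exits after answering a shutdown request.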
# File 'lib/tem_mr_search/server.rb', line 58

def serve_loop
  @listen_socket.listen
  @shutdown_received = false
  until @shutdown_received
    begin
      client_socket, client_addr = @listen_socket.accept
      client_socket.extend OPAdapter
      request = client_socket.recv_object
      begin
        response = process_request request
      rescue Exception => e
        @logger.error e
        response = :error
      end
      client_socket.send_object response
      @shutdown_received = true if response == :shutdown
    rescue RuntimeError => e
      @logger.error e
    end
    client_socket.close
  end
  @listen_socket.close
end
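The loop's behavior (one request per connection, handler failures turned into `:error`, exit after `:shutdown`) can be exercised without a network by swapping in fake connections. The sketch below is not the library's code: `FakeClientSocket` and `serve_fake_connections` are hypothetical, and the sketch deliberately differs from `serve_loop` by rescuing `StandardError` instead of `Exception` and by closing each connection in an `ensure` clause.

```ruby
# Hypothetical stand-in for an accepted connection extended with OPAdapter:
# it replays one canned request and records the response sent back.
class FakeClientSocket
  attr_reader :sent, :closed

  def initialize(request)
    @request = request
    @closed = false
  end

  def recv_object
    @request
  end

  def send_object(response)
    @sent = response
  end

  def close
    @closed = true
  end
end

# Serves a fixed list of fake connections, passing each request to the
# given block, and stops right after answering a :shutdown request.
def serve_fake_connections(connections)
  connections.each do |client_socket|
    begin
      response = begin
        yield client_socket.recv_object
      rescue StandardError
        :error  # a failing request must not take the loop down
      end
      client_socket.send_object response
      break if response == :shutdown
    ensure
      client_socket.close
    end
  end
end

requests = [{ :type => :fetch }, { :type => :boom },
            { :type => :shutdown }, { :type => :fetch }]
sockets = requests.map { |request| FakeClientSocket.new request }

serve_fake_connections(sockets) do |request|
  raise 'handler failure' if request[:type] == :boom
  request[:type] == :shutdown ? :shutdown : :ok
end

responses = sockets.map { |socket| socket.sent }
```

The fourth connection is never served because the loop has already exited, which matches the `until @shutdown_received` condition in the source.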