Class: MobME::Infrastructure::Queue::ZeroMQ::PersistenceServer
- Inherits:
-
Object
- Object
- MobME::Infrastructure::Queue::ZeroMQ::PersistenceServer
- Defined in:
- lib/mobme/infrastructure/queue/zeromq/persistence_server.rb
Instance Method Summary collapse
- #bind ⇒ Object
-
#initialize(options = {}) ⇒ PersistenceServer
constructor
A new instance of PersistenceServer.
- #send_backlog_requests ⇒ Object
Constructor Details
#initialize(options = {}) ⇒ PersistenceServer
Returns a new instance of PersistenceServer.
11 12 13 14 15 16 17 18 19 20 21 22 23 |
# File 'lib/mobme/infrastructure/queue/zeromq/persistence_server.rb', line 11 def initialize( = {}) @queue = MobME::Infrastructure::Queue.queue(:redis) @persistence_socket = [:persistence_socket] || "ipc:///tmp/mobme-infrastructure-queue-persistence.sock" @persistence_store_path = [:persistence_store_path] || "/tmp" @backlog_interval = [:backlog_interval] || 10 EM.synchrony do create_snapshot_directory bind send_backlog_requests end end |
Instance Method Details
#bind ⇒ Object
25 26 27 28 29 |
# File 'lib/mobme/infrastructure/queue/zeromq/persistence_server.rb', line 25 def bind @context = EM::ZeroMQ::Context.new(1) @persistence_request_server = @context.connect(ZMQ::REQ, @persistence_socket) end |
#send_backlog_requests ⇒ Object
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
# File 'lib/mobme/infrastructure/queue/zeromq/persistence_server.rb', line 31 def send_backlog_requests loop do handler = MobME::Infrastructure::Queue::ZeroMQ::ConnectionHandler.new(@persistence_request_server) handler. Marshal.dump("BACKLOG") puts "Sent BACKLOG" snapshot = handler. snapshot = Marshal.load(snapshot) rescue nil case snapshot when nil when false else dump_snapshot_to_disk(snapshot) if snapshot and !snapshot.empty? handler. Marshal.dump("ACK #{ack_signature(snapshot)}") # We get an OK back from the server handler. end end EM::Synchrony.sleep(@backlog_interval) end end |