Module: LockServer::ResourceLockServer
- Defined in: lib/lock-server.rb
Overview
Simple resource locking protocol implementation, suitable for EventMachine.
Constant Summary
- @@resource_queues = { }
  Hash of resource queues. Keys are resource names; values are arrays of connections trying to acquire the resource.
- @@mutexes = { }
  Hash of mutexes used to synchronize queue access. Keys are resource names; values are Mutex instances.
Instance Attribute Summary
- #created_at ⇒ Object (readonly)
  Returns the value of attribute created_at.
- #peer_ip ⇒ Object (readonly)
  Returns the value of attribute peer_ip.
- #updated_at ⇒ Object (readonly)
  Returns the value of attribute updated_at.
Instance Method Summary
- #allow_action ⇒ Object
  Sends a notification that the requested resource is now free.
- #interrupt_timed_out_connection ⇒ Object
- #mutex_for(resource) ⇒ Object
  Returns the mutex for the specified resource.
- #obtain_lock(resource) ⇒ Object
  Creates a lock for the requested resource: acquires a mutex to protect resource queue manipulations, then either notifies the remote side of successful resource acquisition or tells it to wait.
- #post_init ⇒ Object
- #queue_for(resource) ⇒ Object
  Returns the queue for the specified resource.
- #receive_data(data) ⇒ Object
  An EventMachine callback, invoked when data is received.
- #release_lock ⇒ Object
  Releases the lock on the previously specified resource.
- #send_line(string) ⇒ Object
- #unbind ⇒ Object
- #wait_for_lock_release ⇒ Object
  Sends a notification that the requested resource is locked and the application should wait for the lock.
- #write_server_status ⇒ Object
Instance Attribute Details
#created_at ⇒ Object (readonly)
Returns the value of attribute created_at.
    # File 'lib/lock-server.rb', line 37
    def created_at
      @created_at
    end
#peer_ip ⇒ Object (readonly)
Returns the value of attribute peer_ip.
    # File 'lib/lock-server.rb', line 36
    def peer_ip
      @peer_ip
    end
#updated_at ⇒ Object (readonly)
Returns the value of attribute updated_at.
    # File 'lib/lock-server.rb', line 37
    def updated_at
      @updated_at
    end
Instance Method Details
#allow_action ⇒ Object
Sends a notification that the requested resource is now free.
    # File 'lib/lock-server.rb', line 136
    def allow_action
      send_line("ok #{@resource}")
    end
#interrupt_timed_out_connection ⇒ Object
    # File 'lib/lock-server.rb', line 113
    def interrupt_timed_out_connection
      return unless Time.now-@updated_at
    end
#mutex_for(resource) ⇒ Object
Returns the mutex for the specified resource, creating it on first use.
Parameters
- resource<String>
  The name of the resource whose mutex should be returned.
    # File 'lib/lock-server.rb', line 27
    def mutex_for(resource)
      @@mutexes[resource] ||= Mutex.new
    end
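The lazy initialization used here can be tried outside the module. The following is a minimal standalone sketch, assuming a top-level constant `MUTEXES` (a hypothetical stand-in for `@@mutexes`); it shows that repeated calls with the same resource name return the same Mutex object.

```ruby
# Standalone sketch; MUTEXES is a hypothetical stand-in for @@mutexes.
MUTEXES = {}

def mutex_for(resource)
  # ||= stores a new Mutex only if the key has no value yet
  MUTEXES[resource] ||= Mutex.new
end

a = mutex_for('db')
b = mutex_for('db')
c = mutex_for('cache')

puts a.equal?(b)  # true: same object for the same resource name
puts a.equal?(c)  # false: a different resource gets its own mutex
```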
#obtain_lock(resource) ⇒ Object
Creates a lock for the requested resource: acquires a mutex to protect resource queue manipulations, then either notifies the remote side of successful resource acquisition or tells it to wait. The first connection in the queue owns the resource; any other connection is put on hold until the resource becomes available.
Parameters
- resource<String>
  The name of the resource to lock.
    # File 'lib/lock-server.rb', line 99
    def obtain_lock(resource)
      @resource = resource
      queue = queue_for(@resource)
      return if queue.include?(self)
      mutex_for(@resource).synchronize do
        queue << self
        if queue.size == 1
          allow_action
        else
          wait_for_lock_release
        end
      end
    end
#post_init ⇒ Object
    # File 'lib/lock-server.rb', line 39
    def post_init
      @resource = nil
      @peer_ip = Socket.unpack_sockaddr_in(self.get_peername)[1]
      set_comm_inactivity_timeout(LockServer::CONNECTION_TIMEOUT)
    end
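The `[1]` index in `post_init` selects the IP address, because `Socket.unpack_sockaddr_in` returns the port first and the address second. A small sketch, using a made-up port and address in place of what `get_peername` would return:

```ruby
require 'socket'

# Build a packed sockaddr similar to what get_peername returns.
# The port and address are made-up example values.
sockaddr = Socket.pack_sockaddr_in(4242, '127.0.0.1')

# unpack_sockaddr_in returns [port, ip_address]
port, ip = Socket.unpack_sockaddr_in(sockaddr)
puts port  # 4242
puts ip    # 127.0.0.1
```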
#queue_for(resource) ⇒ Object
Returns the queue for the specified resource, creating an empty one on first use.
Parameters
- resource<String>
  The name of the resource whose queue should be returned.
    # File 'lib/lock-server.rb', line 17
    def queue_for(resource)
      p @@resource_queues
      @@resource_queues[resource] ||= []
    end
#receive_data(data) ⇒ Object
An EventMachine callback, invoked when data is received. Used to process lock requests. To acquire a lock, the application sends the message ‘lock resource_name’, where resource_name is the name of the needed resource. The reply is either ‘ok resource_name’ (the lock has been obtained and the application may proceed) or ‘wait resource_name’ (the resource is already locked and the application should wait; ‘ok resource_name’ follows once the lock is passed to it). To release the lock, the application sends ‘unlock’; the server replies ‘released resource_name’ and closes the connection, which removes it from the queue. Sending ‘status’ returns the server status, after which the connection is closed. Any other message results in ‘err: unknown message’, and the connection that issued the incorrect command is closed.
Parameters
- data<String>
  Data received from EventMachine.
    # File 'lib/lock-server.rb', line 60
    def receive_data(data)
      @updated_at = Time.now

      data.each_line do |line|
        command, resource = line.strip.split(' ')

        case command
        when 'lock'
          obtain_lock(resource)
        when 'unlock'
          send_line("released #{@resource}")
          close_connection_after_writing
        when "status"
          write_server_status
          close_connection_after_writing
        else
          send_line("err: unknown message")
          close_connection_after_writing
        end
      end
    end
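From the client side, the exchange could look roughly like the sketch below. `with_lock` is a hypothetical helper, not part of lock-server.rb; it works with any IO-like object and assumes the line-based messages handled by `receive_data`. The host and port in the usage comment are placeholders, since the source does not state them.

```ruby
require 'socket'

# Hypothetical helper (not part of lock-server.rb): requests a lock over an
# IO-like object, blocks until the server grants it, runs the block, then
# sends "unlock".
def with_lock(io, resource)
  io.write("lock #{resource}\n")
  loop do
    line = io.gets or raise IOError, 'connection closed before lock was granted'
    status, = line.split(' ')
    break if status == 'ok' # lock obtained
    raise "unexpected reply: #{line.strip}" unless status == 'wait'
    # on "wait", keep reading until "ok" arrives
  end
  begin
    yield
  ensure
    io.write("unlock\n") # the server replies "released ..." and closes
  end
end

# Usage (placeholder host and port):
#   socket = TCPSocket.new('localhost', 12345)
#   with_lock(socket, 'db') { puts 'working with db' }
```

Sending ‘unlock’ in an `ensure` clause means the lock is given up even if the block raises; closing the socket would have the same effect, because the server releases the lock when the connection unbinds.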
#release_lock ⇒ Object
Releases the lock on the previously specified resource. Calling this method removes the current connection from the queue of connections trying to acquire @resource. If the current connection owns the resource (is first in that queue), the next connection is notified that the resource is now free.
Parameters
none
    # File 'lib/lock-server.rb', line 123
    def release_lock
      queue = queue_for(@resource)
      mutex_for(@resource).synchronize do
        if queue.first == self
          queue.shift
          queue.first.allow_action unless queue.empty?
        else
          queue.delete self
        end
      end
    end
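The hand-off between queued connections can be modelled without EventMachine. This is a simplified sketch for a single resource: `FakeConnection`, `LOCK_QUEUE`, `QUEUE_MUTEX`, `obtain` and `release` are hypothetical names, and the fake connection records lines instead of sending them.

```ruby
# Hypothetical stand-in for a connection; records lines instead of sending.
class FakeConnection
  attr_reader :sent

  def initialize
    @sent = []
  end

  def allow_action(resource)
    @sent << "ok #{resource}"
  end

  def wait_for_lock_release(resource)
    @sent << "wait #{resource}"
  end
end

LOCK_QUEUE  = []        # models the queue for one resource
QUEUE_MUTEX = Mutex.new # models the mutex for that resource

def obtain(conn, resource)
  QUEUE_MUTEX.synchronize do
    LOCK_QUEUE << conn
    # the first connection in the queue owns the resource
    if LOCK_QUEUE.size == 1
      conn.allow_action(resource)
    else
      conn.wait_for_lock_release(resource)
    end
  end
end

def release(conn, resource)
  QUEUE_MUTEX.synchronize do
    if LOCK_QUEUE.first == conn
      LOCK_QUEUE.shift
      # notify the new head of the queue, if any
      LOCK_QUEUE.first.allow_action(resource) unless LOCK_QUEUE.empty?
    else
      LOCK_QUEUE.delete(conn)
    end
  end
end

a = FakeConnection.new
b = FakeConnection.new
obtain(a, 'db')   # a becomes the owner
obtain(b, 'db')   # b is told to wait
release(a, 'db')  # ownership passes to b

p a.sent  # ["ok db"]
p b.sent  # ["wait db", "ok db"]
```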
#send_line(string) ⇒ Object
    # File 'lib/lock-server.rb', line 45
    def send_line(string)
      send_data(string+"\n")
    end
#unbind ⇒ Object
    # File 'lib/lock-server.rb', line 145
    def unbind
      release_lock
    end
#wait_for_lock_release ⇒ Object
Sends a notification that the requested resource is locked and the application should wait for the lock.
    # File 'lib/lock-server.rb', line 141
    def wait_for_lock_release
      send_line("wait #{@resource}")
    end
#write_server_status ⇒ Object
    # File 'lib/lock-server.rb', line 84
    def write_server_status
      @@resource_queues.keys.each{ |k|
        next if queue_for(k).empty?
        send_line("Resource: #{k} (#{queue_for(k).size} connections waiting)")
        send_line(queue_for(k).collect(&:peer_ip).join(", "))
      }
    end
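To illustrate the shape of the status output, the sketch below rebuilds the same two lines per resource from a plain hash. `status_lines` and the sample data are hypothetical; in the real method the queue holds connections and the addresses come from `peer_ip`. Note that the reported count appears to include the current owner, since the owner stays at the head of the queue.

```ruby
# Hypothetical snapshot: resource name => peer IPs in queue order.
queues = {
  'db'    => ['10.0.0.1', '10.0.0.2'],
  'cache' => []
}

def status_lines(queues)
  queues.each_with_object([]) do |(name, ips), lines|
    next if ips.empty? # resources with empty queues are skipped
    lines << "Resource: #{name} (#{ips.size} connections waiting)"
    lines << ips.join(', ')
  end
end

puts status_lines(queues)
# Resource: db (2 connections waiting)
# 10.0.0.1, 10.0.0.2
```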