Module: LockServer::ResourceLockServer

Defined in:
lib/lock-server.rb

Overview

Simple resource locking protocol implementation, suitable for EventMachine.

Constant Summary collapse

@@resource_queues =

Hash of resource queues. Keys are resource names, values are arrays of connections, trying to acquire resource.

{ }
@@mutexes =

Hash of mutexes used for syncronization of queue access. Keys are resource names, values are Mutex instances.

{ }

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#created_atObject (readonly)

Returns the value of attribute created_at.



37
38
39
# File 'lib/lock-server.rb', line 37

def created_at
  @created_at
end

#peer_ipObject (readonly)

Returns the value of attribute peer_ip.



36
37
38
# File 'lib/lock-server.rb', line 36

def peer_ip
  @peer_ip
end

#updated_atObject (readonly)

Returns the value of attribute updated_at.



37
38
39
# File 'lib/lock-server.rb', line 37

def updated_at
  @updated_at
end

Instance Method Details

#allow_actionObject

Sends a notification that requested resource is free now.



136
137
138
# File 'lib/lock-server.rb', line 136

def allow_action
  send_line("ok #{@resource}")
end

#interrupt_timed_out_connectionObject



113
114
115
# File 'lib/lock-server.rb', line 113

def interrupt_timed_out_connection
  return unless Time.now-@updated_at
end

#mutex_for(resource) ⇒ Object

Returns a mutex for specified resource

Parameters

resource<String>

A name of resource which mutex should be returned



27
28
29
# File 'lib/lock-server.rb', line 27

def mutex_for(resource)
  @@mutexes[resource] ||= Mutex.new
end

#obtain_lock(resource) ⇒ Object

Creates a lock for requested resource, acquires mutex to protect resource queue manipulations and either notifies remote side about successful resource aquisition or tells it to wait. First connection in a queue is owning a resource, otherwise a connection will be put on hold for resource aquisition.

Parameters

resource<String>

A name of resource to lock.



99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/lock-server.rb', line 99

def obtain_lock(resource)
  @resource = resource
  queue = queue_for(@resource)
  return if queue.include?(self)
  mutex_for(@resource).synchronize do
    queue << self
    if queue.size == 1
      allow_action
    else
      wait_for_lock_release
    end
  end
end

#post_initObject



39
40
41
42
43
# File 'lib/lock-server.rb', line 39

def post_init
  @resource = nil
  @peer_ip = Socket.unpack_sockaddr_in(self.get_peername)[1]
  set_comm_inactivity_timeout(LockServer::CONNECTION_TIMEOUT)
end

#queue_for(resource) ⇒ Object

Returns a queue for specified resource

Parameters

resource<String>

A name of resource which query should be returned



17
18
19
20
# File 'lib/lock-server.rb', line 17

def queue_for(resource)
  p @@resource_queues
  @@resource_queues[resource] ||= []
end

#receive_data(data) ⇒ Object

A collback of EventMachine, being called when data is received. Used for processing of lock requests. To aquire lock for needed resource name application should send message ‘lock resource_name’, where resource_name is a name of needed resource. An answer will be either ‘ok resource_name’ (meaning lock have been obtained and application could proceed) or ‘wait resource_name’ (there’s already lock on this resource, and application should wait). To unlock data, ‘unlock’ should be sent. This will remove current connection from a queue and a connection will be closed. Sending any other message will result in ‘err: unknown message’ and connection issued incorrect command will be closed.

Parameters

data<String>

Data fetched from EventMachine



60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# File 'lib/lock-server.rb', line 60

def receive_data(data)
  @updated_at = Time.now
  data.each_line do |line|
    command, resource = line.strip.split(' ')
    case command

    when 'lock'
      obtain_lock(resource)

    when 'unlock'
      send_line("released #{@resource}")
      close_connection_after_writing

    when "status"
      write_server_status
      close_connection_after_writing

    else
      send_line("err: unknown message")
      close_connection_after_writing
    end
  end
end

#release_lockObject

Releases lock of previously specified resource. Calling this method will remove current connection from @resource aquisitors queue. Also, if current connection is owning a resource (is first in aquisitors queue), it will notify next connection that resource is free now.

Parameters

none



123
124
125
126
127
128
129
130
131
132
133
# File 'lib/lock-server.rb', line 123

def release_lock
  queue = queue_for(@resource)
  mutex_for(@resource).synchronize do
    if queue.first == self
      queue.shift
      queue.first.allow_action unless queue.empty?
    else
      queue.delete self
    end
  end
end

#send_line(string) ⇒ Object



45
46
47
# File 'lib/lock-server.rb', line 45

def send_line(string)
  send_data(string+"\n")
end

#unbindObject



145
146
147
# File 'lib/lock-server.rb', line 145

def unbind
  release_lock
end

#wait_for_lock_releaseObject

Sends a notification that requested resource is locked and an application should wait for lock.



141
142
143
# File 'lib/lock-server.rb', line 141

def wait_for_lock_release
  send_line("wait #{@resource}")
end

#write_server_statusObject



84
85
86
87
88
89
90
# File 'lib/lock-server.rb', line 84

def write_server_status
  @@resource_queues.keys.each{ |k|
    next if queue_for(k).empty?
    send_line("Resource: #{k} (#{queue_for(k).size} connections waiting)")
    send_line(queue_for(k).collect(&:peer_ip).join(", "))
  }
end