Class: DpStmMap::DistributedPersistentStmMap

Inherits:
Object
  • Object
show all
Defined in:
lib/dp_stm_map/Client.rb

Instance Method Summary collapse

Constructor Details

#initialize(host, port, local_storage) ⇒ DistributedPersistentStmMap

Returns a new instance of DistributedPersistentStmMap.



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# File 'lib/dp_stm_map/Client.rb', line 16

def initialize host, port, local_storage
  @host=host
  @port=port
  @connect_listeners=[]
  @disconnect_listeners=[]
  @connect_state=:disconnected
  @mutex=Mutex.new


  @content={}


  @state=ClientLocalStore.new local_storage


  @validators=[]
  @listeners=[]

  @outgoing_queue=Queue.new


  @outcome_futures={}

  @transaction_id_condition_variable=ConditionVariable.new

end

Instance Method Details

#atomic(timeout = nil) ⇒ Object



186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# File 'lib/dp_stm_map/Client.rb', line 186

def atomic timeout=nil

  outcome_future=Queue.new


  result=nil


  tx_id=SecureRandom.uuid

  view=AtomicView.new @state

  @mutex.synchronize do
    result=yield view
    @outcome_futures[tx_id]=outcome_future
  end

  changes=view.changes

  @validators.each do |validator|
    validator.call changes
  end



  changes=view.changes

  transitions={}

  new_content={}

  changes.each do |k,(old,new)|
    transitions[k] =  [content_digest(old), content_digest(new)]
    new_content[content_digest(new)]=new
  end

  send_to_server ClientTransactionMessage.new tx_id, transitions, new_content

  outcome=outcome_future.pop


  if ClientTransactionSuccessfulMessage === outcome
    @mutex.synchronize do
      while @state.current_transaction_sequence < outcome.transaction_sequence
        @transaction_id_condition_variable.wait(@mutex)
      end
    end
  end


  result

end

#atomic_readObject



249
250
251
252
253
# File 'lib/dp_stm_map/Client.rb', line 249

def atomic_read
  @mutex.synchronize do
    yield AtomicReadView.new @state
  end
end

#content_digest(content) ⇒ Object



241
242
243
244
245
246
247
# File 'lib/dp_stm_map/Client.rb', line 241

def content_digest content
  unless content == nil
    Digest::SHA2.hexdigest(content)
  else
    nil
  end
end

#on_atomic(&block) ⇒ Object



176
177
178
# File 'lib/dp_stm_map/Client.rb', line 176

def on_atomic &block
  @listeners << block
end

#on_connected(&block) ⇒ Object



166
167
168
# File 'lib/dp_stm_map/Client.rb', line 166

def on_connected &block
  @connect_listeners << block
end

#on_disconnected(&block) ⇒ Object



170
171
172
# File 'lib/dp_stm_map/Client.rb', line 170

def on_disconnected &block
  @disconnect_listeners << block
end

#send_to_server(message) ⇒ Object

private



259
260
261
262
# File 'lib/dp_stm_map/Client.rb', line 259

def send_to_server message
  # puts "about to send #{message}"
  @outgoing_queue << message
end

#startObject



43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/dp_stm_map/Client.rb', line 43

def start


  latch=Queue.new

  @reading_thread=Thread.new do
    begin
      # puts "connecting"
      @client_socket=nil
      @client_socket=TCPSocket.new @host,@port

      @client_socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
      # puts "connected"
      @connect_state=:connected


      Thread.new do
        begin
          loop do
            message=@outgoing_queue.pop
            serialized=message.serialize
            @client_socket.write([serialized.bytesize].pack("Q>"))
            @client_socket.write(serialized)
            @client_socket.flush
            # puts "sent #{message}"
          end
        rescue => e
          # puts "Error during processing: #{$!}"
          # puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
        end
      end

      send_to_server ClientHelloMessage.new(@state.current_transaction_sequence)


      latch << "connected"

      @connect_listeners.each do |listener|
        listener.call
      end



      loop do
        read=@client_socket.read(8)
        # break unless read
        len=read.unpack("Q>")[0]

        msg=JsonMessage.deserialize(@client_socket.read(len))
        # puts "got from server %s " % msg
        if ClientTransactionOutcomeMessage === msg
          # pp msg
          @mutex.synchronize do
            if @outcome_futures.has_key? msg.transaction_id
              @outcome_futures.delete(msg.transaction_id).push msg
            end
          end
        end
        if TransactionMessage === msg
          # pp msg
          @mutex.synchronize do

            changes=@state.update msg.transaction_sequence, msg.new_content, msg.transitions, msg.delete_content


            @listeners.each do |listener|
              begin
                listener.call changes
              rescue
              end
            end

            @transaction_id_condition_variable.broadcast

          end

        end
      end
    rescue ShutdownError => e
      # puts "shutdown"
    rescue => e          
      # puts "Error during processing: #{$!}"
      # puts "Backtrace:\n\t#{e.backtrace.join("\n\t")}"
      # puts "error %s" % e
      if @client_socket
        @client_socket.close
      end
      if @connect_state == :connected
        @connect_state=:disconnected
        @disconnect_listeners.each do |listener|
          listener.call
        end
      end
      # puts "Exception: %s" % e
      sleep 0.1
      retry
    ensure
      if @connect_state == :connected
        @connect_state=:disconnected
        @disconnect_listeners.each do |listener|
          listener.call
        end
      end
      if @client_socket && !@client_socket.closed?
        @client_socket.close
      end
    end

  end

  latch.pop
end

#stopObject



156
157
158
159
160
161
162
163
# File 'lib/dp_stm_map/Client.rb', line 156

def stop
  @reading_thread.raise ShutdownError
  begin
    @reading_thread.join
  rescue => e
  end
  # puts "stopped"
end

#validate_atomic(&block) ⇒ Object



181
182
183
# File 'lib/dp_stm_map/Client.rb', line 181

def validate_atomic &block
  @validators << block
end