Class: RbbtProcessQueue::RbbtProcessSocket

Inherits:
Object
  • Object
show all
Defined in:
lib/rbbt/util/concurrency/processes/socket.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(serializer = nil) ⇒ RbbtProcessSocket

Returns a new instance of RbbtProcessSocket.



7
8
9
10
11
12
13
14
15
16
17
18
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 7

def initialize(serializer = nil)
  @sread, @swrite = Misc.pipe

  @serializer = serializer || Marshal

  @key = "/" << rand(1000000000).to_s << '.' << Process.pid.to_s;
  @write_sem = @key + '.in'
  @read_sem = @key + '.out'
  Log.debug "Creating socket semaphores: #{@key}"
  RbbtSemaphore.create_semaphore(@write_sem,1)
  RbbtSemaphore.create_semaphore(@read_sem,1)
end

Instance Attribute Details

#cleanedObject

Returns the value of attribute cleaned.



6
7
8
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 6

def cleaned
  @cleaned
end

#read_semObject

Returns the value of attribute read_sem.



6
7
8
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 6

def read_sem
  @read_sem
end

#sreadObject

Returns the value of attribute sread.



6
7
8
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 6

def sread
  @sread
end

#swriteObject

Returns the value of attribute swrite.



6
7
8
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 6

def swrite
  @swrite
end

#write_semObject

Returns the value of attribute write_sem.



6
7
8
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 6

def write_sem
  @write_sem
end

Instance Method Details

#cleanObject



20
21
22
23
24
25
26
27
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 20

def clean
  @cleaned = true
  @sread.close unless @sread.closed?
  @swrite.close unless @swrite.closed?
  Log.low "Destroying socket semaphores: #{[@key] * ", "}"
  RbbtSemaphore.delete_semaphore(@write_sem)
  RbbtSemaphore.delete_semaphore(@read_sem)
end

#close_readObject



88
89
90
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 88

def close_read
  @sread.close unless closed_read?
end

#close_writeObject



84
85
86
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 84

def close_write
  @swrite.close unless closed_write?
end

#closed_read?Boolean

Returns:

  • (Boolean)


76
77
78
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 76

def closed_read?
  @sread.closed?
end

#closed_write?Boolean

Returns:

  • (Boolean)


80
81
82
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 80

def closed_write?
  @swrite.closed?
end

#dump(obj, stream) ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 30

def dump(obj, stream)
  case obj
  when Annotated
    payload = @serializer.dump(obj)
    size_head = [payload.bytesize,"S"].pack 'La'
    str = size_head << payload
  when String
    payload = obj
    size_head = [payload.bytesize,"C"].pack 'La'
    str = size_head << payload
  else
    payload = @serializer.dump(obj)
    size_head = [payload.bytesize,"S"].pack 'La'
    str = size_head << payload
  end

  write_length = str.length
  wrote = stream.write(str) 
  while wrote < write_length
    wrote += stream.write(str[wrote..-1]) 
  end
end

#load(stream) ⇒ Object



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 53

def load(stream)
  size_head = Misc.read_stream stream, 5

  size, type = size_head.unpack('La')

  begin
    payload = Misc.read_stream stream, size
    case type
    when "S"
      begin
        @serializer.load(payload)
      rescue Exception
        Log.exception $!
        raise $!
      end
    when "C"
      payload
    end
  rescue TryAgain
    retry
  end
end

#popObject



103
104
105
106
107
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 103

def pop
  RbbtSemaphore.synchronize(@read_sem) do
    self.load(@sread)
  end
end

#push(obj) ⇒ Object



94
95
96
97
98
99
100
101
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 94

def push(obj)
  RbbtSemaphore.synchronize(@write_sem) do
    multiple = MultipleResult === obj
    #obj = Annotated.purge(obj)
    obj.extend MultipleResult if multiple
    self.dump(obj, @swrite)
  end
end