Class: RbbtProcessQueue::RbbtProcessSocket
- Inherits:
-
Object
- Object
- RbbtProcessQueue::RbbtProcessSocket
- Defined in:
- lib/rbbt/util/concurrency/processes/socket.rb
Instance Attribute Summary collapse
-
#cleaned ⇒ Object
Returns the value of attribute cleaned.
-
#read_sem ⇒ Object
Returns the value of attribute read_sem.
-
#sread ⇒ Object
Returns the value of attribute sread.
-
#swrite ⇒ Object
Returns the value of attribute swrite.
-
#write_sem ⇒ Object
Returns the value of attribute write_sem.
Instance Method Summary collapse
- #clean ⇒ Object
- #close_read ⇒ Object
- #close_write ⇒ Object
- #closed_read? ⇒ Boolean
- #closed_write? ⇒ Boolean
- #dump(obj, stream) ⇒ Object
-
#initialize(serializer = nil) ⇒ RbbtProcessSocket
constructor
A new instance of RbbtProcessSocket.
- #load(stream) ⇒ Object
- #pop ⇒ Object
- #push(obj) ⇒ Object
Constructor Details
#initialize(serializer = nil) ⇒ RbbtProcessSocket
Returns a new instance of RbbtProcessSocket.
7 8 9 10 11 12 13 14 15 16 17 18 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 7 def initialize(serializer = nil) @sread, @swrite = Misc.pipe @serializer = serializer || Marshal @key = "/" << rand(1000000000).to_s << '.' << Process.pid.to_s; @write_sem = @key + '.in' @read_sem = @key + '.out' Log.debug "Creating socket semaphores: #{@key}" RbbtSemaphore.create_semaphore(@write_sem,1) RbbtSemaphore.create_semaphore(@read_sem,1) end |
Instance Attribute Details
#cleaned ⇒ Object
Returns the value of attribute cleaned.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 6 def cleaned @cleaned end |
#read_sem ⇒ Object
Returns the value of attribute read_sem.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 6 def read_sem @read_sem end |
#sread ⇒ Object
Returns the value of attribute sread.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 6 def sread @sread end |
#swrite ⇒ Object
Returns the value of attribute swrite.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 6 def swrite @swrite end |
#write_sem ⇒ Object
Returns the value of attribute write_sem.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 6 def write_sem @write_sem end |
Instance Method Details
#clean ⇒ Object
20 21 22 23 24 25 26 27 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 20 def clean @cleaned = true @sread.close unless @sread.closed? @swrite.close unless @swrite.closed? Log.low "Destroying socket semaphores: #{[@key] * ", "}" RbbtSemaphore.delete_semaphore(@write_sem) RbbtSemaphore.delete_semaphore(@read_sem) end |
#close_read ⇒ Object
89 90 91 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 89 def close_read @sread.close unless closed_read? end |
#close_write ⇒ Object
85 86 87 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 85 def close_write @swrite.close unless closed_write? end |
#closed_read? ⇒ Boolean
77 78 79 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 77 def closed_read? @sread.closed? end |
#closed_write? ⇒ Boolean
81 82 83 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 81 def closed_write? @swrite.closed? end |
#dump(obj, stream) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 30 def dump(obj, stream) obj.concurrent_stream = nil if obj.respond_to?(:concurrent_stream) case obj when Annotated payload = @serializer.dump(obj) size_head = [payload.bytesize,"S"].pack 'La' str = size_head << payload when String payload = obj size_head = [payload.bytesize,"C"].pack 'La' str = size_head << payload else payload = @serializer.dump(obj) size_head = [payload.bytesize,"S"].pack 'La' str = size_head << payload end write_length = str.length wrote = stream.write(str) while wrote < write_length wrote += stream.write(str[wrote..-1]) end end |
#load(stream) ⇒ Object
54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 54 def load(stream) size_head = Misc.read_stream stream, 5 size, type = size_head.unpack('La') begin payload = Misc.read_stream stream, size case type when "S" begin @serializer.load(payload) rescue Exception Log.exception $! raise $! end when "C" payload end rescue TryAgain retry end end |
#pop ⇒ Object
104 105 106 107 108 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 104 def pop RbbtSemaphore.synchronize(@read_sem) do self.load(@sread) end end |
#push(obj) ⇒ Object
95 96 97 98 99 100 101 102 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 95 def push(obj) RbbtSemaphore.synchronize(@write_sem) do multiple = MultipleResult === obj #obj = Annotated.purge(obj) obj.extend MultipleResult if multiple self.dump(obj, @swrite) end end |