Class: RbbtProcessQueue::RbbtProcessSocket
- Inherits: Object
- Inheritance hierarchy:
  - Object
  - RbbtProcessQueue::RbbtProcessSocket
- Defined in:
- lib/rbbt/util/concurrency/processes/socket.rb
Instance Attribute Summary collapse
-
#read_sem ⇒ Object
Returns the value of attribute read_sem.
-
#sread ⇒ Object
Returns the value of attribute sread.
-
#swrite ⇒ Object
Returns the value of attribute swrite.
-
#write_sem ⇒ Object
Returns the value of attribute write_sem.
Instance Method Summary collapse
- #clean ⇒ Object
- #close_read ⇒ Object
- #close_write ⇒ Object
- #closed_read? ⇒ Boolean
- #closed_write? ⇒ Boolean
- #dump(obj, stream) ⇒ Object
-
#initialize(serializer = nil) ⇒ RbbtProcessSocket
constructor
A new instance of RbbtProcessSocket.
- #load(stream) ⇒ Object
- #pop ⇒ Object
- #push(obj) ⇒ Object
Constructor Details
#initialize(serializer = nil) ⇒ RbbtProcessSocket
Returns a new instance of RbbtProcessSocket.
7 8 9 10 11 12 13 14 15 16 17 18 19 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 7
#
# Sets up the socket: a stream pair obtained from Misc.pipe plus two named
# semaphores, one later used by #push (".in") and one by #pop (".out").
#
# @param serializer [#dump, #load, nil] object used to encode and decode
#   non-String values; Marshal is used when none is given.
def initialize(serializer = nil)
  @sread, @swrite = Misc.pipe
  @serializer = serializer || Marshal

  # Semaphore names are derived from a random number and the creating
  # process id.
  @key = "/#{rand(1000000000)}.#{Process.pid}"
  @write_sem = "#{@key}.in"
  @read_sem = "#{@key}.out"

  Log.medium "Creating socket semaphores: #{@key}"
  # The second argument is presumably the initial semaphore value -- confirm
  # against RbbtSemaphore.create_semaphore.
  [@write_sem, @read_sem].each do |name|
    RbbtSemaphore.create_semaphore(name, 1)
  end
end
Instance Attribute Details
#read_sem ⇒ Object
Returns the value of attribute read_sem.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 6 def read_sem @read_sem end |
#sread ⇒ Object
Returns the value of attribute sread.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 6 def sread @sread end |
#swrite ⇒ Object
Returns the value of attribute swrite.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 6 def swrite @swrite end |
#write_sem ⇒ Object
Returns the value of attribute write_sem.
6 7 8 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 6 def write_sem @write_sem end |
Instance Method Details
#clean ⇒ Object
21 22 23 24 25 26 27 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 21
#
# Releases everything the socket holds: closes whichever of the two streams
# is still open, then deletes both named semaphores.
#
# @return [Object] whatever the final RbbtSemaphore.delete_semaphore call
#   returns
def clean
  [@sread, @swrite].each do |stream|
    stream.close unless stream.closed?
  end

  Log.medium "Destroying socket semaphores: #{[@key].join(', ')}"
  RbbtSemaphore.delete_semaphore(@write_sem)
  RbbtSemaphore.delete_semaphore(@read_sem)
end
#close_read ⇒ Object
79 80 81 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 79
#
# Closes the read-side stream. Calling it on an already closed stream is a
# no-op, matching the guard used by #clean (an unguarded IO#close raises
# IOError on a second call under Ruby older than 2.3).
#
# @return [nil]
def close_read
  @sread.close unless @sread.closed?
end
#close_write ⇒ Object
75 76 77 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 75
#
# Closes the write-side stream. Calling it on an already closed stream is a
# no-op, matching the guard used by #clean (an unguarded IO#close raises
# IOError on a second call under Ruby older than 2.3).
#
# @return [nil]
def close_write
  @swrite.close unless @swrite.closed?
end
#closed_read? ⇒ Boolean
67 68 69 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 67 def closed_read? @sread.closed? end |
#closed_write? ⇒ Boolean
71 72 73 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 71 def closed_write? @swrite.closed? end |
#dump(obj, stream) ⇒ Object
30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 30
#
# Writes +obj+ to +stream+ as a single frame:
#
#   [4-byte native unsigned length]['C' or 'S'][payload bytes]
#
# Strings are sent as-is and tagged "C"; any other object is encoded with
# the configured serializer and tagged "S". This is the format #load reads.
#
# All size arithmetic is done in bytes on a binary copy of the payload.
# The header produced by Array#pack is binary, so appending a non-ASCII
# UTF-8 payload to it could raise Encoding::CompatibilityError, and
# character-based length/slicing disagrees with the byte counts returned by
# +stream.write+ when a write is partial.
#
# @param obj [Object] value to send
# @param stream [#write] destination; +write+ must return the number of
#   bytes it accepted
# @return [nil]
def dump(obj, stream)
  case obj
  when String
    type = "C"
    payload = obj
  else
    type = "S"
    payload = @serializer.dump(obj)
  end

  # Binary copy: leaves the caller's string untouched and makes the
  # concatenation with the binary header always encoding-compatible.
  payload = payload.b
  str = [payload.bytesize, type].pack('La') << payload

  write_length = str.bytesize
  wrote = stream.write(str)
  # A stream may accept fewer bytes than offered; keep writing the rest.
  while wrote < write_length
    wrote += stream.write(str.byteslice(wrote, write_length - wrote))
  end
end
#load(stream) ⇒ Object
49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 49
#
# Reads one frame from +stream+ and returns the value it carries.
#
# The frame starts with a 5-byte header unpacked with 'La': a 4-byte native
# unsigned payload length followed by a one-character type tag. A "S"
# payload is decoded with the configured serializer; a "C" payload is
# returned as read. Any other tag yields nil.
#
# @param stream [Object] source handed to Misc.read_stream
# @return [Object, String, nil] the decoded object, the raw payload, or nil
#   for an unrecognized tag
def load(stream)
  header = Misc.read_stream(stream, 5)
  size, type = header.unpack('La')

  begin
    payload = Misc.read_stream(stream, size)
    if type == "S"
      @serializer.load(payload)
    elsif type == "C"
      payload
    end
  rescue TryAgain
    # Only the payload read and decode are re-run; the header is not read
    # again. NOTE(review): the retry is unbounded -- confirm TryAgain is
    # always transient.
    retry
  end
end
#pop ⇒ Object
91 92 93 94 95 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 91
#
# Reads the next object from the read-side stream while holding the
# read semaphore.
#
# @return [Object] whatever RbbtSemaphore.synchronize returns for the block
#   (presumably the value produced by #load -- confirm)
def pop
  # Explicit receiver keeps this from reading as a call to Kernel#load.
  RbbtSemaphore.synchronize(@read_sem) { self.load(@sread) }
end
#push(obj) ⇒ Object
85 86 87 88 89 |
# File 'lib/rbbt/util/concurrency/processes/socket.rb', line 85
#
# Writes +obj+ to the write-side stream while holding the write semaphore.
#
# @param obj [Object] value to send; see #dump for how it is framed
# @return [Object] whatever RbbtSemaphore.synchronize returns for the block
def push(obj)
  RbbtSemaphore.synchronize(@write_sem) { self.dump(obj, @swrite) }
end