Class: Async::Bus::Protocol::Transaction
- Inherits:
-
Object
- Object
- Async::Bus::Protocol::Transaction
- Defined in:
- lib/async/bus/protocol/transaction.rb
Instance Attribute Summary collapse
-
#id ⇒ Object
readonly
Returns the value of attribute id.
-
#received ⇒ Object
readonly
Returns the value of attribute received.
Instance Method Summary collapse
-
#accept(object, arguments, options, block) ⇒ Object
Accept a remote procedure invokation.
- #close ⇒ Object
-
#initialize(connection, id) ⇒ Transaction
constructor
A new instance of Transaction.
-
#invoke(name, arguments, options, &block) ⇒ Object
Invoke a remote procedure.
- #read ⇒ Object
- #write(*arguments) ⇒ Object
Constructor Details
#initialize(connection, id) ⇒ Transaction
Returns a new instance of Transaction.
29 30 31 32 33 34 35 |
# File 'lib/async/bus/protocol/transaction.rb', line 29 def initialize(connection, id) @connection = connection @id = id @received = Async::Queue.new @accept = nil end |
Instance Attribute Details
#id ⇒ Object (readonly)
Returns the value of attribute id.
37 38 39 |
# File 'lib/async/bus/protocol/transaction.rb', line 37 def id @id end |
#received ⇒ Object (readonly)
Returns the value of attribute received.
38 39 40 |
# File 'lib/async/bus/protocol/transaction.rb', line 38 def received @received end |
Instance Method Details
#accept(object, arguments, options, block) ⇒ Object
Accept a remote procedure invokation.
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/async/bus/protocol/transaction.rb', line 89 def accept(object, arguments, , block) if block result = object.public_send(*arguments, **) do |*yield_arguments| self.write(:yield, yield_arguments) what, result = self.read case what when :next result when :close return when :error raise(result) end end else result = object.public_send(*arguments, **) end self.write(:return, result) rescue UncaughtThrowError => error self.write(:throw, error.tag) rescue => error self.write(:error, error) # ensure # self.write(:close) end |
#close ⇒ Object
53 54 55 56 57 58 59 60 |
# File 'lib/async/bus/protocol/transaction.rb', line 53 def close if @connection connection = @connection @connection = nil connection.transactions.delete(@id) end end |
#invoke(name, arguments, options, &block) ⇒ Object
Invoke a remote procedure.
63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/async/bus/protocol/transaction.rb', line 63 def invoke(name, arguments, , &block) self.write(:invoke, name, arguments, , block_given?) while response = self.read what, result = response case what when :error raise(result) when :return return(result) when :yield begin result = yield(*result) self.write(:next, result) rescue => error self.write(:error, error) end end end # ensure # self.write(:close) end |
#read ⇒ Object
40 41 42 43 44 45 46 |
# File 'lib/async/bus/protocol/transaction.rb', line 40 def read if @received.empty? @connection.packer.flush end @received.dequeue end |
#write(*arguments) ⇒ Object
48 49 50 51 |
# File 'lib/async/bus/protocol/transaction.rb', line 48 def write(*arguments) @connection.packer.write([id, *arguments]) @connection.packer.flush end |