Class: Async::Bus::Protocol::Transaction

Inherits:
Object
  • Object
show all
Defined in:
lib/async/bus/protocol/transaction.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection, id) ⇒ Transaction

Returns a new instance of Transaction.



29
30
31
32
33
34
35
# File 'lib/async/bus/protocol/transaction.rb', line 29

def initialize(connection, id)
	@connection = connection
	@id = id
	
	@received = Async::Queue.new
	@accept = nil
end

Instance Attribute Details

#idObject (readonly)

Returns the value of attribute id.



37
38
39
# File 'lib/async/bus/protocol/transaction.rb', line 37

def id
  @id
end

#receivedObject (readonly)

Returns the value of attribute received.



38
39
40
# File 'lib/async/bus/protocol/transaction.rb', line 38

def received
  @received
end

Instance Method Details

#accept(object, arguments, options, block) ⇒ Object

Accept a remote procedure invokation.



89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/async/bus/protocol/transaction.rb', line 89

def accept(object, arguments, options, block)
	if block
		result = object.public_send(*arguments, **options) do |*yield_arguments|
			self.write(:yield, yield_arguments)
			what, result = self.read
			
			case what
			when :next
				result
			when :close
				return
			when :error
				raise(result)
			end
		end
	else
		result = object.public_send(*arguments, **options)
	end
	
	self.write(:return, result)
rescue UncaughtThrowError => error
	self.write(:throw, error.tag)
rescue => error
	self.write(:error, error)
# ensure
# 	self.write(:close)
end

#closeObject



53
54
55
56
57
58
59
60
# File 'lib/async/bus/protocol/transaction.rb', line 53

def close
	if @connection
		connection = @connection
		@connection = nil
		
		connection.transactions.delete(@id)
	end
end

#invoke(name, arguments, options, &block) ⇒ Object

Invoke a remote procedure.



63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# File 'lib/async/bus/protocol/transaction.rb', line 63

def invoke(name, arguments, options, &block)
	self.write(:invoke, name, arguments, options, block_given?)
	
	while response = self.read
		what, result = response
		
		case what
		when :error
			raise(result)
		when :return
			return(result)
		when :yield
			begin
				result = yield(*result)
				self.write(:next, result)
			rescue => error
				self.write(:error, error)
			end
		end
	end
	
# ensure
# 	self.write(:close)
end

#readObject



40
41
42
43
44
45
46
# File 'lib/async/bus/protocol/transaction.rb', line 40

def read
	if @received.empty?
		@connection.packer.flush
	end
	
	@received.dequeue
end

#write(*arguments) ⇒ Object



48
49
50
51
# File 'lib/async/bus/protocol/transaction.rb', line 48

def write(*arguments)
	@connection.packer.write([id, *arguments])
	@connection.packer.flush
end