Class: Agent::Channel
- Inherits:
-
Object
- Object
- Agent::Channel
- Defined in:
- lib/agent/channel.rb
Instance Attribute Summary collapse
-
#direction ⇒ Object
readonly
Returns the value of attribute direction.
-
#max ⇒ Object
readonly
Returns the value of attribute max.
-
#name ⇒ Object
readonly
Returns the value of attribute name.
-
#queue ⇒ Object
readonly
Returns the value of attribute queue.
-
#type ⇒ Object
readonly
Returns the value of attribute type.
Instance Method Summary collapse
- #as_receive_only ⇒ Object
- #as_send_only ⇒ Object
-
#close ⇒ Object
Closing methods.
- #closed? ⇒ Boolean
-
#initialize(*args) ⇒ Channel
constructor
A new instance of Channel.
- #marshal_dump ⇒ Object
-
#marshal_load(ary) ⇒ Object
Serialization methods.
- #open? ⇒ Boolean
- #pop? ⇒ Boolean (also: #receive?)
- #push? ⇒ Boolean (also: #send?)
-
#receive(options = {}) ⇒ Object
(also: #pop)
Receiving methods.
- #remove_operations(operations) ⇒ Object
-
#send(object, options = {}) ⇒ Object
(also: #push, #<<)
Sending methods.
Constructor Details
#initialize(*args) ⇒ Channel
Returns a new instance of Channel.
17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/agent/channel.rb', line 17 def initialize(*args) opts = args.last.is_a?(Hash) ? args.pop : {} @type = args.shift @max = args.shift || 0 @closed = false @name = opts[:name] || UUID.generate @direction = opts[:direction] || :bidirectional @skip_marshal = opts[:skip_marshal] @close_mutex = Mutex.new @queue = Queues.register(@name, @type, @max) end |
Instance Attribute Details
#direction ⇒ Object (readonly)
Returns the value of attribute direction.
15 16 17 |
# File 'lib/agent/channel.rb', line 15 def direction @direction end |
#max ⇒ Object (readonly)
Returns the value of attribute max.
15 16 17 |
# File 'lib/agent/channel.rb', line 15 def max @max end |
#name ⇒ Object (readonly)
Returns the value of attribute name.
15 16 17 |
# File 'lib/agent/channel.rb', line 15 def name @name end |
#queue ⇒ Object (readonly)
Returns the value of attribute queue.
15 16 17 |
# File 'lib/agent/channel.rb', line 15 def queue @queue end |
#type ⇒ Object (readonly)
Returns the value of attribute type.
15 16 17 |
# File 'lib/agent/channel.rb', line 15 def type @type end |
Instance Method Details
#as_receive_only ⇒ Object
93 94 95 |
# File 'lib/agent/channel.rb', line 93 def as_receive_only as_direction_only(:receive) end |
#as_send_only ⇒ Object
89 90 91 |
# File 'lib/agent/channel.rb', line 89 def as_send_only as_direction_only(:send) end |
#close ⇒ Object
Closing methods
71 72 73 74 75 76 77 78 |
# File 'lib/agent/channel.rb', line 71 def close @close_mutex.synchronize do raise Errors::ChannelClosed if @closed @closed = true @queue.close Queues.delete(@name) end end |
#closed? ⇒ Boolean
79 |
# File 'lib/agent/channel.rb', line 79 def closed?; @closed; end |
#marshal_dump ⇒ Object
38 39 40 |
# File 'lib/agent/channel.rb', line 38 def marshal_dump [@closed, @name, @max, @type, @direction] end |
#marshal_load(ary) ⇒ Object
Serialization methods
31 32 33 34 35 36 |
# File 'lib/agent/channel.rb', line 31 def marshal_load(ary) @closed, @name, @max, @type, @direction = *ary @queue = Queues[@name] @closed = @queue.nil? || @queue.closed? self end |
#open? ⇒ Boolean
80 |
# File 'lib/agent/channel.rb', line 80 def open?; !@closed; end |
#pop? ⇒ Boolean Also known as: receive?
66 |
# File 'lib/agent/channel.rb', line 66 def pop?; queue.pop?; end |
#push? ⇒ Boolean Also known as: send?
54 |
# File 'lib/agent/channel.rb', line 54 def push?; queue.push?; end |
#receive(options = {}) ⇒ Object Also known as: pop
Receiving methods
59 60 61 62 63 |
# File 'lib/agent/channel.rb', line 59 def receive(={}) check_direction(:receive) q = queue return q.pop() end |
#remove_operations(operations) ⇒ Object
82 83 84 85 86 87 |
# File 'lib/agent/channel.rb', line 82 def remove_operations(operations) # ugly, but it overcomes the race condition without synchronization # since instance variable access is atomic. q = @queue q.remove_operations(operations) if q end |
#send(object, options = {}) ⇒ Object Also known as: push, <<
Sending methods
45 46 47 48 49 50 |
# File 'lib/agent/channel.rb', line 45 def send(object, ={}) check_direction(:send) raise Errors::ChannelClosed if @closed q = @queue q.push(object, {:skip_marshal => @skip_marshal}.merge()) end |