Class: Agent::Channel

Inherits:
Object
  • Object
show all
Defined in:
lib/agent/channel.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(*args) ⇒ Channel

Returns a new instance of Channel.



17
18
19
20
21
22
23
24
25
26
27
# File 'lib/agent/channel.rb', line 17

# Builds a channel and registers its backing queue.
#
# Positional arguments are read as (type, max); a trailing Hash, if present,
# is taken as the options and removed from the positional list first.
#
# @param args [Array] optional type, optional max (falls back to 0), and an
#   optional trailing options Hash with :name, :direction and :skip_marshal
# @return [Channel]
def initialize(*args)
  options = {}
  options = args.pop if args.last.is_a?(Hash)
  type, capacity = args

  @type   = type
  @max    = capacity || 0
  @closed = false

  # Only generate an identifier when the caller did not supply a name.
  @name   = options[:name]
  @name ||= UUID.generate

  @direction    = options[:direction] || :bidirectional
  @skip_marshal = options[:skip_marshal]
  @close_mutex  = Mutex.new

  # Registration comes last so it sees the final name, type and max.
  @queue = Queues.register(@name, @type, @max)
end

Instance Attribute Details

#direction ⇒ Object (readonly)

Returns the value of attribute direction.



15
16
17
# File 'lib/agent/channel.rb', line 15

# Reader for the channel's direction setting.
#
# @return [Object] the current value of @direction
def direction; @direction; end

#max ⇒ Object (readonly)

Returns the value of attribute max.



15
16
17
# File 'lib/agent/channel.rb', line 15

# Reader for the max value the channel was created with.
#
# @return [Object] the current value of @max
def max; @max; end

#name ⇒ Object (readonly)

Returns the value of attribute name.



15
16
17
# File 'lib/agent/channel.rb', line 15

# Reader for the channel's name.
#
# @return [Object] the current value of @name
def name; @name; end

#queue ⇒ Object (readonly)

Returns the value of attribute queue.



15
16
17
# File 'lib/agent/channel.rb', line 15

# Reader for the queue object backing this channel.
#
# @return [Object] the current value of @queue
def queue; @queue; end

#type ⇒ Object (readonly)

Returns the value of attribute type.



15
16
17
# File 'lib/agent/channel.rb', line 15

# Reader for the type the channel was created with.
#
# @return [Object] the current value of @type
def type; @type; end

Instance Method Details

#as_receive_only ⇒ Object



93
94
95
# File 'lib/agent/channel.rb', line 93

# Delegates to as_direction_only with :receive.
#
# @return [Object] whatever as_direction_only(:receive) returns
def as_receive_only
  as_direction_only :receive
end

#as_send_only ⇒ Object



89
90
91
# File 'lib/agent/channel.rb', line 89

# Delegates to as_direction_only with :send.
#
# @return [Object] whatever as_direction_only(:send) returns
def as_send_only
  as_direction_only :send
end

#close ⇒ Object

Closing methods



71
72
73
74
75
76
77
78
# File 'lib/agent/channel.rb', line 71

# Closes the channel: marks it closed, closes the backing queue and removes
# the queue from the Queues registry, all while holding @close_mutex.
#
# @raise [Errors::ChannelClosed] if the channel is already marked closed
# @return [Object] the result of Queues.delete(@name)
def close
  @close_mutex.synchronize do
    # A second close is reported as an error rather than silently ignored.
    if @closed
      raise Errors::ChannelClosed
    end

    @closed = true
    @queue.close
    Queues.delete(@name)
  end
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


79
# File 'lib/agent/channel.rb', line 79

# Reports the channel's closed flag.
#
# @return [Boolean] the current value of @closed
def closed?
  @closed
end

#marshal_dump ⇒ Object



38
39
40
# File 'lib/agent/channel.rb', line 38

# Produces the state that gets serialized for this channel.
#
# @return [Array] [@closed, @name, @max, @type, @direction], in that order
def marshal_dump
  [
    @closed,
    @name,
    @max,
    @type,
    @direction
  ]
end

#marshal_load(ary) ⇒ Object

Serialization methods



31
32
33
34
35
36
# File 'lib/agent/channel.rb', line 31

# Restores a channel from the array produced by marshal_dump.
#
# The queue is looked up again in the Queues registry by name, and the closed
# flag is recomputed from that lookup: the channel counts as closed when the
# queue is missing or reports itself closed. The serialized closed flag is
# therefore overwritten.
#
# @param ary [Array] [closed, name, max, type, direction]
# @return [self]
def marshal_load(ary)
  @closed, @name, @max, @type, @direction = *ary

  # marshal_dump does not serialize the mutex, so create a fresh one here.
  # Without it, `close` on a restored channel would call `synchronize` on nil.
  @close_mutex = Mutex.new

  @queue  = Queues[@name]
  @closed = @queue.nil? || @queue.closed?
  self
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


80
# File 'lib/agent/channel.rb', line 80

# Reports the negation of the channel's closed flag.
#
# @return [Boolean] true when @closed is false or nil
def open?
  !@closed
end

#pop? ⇒ Boolean Also known as: receive?

Returns:

  • (Boolean)


66
# File 'lib/agent/channel.rb', line 66

# Forwards the pop? query to the backing queue.
#
# @return [Boolean] whatever queue.pop? returns
def pop?
  queue.pop?
end

#push? ⇒ Boolean Also known as: send?

Returns:

  • (Boolean)


54
# File 'lib/agent/channel.rb', line 54

# Forwards the push? query to the backing queue.
#
# @return [Boolean] whatever queue.push? returns
def push?
  queue.push?
end

#receive(options = {}) ⇒ Object Also known as: pop

Receiving methods



59
60
61
62
63
# File 'lib/agent/channel.rb', line 59

# Pops a value from the backing queue after checking that the channel's
# direction allows receiving.
#
# @param options [Hash] passed through unchanged to the queue's pop
# @return [Object] whatever queue.pop(options) returns
def receive(options={})
  check_direction(:receive)
  queue.pop(options)
end

#remove_operations(operations) ⇒ Object



82
83
84
85
86
87
# File 'lib/agent/channel.rb', line 82

# Asks the backing queue to remove the given operations; does nothing when
# there is no queue.
#
# @param operations [Object] forwarded unchanged to the queue
# @return [Object, nil] the queue's result, or nil when @queue is not set
def remove_operations(operations)
  # Read @queue once into a local so the check and the call use the same
  # object; the original author relies on this instead of taking a lock.
  current = @queue
  return unless current

  current.remove_operations(operations)
end

#send(object, options = {}) ⇒ Object Also known as: push, <<

Sending methods



45
46
47
48
49
50
# File 'lib/agent/channel.rb', line 45

# Pushes an object onto the backing queue after checking that the channel's
# direction allows sending and that the channel is not closed.
#
# @param object [Object] the value to push
# @param options [Hash] merged over the channel's :skip_marshal default, so
#   caller-supplied keys win
# @raise [Errors::ChannelClosed] if the channel is marked closed
# @return [Object] whatever the queue's push returns
def send(object, options={})
  check_direction(:send)
  if @closed
    raise Errors::ChannelClosed
  end

  target       = @queue
  push_options = {:skip_marshal => @skip_marshal}.merge(options)
  target.push(object, push_options)
end