Class: Agent::Queue

Inherits:
Object
  • Object
show all
Extended by:
Forwardable
Defined in:
lib/agent/queue.rb,
lib/agent/queue/buffered.rb,
lib/agent/queue/unbuffered.rb

Direct Known Subclasses

Buffered, Unbuffered

Defined Under Namespace

Classes: Buffered, Unbuffered

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(type) ⇒ Queue

Returns a new instance of Queue.

Raises:

  • (Errors::Untyped)
  • (Errors::InvalidType)

19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# File 'lib/agent/queue.rb', line 19

# Builds an open queue restricted to a single element type.
#
# @param type [Module] class or module that pushed objects must be instances of
# @raise [Errors::Untyped] when +type+ is nil or false
# @raise [Errors::InvalidType] when +type+ is not a Module
def initialize(type)
  @type = type
  raise Errors::Untyped unless type
  raise Errors::InvalidType unless type.is_a?(Module)

  @mutex  = Mutex.new
  @closed = false

  # Four independent, initially empty lists.
  @queue, @operations, @pushes, @pops = Array.new(4) { [] }

  # Runs last so that every instance variable above is already set.
  reset_custom_state
end

Instance Attribute Details

#operations ⇒ Object (readonly)

Returns the value of attribute operations.



10
11
12
# File 'lib/agent/queue.rb', line 10

# Reader for the +@operations+ list, which #initialize sets to an empty
# array and which #push and #pop append their operation objects to.
#
# @return [Array] the current contents of +@operations+
def operations
  @operations
end

#type ⇒ Object (readonly)

Returns the value of attribute type.



10
11
12
# File 'lib/agent/queue.rb', line 10

# Reader for +@type+, the Module given to #initialize. #push rejects any
# object that is not an instance of it.
#
# @return [Module] the element type this queue was created with
def type
  @type
end

Instance Method Details

#buffered? ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • (Errors::NotImplementedError)

37
38
39
40
# File 'lib/agent/queue.rb', line 37

# Abstract predicate. This base implementation has no answer of its own and
# always raises; a subclass is expected to override it.
#
# @return [Boolean] presumably, once overridden by a subclass
# @raise [Errors::NotImplementedError] always, in this base implementation
def buffered?
  # implement in subclass
  raise Errors::NotImplementedError
end

#close ⇒ Object



57
58
59
60
61
62
63
64
65
# File 'lib/agent/queue.rb', line 57

# Closes the queue: closes every tracked operation, marks the queue as
# closed and resets the custom state, all while holding the mutex.
#
# @return [Object] whatever +reset_custom_state+ returns
# @raise [Errors::ChannelClosed] if the queue has already been closed
def close
  mutex.synchronize do
    raise Errors::ChannelClosed if @closed

    @operations.each(&:close)
    @closed = true
    reset_custom_state
  end
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


67
# File 'lib/agent/queue.rb', line 67

# Reports the closed flag, which #initialize sets to false and #close sets
# to true.
#
# @return [Boolean] the current value of +@closed+
def closed?
  @closed
end

#open? ⇒ Boolean

Returns:

  • (Boolean)


68
# File 'lib/agent/queue.rb', line 68

# Negation of the closed flag.
#
# @return [Boolean] true while +@closed+ is falsy
def open?
  !@closed
end

#pop(options = {}) ⇒ Array, Pop



87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# File 'lib/agent/queue.rb', line 87

# Registers a Pop operation on the queue and, unless deferred, waits for it.
#
# @param options [Hash] forwarded to +Pop.new+; a truthy +:deferred+ entry
#   makes this method hand back the operation instead of waiting on it
# @return [Pop] the registered operation when +options[:deferred]+ is truthy
# @return [Array] otherwise +[operation.object, ok]+, where +ok+ is the value
#   returned by +operation.wait+
def pop(options = {})
  operation = Pop.new(options)

  mutex.synchronize do
    # A closed queue with nothing left in it closes the new operation
    # up front; it is still registered and processed below.
    operation.close if @closed && queue.empty?

    operations.push(operation)
    pops.push(operation)
    process
  end

  if options[:deferred]
    operation
  else
    # Wait first: the object is only read after the wait has returned.
    ok = operation.wait
    [operation.object, ok]
  end
end

#pop? ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • (Errors::NotImplementedError)

47
48
49
50
# File 'lib/agent/queue.rb', line 47

# Abstract predicate. This base implementation always raises; a subclass is
# expected to override it.
# NOTE(review): presumably answers whether a pop could proceed right now —
# confirm against the subclass implementations.
#
# @return [Boolean] presumably, once overridden by a subclass
# @raise [Errors::NotImplementedError] always, in this base implementation
def pop?
  # implement in subclass
  raise Errors::NotImplementedError
end

#push(object, options = {}) ⇒ Object



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# File 'lib/agent/queue.rb', line 70

# Registers a Push operation carrying +object+ and, unless deferred, waits
# for it.
#
# @param object [Object] value to send; must be an instance of the queue's type
# @param options [Hash] forwarded to +Push.new+; a truthy +:deferred+ entry
#   makes this method hand back the operation instead of waiting on it
# @return [Push] the registered operation when +options[:deferred]+ is truthy
# @return [Object] otherwise the value returned by the operation's +wait+
# @raise [Errors::InvalidType] if +object+ is not an instance of +@type+
# @raise [Errors::ChannelClosed] if the queue is closed
def push(object, options = {})
  # Type check happens before the operation is built or the mutex is taken.
  raise Errors::InvalidType unless object.is_a?(@type)

  operation = Push.new(object, options)

  mutex.synchronize do
    raise Errors::ChannelClosed if @closed

    operations.push(operation)
    pushes.push(operation)
    process
  end

  options[:deferred] ? operation : operation.wait
end

#push? ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • (Errors::NotImplementedError)

52
53
54
55
# File 'lib/agent/queue.rb', line 52

# Abstract predicate. This base implementation always raises; a subclass is
# expected to override it.
# NOTE(review): presumably answers whether a push could proceed right now —
# confirm against the subclass implementations.
#
# @return [Boolean] presumably, once overridden by a subclass
# @raise [Errors::NotImplementedError] always, in this base implementation
def push?
  # implement in subclass
  raise Errors::NotImplementedError
end

#remove_operations(ops) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/agent/queue.rb', line 103

# Drops the given operations from the queue and rebuilds the push/pop lists
# from whatever remains. Does nothing once the queue is closed.
#
# @param ops [#each] operations to remove from +operations+
# @return [nil] when the queue is closed
# @return [Object] otherwise whatever +reset_custom_state+ returns
def remove_operations(ops)
  mutex.synchronize do
    return if @closed

    ops.each { |operation| operations.delete(operation) }

    # Re-derive both lists from the surviving operations; the existing
    # array objects are refilled in place rather than swapped out.
    remaining_pushes, remaining_pops = operations.partition { |operation| operation.is_a?(Push) }
    pushes.replace(remaining_pushes)
    pops.replace(remaining_pops)

    reset_custom_state
  end
end

#unbuffered? ⇒ Boolean

Returns:

  • (Boolean)

Raises:

  • (Errors::NotImplementedError)

42
43
44
45
# File 'lib/agent/queue.rb', line 42

# Abstract predicate. This base implementation has no answer of its own and
# always raises; a subclass is expected to override it.
#
# @return [Boolean] presumably, once overridden by a subclass
# @raise [Errors::NotImplementedError] always, in this base implementation
def unbuffered?
  # implement in subclass
  raise Errors::NotImplementedError
end