Class: Thimble::ThimbleQueue

Inherits:
Object
  • Object
show all
Includes:
Enumerable
Defined in:
lib/thimble_queue.rb

Overview

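A fixed-capacity queue that coordinates producers and consumers with a Mutex and two ConditionVariables. (IDE hint kept from the source: noinspection RubyTooManyInstanceVariablesInspection)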

Direct Known Subclasses

Thimble

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(size, name) ⇒ ThimbleQueue

Returns a new instance of ThimbleQueue.



9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# File 'lib/thimble_queue.rb', line 9

# Builds an empty, open queue with a fixed capacity.
#
# @param size [Integer] capacity of the queue; must be at least 1
# @param name [Object] label stored in @name and interpolated into log messages
# @raise [ArgumentError] if size is less than 1
def initialize(size, name)
  # A size of exactly 1 is accepted, so the message says "at least 1".
  unless size >= 1
    raise ArgumentError, "make sure there is a size for the queue of at least 1! size received #{size}"
  end

  # Identifier: SHA-256 digest of a large random number concatenated with the
  # current epoch second.
  @id = Digest::SHA256.digest(rand(10**100).to_s + Time.now.to_i.to_s)
  @name = name
  @size = size
  @mutex = Mutex.new
  @queue = []
  @closed = false
  @close_now = false
  # #next waits on @empty when nothing is queued; #push waits on @full while
  # the item cannot be stored.
  @empty = ConditionVariable.new
  @full = ConditionVariable.new
  # Threshold UNKNOWN suppresses the debug messages this class logs until
  # #set_logger lowers it.
  @logger = Logger.new($stdout)
  @logger.sev_threshold = Logger::UNKNOWN
end

Instance Attribute Details

#size ⇒ Object (readonly)

Returns the value of attribute size.



28
29
30
# File 'lib/thimble_queue.rb', line 28

# Reader for the capacity given to the constructor.
#
# @return [Object] the value stored in @size (the constructor's size argument)
def size
  @size
end

Instance Method Details

#+(other) ⇒ ThimbleQueue

Will concatenate an enumerable to the ThimbleQueue

Parameters:

  • other (Module<Enumerable>)

Returns:

  • (ThimbleQueue)

Raises:

  • (ArgumentError)


49
50
51
52
53
54
55
56
# File 'lib/thimble_queue.rb', line 49

# Builds a new ThimbleQueue containing this queue's items followed by the
# items of +other+.
#
# This queue is read through #each, so its items are consumed by the merge.
# The new queue's capacity is this queue's #length plus the length of +other+.
#
# @param other [Enumerable] items to append after this queue's items
# @return [ThimbleQueue] the merged queue, created with this queue's @name
# @raise [ArgumentError] if other is not an Enumerable
def +(other)
  raise ArgumentError, '+ requires another Enumerable!' unless other.is_a?(Enumerable)

  # Enumerables such as Range or Enumerator do not define #length; materialize
  # those so the merged capacity can be computed instead of raising NoMethodError.
  addition = other.respond_to?(:length) ? other : other.to_a
  merged_thimble = ThimbleQueue.new(length + addition.length, @name)
  each { |item| merged_thimble.push(item) }
  addition.each { |item| merged_thimble.push(item) }
  merged_thimble
end

#close(now = false) ⇒ nil

Closes the ThimbleQueue

Parameters:

  • now (TrueClass, FalseClass) — when true, also sets the close-now flag so consumers stop immediately

Returns:

  • (nil)

Raises:

  • (ArgumentError)


120
121
122
123
124
125
126
127
128
129
130
131
# File 'lib/thimble_queue.rb', line 120

# Marks the queue as closed and wakes every waiting thread.
#
# @param now [TrueClass, FalseClass] when true, also sets @close_now, which
#   makes #next stop handing out items
# @return [nil]
# @raise [ArgumentError] if now is neither true nor false
def close(now = false)
  raise ArgumentError, 'now must be true or false' unless [true, false].include?(now)

  @logger.debug("#{@name} is closing")
  @mutex.synchronize do
    @closed = true
    @close_now = true if now
    # Wake threads blocked on either condition so they re-check the flags.
    @full.broadcast
    @empty.broadcast
  end
  @logger.debug("#{@name} is closed: #{@closed} now: #{@close_now}")
  # Logger#debug returns true; return nil explicitly to match the documented contract.
  nil
end

#closed? ⇒ TrueClass, FalseClass

checks if the ThimbleQueue is closed

Returns:

  • (TrueClass, FalseClass)


145
146
147
# File 'lib/thimble_queue.rb', line 145

# Reports whether the queue has been closed.
#
# @return [TrueClass, FalseClass] the @closed flag (false after construction,
#   true once #close has run)
def closed?
  @closed
end

#each ⇒ Object



34
35
36
37
38
# File 'lib/thimble_queue.rb', line 34

# Yields the #item of every entry returned by #next until #next returns a
# falsy value. Entries are taken from the queue as they are yielded.
#
# Without a block an Enumerator is returned and nothing is taken from the
# queue until that Enumerator is iterated.
#
# @yieldparam item [Object] the #item of the entry just taken from the queue
# @return [Enumerator, nil] an Enumerator when no block is given, otherwise nil
def each
  # Previously a block-less call removed one entry and then raised LocalJumpError.
  return to_enum(:each) unless block_given?

  while (item = self.next)
    yield item.item
  end
end

#length ⇒ Integer

Returns the capacity (#size) of the ThimbleQueue, not the number of items currently queued

Returns:

  • (Integer)


42
43
44
# File 'lib/thimble_queue.rb', line 42

# Returns the same value as #size: the capacity given to the constructor,
# not the number of items currently held in @queue.
#
# @return [Integer]
def length
  size
end

#next ⇒ Object

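Removes and returns the first item in the queue, waiting until one is available. Returns nil once the queue is closed and empty, or as soon as it has been closed with now = true.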

Returns:

  • (Object)


60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/thimble_queue.rb', line 60

# Removes and returns the entry at the head of the queue.
#
# While the queue is empty and still open, the calling thread waits on @empty
# and retries after being woken.
#
# @return [Object, nil] the head entry; nil when the queue is empty and
#   closed, or when @close_now is set
def next
  @mutex.synchronize do
    until @close_now
      head = @queue.shift
      @logger.debug("#{@name}'s queue shifted to: #{head}")
      unless head.nil?
        # An entry was removed: wake threads waiting on either condition.
        @full.broadcast
        @empty.broadcast
        return head
      end

      @logger.debug("#{@name}'s queue is currently closed?: #{closed?}")
      return nil if closed?

      # Nothing queued yet and the queue is still open: sleep until signalled.
      @empty.wait(@mutex)
    end
  end
end

#push(input_item) ⇒ Object

This will push whatever it is handed to the queue

Parameters:

  • input_item (Object)


81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/thimble_queue.rb', line 81

# Adds one item to the queue, waiting while the queue cannot accept it.
#
# The item is handed to #offer (defined elsewhere in this class); whenever
# #offer returns a falsy value the calling thread waits on @full and retries.
# NOTE(review): #offer presumably returns false only when the queue is at
# capacity — confirm against its definition.
#
# @param input_item [Object] the item to enqueue
# @return [Object] the return value of the final Logger#debug call
# @raise [RuntimeError] if the queue is already closed when push is called
def push(input_item)
  raise 'Queue is closed!' if @closed

  @logger.debug("Pushing into #{@name} values: #{input_item}")
  @mutex.synchronize do
    until offer(input_item)
      # Log before blocking (as #push_flat does) so the message marks the
      # start of the wait rather than its end.
      @logger.debug("#{@name} is waiting on full")
      @full.wait(@mutex)
    end
    # An item was stored: wake any thread waiting in #next.
    @empty.broadcast
  end
  @logger.debug("Finished pushing into #{@name}: #{input_item}")
end

#push_flat(input_item) ⇒ nil

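This will flatten one level of an enumerable input (anything responding to each) and feed its elements one at a time to the queue; elements that are themselves arrays are pushed as-is. Any other input is pushed as a single item.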

Parameters:

  • input_item (Object)

Returns:

  • (nil)


99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/thimble_queue.rb', line 99

# Pushes the elements of an enumerable input one at a time, or a
# non-enumerable input as a single item.
#
# Only one level is expanded: each element yielded by input_item.each is
# passed to #push unchanged, so nested arrays are enqueued as arrays.
#
# @param input_item [Object] an object responding to #each, or a single item
# @return [nil]
# @raise [RuntimeError] if the queue is already closed when called
def push_flat(input_item)
  raise 'Queue is closed!' if @closed

  @logger.debug("Pushing flat into #{@name} values: #{input_item}")
  if input_item.respond_to? :each
    input_item.each { |item| push(item) }
  else
    @mutex.synchronize do
      # Same wait/retry loop as #push: block on @full until #offer accepts the item.
      until offer(input_item)
        @logger.debug("#{@name} is waiting on full")
        @full.wait(@mutex)
      end
      @empty.broadcast
    end
  end
  @logger.debug("Finished pushing flat into #{@name} values: #{input_item}")
  # Logger#debug returns true; return nil explicitly to match the documented contract.
  nil
end

#set_logger(level) ⇒ Object



30
31
32
# File 'lib/thimble_queue.rb', line 30

# Changes the severity threshold of this queue's logger.
#
# @param level [Integer] a Logger severity constant such as Logger::DEBUG
# @return [Object] the level that was passed in
def set_logger(level)
  # Logger#sev_threshold= is an alias of Logger#level=.
  @logger.level = level
end

#to_a ⇒ Array[Object]

Will force the ThimbleQueue into an array

Returns:

  • (Array[Object])


135
136
137
138
139
140
141
# File 'lib/thimble_queue.rb', line 135

# Drains the queue into an Array.
#
# Entries are taken with #next until it returns a falsy value; the #item of
# each entry is appended in the order received.
#
# @return [Array<Object>] the collected items
def to_a
  [].tap do |drained|
    while (entry = self.next)
      drained.push(entry.item)
    end
  end
end