Class: TimeScheduler::EventQueue

Inherits:
Object
  • Object
show all
Defined in:
lib/time_scheduler/event.rb

Constant Summary collapse

REFRESH_TIME =
3600.0

Instance Method Summary collapse

Constructor Details

#initialize(**option) ⇒ EventQueue

Returns a new instance of EventQueue.



27
28
29
30
31
32
# File 'lib/time_scheduler/event.rb', line 27

# Builds an empty event queue and immediately starts its dispatcher thread
# by calling #run.
#
# @param option [Hash] keyword options; accepted but not read by this constructor
def initialize( **option )
  # Ordered collection of pending events, the wake-up token queue, and the
  # lock that guards the collection.
  @event_set  =  OrderSet.new
  @event_queue  =  ::Queue.new
  @event_mutex  =  ::Mutex.new

  run
end

Instance Method Details

#refresh(time) ⇒ Object



49
50
51
52
53
54
55
# File 'lib/time_scheduler/event.rb', line 49

# Schedules one wake-up token on @event_queue (the queue that the thread
# started by #run pops from).
#
# The delay is the time remaining until +time+, capped at REFRESH_TIME
# seconds plus or minus up to one second of random jitter. When +time+ is
# already due the token is pushed right away: Kernel.sleep raises
# ArgumentError for a negative interval, which would end the timer thread
# before it pushed anything and lose the wake-up.
#
# @param time [Time] moment at which the event set should be examined again
# @return [Object] the timer Thread, or the result of the immediate push
def refresh( time )
  wait_time  =  [time - Time.now, REFRESH_TIME + rand * 2 - 1].min
  if  wait_time <= 0
    # Already due (the clock moved on since the caller's own check).
    @event_queue.push( 1 )
  else
    Thread.start do
      ::Kernel.sleep( wait_time )
      @event_queue.push( 1 )
    end
  end
end

#reserve(time, queue, ident) ⇒ Object



34
35
36
37
38
39
40
41
42
43
44
45
46
47
# File 'lib/time_scheduler/event.rb', line 34

# Registers an event and arranges for a wake-up token to reach @event_queue.
#
# Under @event_mutex the event is added to @event_set. If the computed delay
# is zero or negative the token is pushed at once; otherwise a timer thread
# sleeps for the delay and pushes it. The delay is the time left until
# +time+, capped at REFRESH_TIME seconds plus or minus up to one second of
# random jitter.
#
# @param time [Time] when the event becomes due
# @param queue [#push] receives <tt>[now, ident]</tt> from #run once the event is due
# @param ident [Object] caller-chosen identifier stored with the event
# @return [Object] the timer Thread, or the result of the immediate push
def reserve( time, queue, ident )
  @event_mutex.synchronize do
    entry  =  EventItem.new( time, queue, ident )
    @event_set.add( entry )

    remaining  =  time - Time.now
    ceiling  =  REFRESH_TIME + rand * 2 - 1
    delay  =  [remaining, ceiling].min

    # Already due: wake the dispatcher without spawning a timer.
    next  @event_queue.push( 1 )  if delay <= 0

    Thread.start do
      ::Kernel.sleep( delay )
      @event_queue.push( 1 )
    end
  end
end

#run ⇒ Object



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/time_scheduler/event.rb', line 57

# Starts the dispatcher thread.
#
# The thread blocks on @event_queue; for every token it pops it inspects the
# first entry of @event_set while holding @event_mutex:
# * entry is due      -> remove it and push <tt>[now, ident]</tt> to the entry's queue
# * entry not yet due -> call #refresh to schedule another wake-up
# * set is empty      -> ignore the token
#
# Any StandardError raised inside the loop is re-raised in the thread as
# ::TimeScheduler::Error, which ends the thread.
#
# @return [Thread] the dispatcher thread
def run
  Thread.start do
    begin
      while  true
        @event_queue.pop
        @event_mutex.synchronize do
          event_item  =  @event_set.first
          # A token with nothing pending must not kill the dispatcher:
          # without this guard nil.time raises and the thread ends for good.
          # NOTE(review): presumably possible when the set holds fewer
          # entries than tokens were pushed (e.g. duplicates) - confirm.
          next  if event_item.nil?

          now  =  Time.now
          if  event_item.time <= now
            @event_set.delete( event_item )
            event_item.queue.push( [now, event_item.ident] )
          else
            refresh( event_item.time )
          end
        end
      end
    rescue => e
      raise  ::TimeScheduler::Error, "#{__FILE__}: #{e.message}"
    end
  end
end