Class: Agent::Push

Inherits:
Object
  • Object
show all
Defined in:
lib/agent/push.rb

Constant Summary collapse

# Classes/modules whose instances Push#initialize stores as-is instead of
# deep-copying them through a Marshal dump/load round trip.
# NOTE(review): presumably these are either immutable values or objects that
# Marshal cannot dump -- confirm before adding or removing entries.
# Frozen so the shared list cannot be mutated at runtime.
SKIP_MARSHAL_TYPES = [
  ::Symbol,
  ::Numeric,
  ::NilClass,
  ::TrueClass,
  ::FalseClass,
  ::Queue,
  ::SizedQueue,
  ::Thread,
  ::Mutex,
  ::Monitor,
  ::Module,
  ::IO,
  ::Proc,
  ::Method
].freeze

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(object, options = {}) ⇒ Push

Returns a new instance of Push.



24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# File 'lib/agent/push.rb', line 24

# Wraps +object+ for delivery to a receiver.
#
# The object is kept as-is when it is an instance of one of
# SKIP_MARSHAL_TYPES or when options[:skip_marshal] is truthy; otherwise a
# deep copy made with Marshal.dump / Marshal.load is stored.
#
# Option keys read here: :skip_marshal, :uuid, :blocking_once, :notifier.
# When :uuid is absent (or falsy) a fresh one comes from UUID.generate.
def initialize(object, options={})
  # `===` mirrors the matching rule of `case ... when *SKIP_MARSHAL_TYPES`;
  # :skip_marshal is only consulted when no type matched.
  keep_original = SKIP_MARSHAL_TYPES.any? { |type| type === object } ||
                  options[:skip_marshal]

  @object = keep_original ? object : Marshal.load(Marshal.dump(object))

  @uuid          = options[:uuid] || UUID.generate
  @blocking_once = options[:blocking_once]
  @notifier      = options[:notifier]

  # Synchronisation primitives and the two state flags start out "pending".
  @mutex  = Mutex.new
  @cvar   = ConditionVariable.new
  @sent   = false
  @closed = false
end

Instance Attribute Details

#blocking_once ⇒ Object (readonly)

Returns the value of attribute blocking_once.



22
23
24
# File 'lib/agent/push.rb', line 22

# Returns the value that was passed to the constructor as
# options[:blocking_once].
def blocking_once
  @blocking_once
end

#notifier ⇒ Object (readonly)

Returns the value of attribute notifier.



22
23
24
# File 'lib/agent/push.rb', line 22

# Returns the value that was passed to the constructor as options[:notifier].
# #close and #receive call +notify(self)+ on it when it is set.
def notifier
  @notifier
end

#object ⇒ Object (readonly)

Returns the value of attribute object.



22
23
24
# File 'lib/agent/push.rb', line 22

# Returns the wrapped payload: either the object given to the constructor or
# the Marshal round-trip copy the constructor made of it.
def object
  @object
end

#uuid ⇒ Object (readonly)

Returns the value of attribute uuid.



22
23
24
# File 'lib/agent/push.rb', line 22

# Returns this push's identifier: options[:uuid] from the constructor, or the
# value UUID.generate produced when none was supplied.
def uuid
  @uuid
end

Instance Method Details

#close ⇒ Object



86
87
88
89
90
91
92
93
# File 'lib/agent/push.rb', line 86

# Marks this push as closed, unless it has already been sent.
#
# Under @mutex: sets the closed flag, wakes every thread blocked on @cvar and
# notifies the notifier (when one is set). Does nothing and returns nil if the
# push was already sent.
def close
  @mutex.synchronize do
    unless @sent
      @closed = true
      @cvar.broadcast
      @notifier.notify(self) if @notifier
    end
  end
end

#closed? ⇒ Boolean

Returns:

  • (Boolean)


48
49
50
# File 'lib/agent/push.rb', line 48

# Returns the closed flag: false after construction, true once #close has
# closed this push. The flag is read without taking @mutex.
def closed?
  @closed
end

#receive ⇒ Object



61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
# File 'lib/agent/push.rb', line 61

# Hands the wrapped object to the given block and marks this push as sent.
#
# Everything runs under @mutex. After the block returns, @sent is set, all
# threads blocked on @cvar are woken and the notifier (when set) is told
# about this push.
#
# When a blocking_once was supplied, delivery happens inside its #perform
# block and the second element of perform's result is returned.
# Otherwise an Errors::Rollback raised during delivery is swallowed.
#
# Raises Errors::ChannelClosed if the push has already been closed.
def receive
  @mutex.synchronize do
    raise Errors::ChannelClosed if @closed

    if @blocking_once
      _, error = @blocking_once.perform do
        yield @object
        @sent = true
        # broadcast, not signal: no method shown here resets @sent, so every
        # waiter may proceed; signal would release only one of them and leave
        # any others blocked forever. Matches the broadcast used by #close.
        @cvar.broadcast
        @notifier.notify(self) if @notifier
      end

      return error
    else
      begin
        yield @object
        @sent = true
        # See above: wake all waiters, the sent state is final.
        @cvar.broadcast
        @notifier.notify(self) if @notifier
      rescue Errors::Rollback
      end
    end
  end
end

#sent? ⇒ Boolean

Returns:

  • (Boolean)


44
45
46
# File 'lib/agent/push.rb', line 44

# Returns the sent flag: false after construction, true once #receive has
# handed the object to its block. The flag is read without taking @mutex.
def sent?
  @sent
end

#wait ⇒ Object



52
53
54
55
56
57
58
59
# File 'lib/agent/push.rb', line 52

# Blocks the calling thread until this push has been sent or closed.
#
# Returns nil once the push is sent; raises Errors::ChannelClosed when the
# push turned out to be closed.
def wait
  @mutex.synchronize do
    # Re-check the flags after every wake-up before going back to sleep.
    @cvar.wait(@mutex) while !@sent && !@closed
    raise Errors::ChannelClosed if @closed
  end
end