Class: Barney::Share

Inherits:
Object
Defined in:
lib/barney/share.rb

Defined Under Namespace

Classes: StreamPair

Class Attribute Summary

Instance Attribute Summary

Instance Method Summary

Constructor Details

- (Barney::Share) initialize {|self| ... }

Yield Parameters:

  • self (Barney::Share)

    The newly created instance, yielded only when a block is given.

# File 'lib/barney/share.rb', line 40

def initialize
  @shared  = {}
  @history = {}
  @context = nil
  @seq     = 0
  yield self if block_given? 
end
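
The constructor's optional block can be illustrated with a small stand-in. This is a minimal sketch, not the real class: MiniShare is a hypothetical name, and it copies only the `yield self if block_given?` line and a simplified #share from this page.

```ruby
# Hypothetical stand-in that mirrors only the constructor shown above;
# it is not the real Barney::Share class.
class MiniShare
  def initialize
    @shared = {}
    yield self if block_given?   # hand the new object to the caller's block
  end

  # Simplified: records the names only, without creating any pipes.
  def share(*variables)
    variables.each { |variable| @shared[variable.to_sym] ||= [] }
    @shared.keys
  end

  def shared
    @shared.keys
  end
end

# Block form: configure the object while it is being constructed.
with_block = MiniShare.new { |obj| obj.share :a, :b }

# Plain form: construct first, configure afterwards.
plain = MiniShare.new
plain.share :a, :b
```

Both forms should leave the object in the same state; the block form only saves a temporary variable.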

Class Attribute Details

+ (Mutex) mutex (readonly)

This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.

Returns the Mutex that #synchronize holds while it reads and writes Barney::Share.value.

Returns:

  • (Mutex)

# File 'lib/barney/share.rb', line 22

def mutex
  @mutex
end

+ (Object) value

This method is part of a private API. You should avoid using this method if possible, as it may be removed or changed in the future.

Returns the latest value read from a spawned child process.

Returns:

  • (Object)

# File 'lib/barney/share.rb', line 17

def value
  @value
end

Instance Attribute Details

- (Hash<Fixnum, Hash<Symbol, Object>>) history (readonly)

Returns a history of changes made in multiple forks for a single instance of Barney::Share.

Returns:

  • (Hash<Fixnum, Hash<Symbol, Object>>)

    The Fixnum key is the sequence number of the fork, the Symbol key is the variable name, and the Object is that variable's value.

# File 'lib/barney/share.rb', line 36

def history
  @history
end
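
The shape of the history Hash may be easier to see with sample data. The sketch below assumes nothing beyond the merge step that appears in #synchronize; the `record` lambda, the variable names, and the values are made up for illustration.

```ruby
# Illustrative data only; the variable names and values are made up.
history = {}

# Same merge step as #synchronize: group values by sequence number.
record = lambda do |seq, variable, object|
  history[seq] = (history[seq] || {}).merge!({ variable => object })
end

record.call(0, :a, 1)     # first fork set a
record.call(0, :b, "x")   # first fork also set b
record.call(1, :a, 2)     # second fork set a again

history   # => { 0 => { a: 1, b: "x" }, 1 => { a: 2 } }
```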

- (Fixnum) pid (readonly)

Returns the Process ID of the last forked child process.

Returns:

  • (Fixnum)

# File 'lib/barney/share.rb', line 32

def pid
  @pid
end

- (Array<Symbol>) shared (readonly)

Returns a list of all variables or constants being shared for this instance of Barney::Share.

Returns:

  • (Array<Symbol>)

# File 'lib/barney/share.rb', line 27

def shared; @shared.keys; end

Instance Method Details

- (Fixnum) fork(&blk)

Spawns a new child process.
It behaves like Kernel.fork, except that a block or Proc object is required.

Parameters:

  • blk (Proc)

    A block or Proc object that will be executed in the child process.

Returns:

  • (Fixnum)

    Returns the Process ID (PID) of the spawned child process.

Raises:

  • (ArgumentError)

    Raised if a block or Proc object isn't supplied as an argument.

# File 'lib/barney/share.rb', line 79

def fork &blk
  raise ArgumentError, "A block or Proc object is expected" unless block_given?
  share *@shared.keys if @pid

  @context = blk.binding
  @pid     = Kernel.fork do
    blk.call
    @shared.each do |variable, history|
      stream = history[-1]
      stream.in.close  
      stream.out.write Marshal.dump(eval("#{variable}", @context))
      stream.out.close
    end
  end
  
  @seq += 1
  @pid
end
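
The mechanism #fork relies on (a pipe, Marshal, and Kernel.fork) can be tried in isolation. The following is a stdlib-only sketch under the assumption of a platform that supports fork; it does not use Barney, and `counter` and `before` are illustrative names.

```ruby
# Stdlib-only sketch of the pipe + Marshal round trip; names are illustrative.
counter = 1
reader, writer = IO.pipe

pid = Kernel.fork do
  reader.close                        # the child only writes
  counter += 41                       # changes only the child's copy
  writer.write Marshal.dump(counter)  # serialize the child's value
  writer.close
end

writer.close                          # the parent only reads
before  = counter                     # still 1: the child changed its own copy
counter = Marshal.load(reader.read)   # read returns once the child closes its end
reader.close
Process.wait pid                      # reap the child
```

Closing the unused end of the pipe in each process matters: the parent's read only reaches end-of-file once every write end has been closed.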

- (Array<Symbol>) share(*variables)

Marks a variable or constant to be shared between the parent and child process.

Parameters:

  • variables (Symbol)

    Accepts any number of Symbol objects (each argument is converted with #to_sym).

Returns:

  • (Array<Symbol>)

    Returns a list of all variables that are being shared.

# File 'lib/barney/share.rb', line 51

def share *variables
  variables.map(&:to_sym).each do |variable|
    if (@shared[variable].nil?) || (not @shared[variable].find { |struct| struct.seq == @seq })
      @shared.store variable, (@shared[variable] || []) << StreamPair.new(@seq, *IO.pipe)
    end
  end
  @shared.keys
end
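
The bookkeeping in #share keeps at most one pipe per variable per sequence number. A minimal sketch of that rule follows; the page names StreamPair without showing its definition, so the three-member Struct here is an assumption, and `register` is a hypothetical helper.

```ruby
# Assumed shape: the page names StreamPair but does not show its definition.
stream_pair = Struct.new(:seq, :in, :out)

shared = {}
seq    = 0

# Same rule as #share: add a pipe for `variable` unless one exists for `seq`.
register = lambda do |variable|
  pairs = (shared[variable] ||= [])
  pairs << stream_pair.new(seq, *IO.pipe) unless pairs.any? { |pair| pair.seq == seq }
end

register.call(:a)
register.call(:a)   # same sequence number: no second pipe
seq += 1            # what #fork does after spawning a child
register.call(:a)   # new sequence number: one more pipe

pipes_for_a = shared[:a].map(&:seq)   # => [0, 1]
```

This is also why #fork calls share again when a child has already been spawned: each new fork needs a fresh pipe for every shared variable.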

- synchronize Also known as: sync

This method returns an undefined value.

Synchronizes data between the parent and child process.
It blocks until the spawned child process has exited.

# File 'lib/barney/share.rb', line 101

def synchronize 
  Barney::Share.mutex.synchronize do
    @shared.each do |variable, history|
      history.each do |stream|
        stream.out.close
        Barney::Share.value = Marshal.load stream.in.read
        stream.in.close
        object = eval "#{variable} = Barney::Share.value", @context 
        @history[stream.seq] = (@history[stream.seq] || {}).merge!({ variable => object })
      end
      history.clear
    end
  end
end
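
The write-back step in #synchronize assigns to a local variable by name through the block's binding. Because the evaluated string cannot see the locals of the method doing the eval, the value travels through a class-level slot, which is presumably why a Mutex guards it. The sketch below shows the idea with a hypothetical Holder module standing in for Barney::Share.value; `write_back` is also a made-up helper.

```ruby
# Hypothetical holder standing in for the Barney::Share.value class attribute.
module Holder
  class << self
    attr_accessor :value
  end
end

# Capture a block's binding and assign to one of its local variables by name.
def write_back(variable, new_value, &blk)
  context = blk.binding              # scope in which the block was created
  Holder.value = new_value           # reachable from inside the eval'd string
  eval "#{variable} = Holder.value", context
end

message = "from parent"
write_back(:message, "from child") { }   # the empty block only supplies a binding
```

After the call, `message` in the caller's scope holds the new value, even though write_back never received the variable itself, only its name.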

- (Array<Symbol>) unshare(*variables)

Stops a variable or constant from being shared between the parent and child process.

Parameters:

  • variables (Symbol)

    Accepts any number of Symbol objects (each argument is converted with #to_sym).

Returns:

  • (Array<Symbol>)

    Returns a list of the variables that are still being shared.

# File 'lib/barney/share.rb', line 63

def unshare *variables
  variables.map(&:to_sym).each do |variable|
    @shared.delete variable
  end
  @shared.keys
end