Class: Thespian::Strategy::Process

Inherits:
Object
  • Object
show all
Includes:
Interface
Defined in:
lib/thespian/strategies/process.rb

Overview

:nodoc:

Instance Method Summary collapse

Methods included from Interface

#salvage_mailbox, #stop

Constructor Details

#initialize(&block) ⇒ Process

Returns a new instance of Process.



11
12
13
14
15
16
17
18
# File 'lib/thespian/strategies/process.rb', line 11

def initialize(&block)
  @block        = block
  @mailbox      = []
  @mailbox_lock = Monitor.new
  @mailbox_cond = @mailbox_lock.new_cond
  @p_read, @p_write = IO.pipe
  @c_read, @c_write = IO.pipe
end

Instance Method Details

#<<(message) ⇒ Object



37
38
39
40
41
# File 'lib/thespian/strategies/process.rb', line 37

def <<(message)
  @c_write.puts(JSON.dump({ "cmd" => "message", "payload" => Marshal.dump(message) }))
  @c_write.flush
  puts "written"
end

#cmd_mailbox_size(hash) ⇒ Object



66
67
68
69
# File 'lib/thespian/strategies/process.rb', line 66

def cmd_mailbox_size(hash)
  @p_write.puts(@mailbox.size)
  @p_write.flush
end

#cmd_message(hash) ⇒ Object



58
59
60
61
62
63
64
# File 'lib/thespian/strategies/process.rb', line 58

def cmd_message(hash)
  message = Marshal.load(hash["payload"])
  @mailbox_lock.synchronize do
    @mailbox << message
    @mailbox_cond.signal
  end
end

#command_loopObject



49
50
51
52
53
54
55
56
# File 'lib/thespian/strategies/process.rb', line 49

def command_loop
  puts "in command loop"
  while line = @c_read.readline
    puts "!!! #{line}"
    hash = JSON.parse(line)
    send("cmd_%s" % hash["cmd"], hash)
  end
end

#mailbox_sizeObject



43
44
45
46
47
# File 'lib/thespian/strategies/process.rb', line 43

def mailbox_size
  @c_write.puts(JSON.dump({ "cmd" => "mailbox_size" }))
  @c_write.flush
  @p_read.readline.chomp.to_i
end

#receiveObject



30
31
32
33
34
35
# File 'lib/thespian/strategies/process.rb', line 30

def receive
  @mailbox_lock.synchronize do
    @mailbox_cond.wait_while{ @mailbox.empty? }
    @mailbox.shift
  end
end

#startObject



20
21
22
23
24
25
26
27
28
# File 'lib/thespian/strategies/process.rb', line 20

def start
  if fork
    self
  else
    ::Thread.new{ puts "in thread"; command_loop }
    puts "calling block"
    @block.call
  end
end