Class: ActiveMessaging::Adapters::Synch::Connection

Inherits:
BaseConnection show all
Defined in:
lib/activemessaging/adapters/synch.rb

Instance Attribute Summary collapse

Attributes inherited from BaseConnection

#reliable

Instance Method Summary collapse

Methods inherited from BaseConnection

#disconnect, #receive, #received, #subscribe, #unreceive, #unsubscribe

Methods included from ActiveMessaging::Adapter

included, #logger

Constructor Details

#initialize(cfg) ⇒ Connection

generic init method needed by a13g



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# File 'lib/activemessaging/adapters/synch.rb', line 16

def initialize cfg
  ActiveMessaging.logger.debug "ActiveMessaging::Adapters::Synch::Connection.initialize: #{cfg.inspect}"
  @configuration = cfg
  
  @use_fork = !!@configuration[:use_fork]

  # max at once
  @max_process = 10
  # keep track of the processes running
  @processing_pids = {}

  if use_fork
    Thread.new {
      watch_processes
    }
  end
end

Instance Attribute Details

#configurationObject

configurable params



13
14
15
# File 'lib/activemessaging/adapters/synch.rb', line 13

def configuration
  @configuration
end

#max_processObject

configurable params



13
14
15
# File 'lib/activemessaging/adapters/synch.rb', line 13

def max_process
  @max_process
end

#processing_pidsObject

configurable params



13
14
15
# File 'lib/activemessaging/adapters/synch.rb', line 13

def processing_pids
  @processing_pids
end

#use_forkObject

configurable params



13
14
15
# File 'lib/activemessaging/adapters/synch.rb', line 13

def use_fork
  @use_fork
end

Instance Method Details

#send(destination_name, message_body, message_headers = {}) ⇒ Object



47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# File 'lib/activemessaging/adapters/synch.rb', line 47

def send destination_name, message_body, message_headers={}
  message = Message.new(message_body, 'id', message_headers, destination_name, 'MESSAGE')
  
  if use_fork

    if processing_pids.size > max_process
      ActiveMessaging.logger.debug "ActiveMessaging:synch too many processes: #{processing_pids.size} > #{max_process}"
      sleep(0.5)
    end

    pid = fork {
      ActiveMessaging.logger.debug "\n-------------------- ActiveMessaging:synch start fork dispath (#{Process.pid}) --------------------"
      ActiveMessaging::Gateway.prepare_application
      ActiveMessaging::Gateway._dispatch(message)
      ActiveMessaging::Gateway.reset_application
      ActiveMessaging.logger.debug "-------------------- ActiveMessaging:synch end fork dispath (#{Process.pid})--------------------\n"
    }

    Process.detach(pid)
    processing_pids[pid] = "Destination: #{destination_name}, Message: #{message_body}"

  else

    ActiveMessaging.logger.debug "\n-------------------- ActiveMessaging:synch before dispath --------------------"
    ActiveMessaging::Gateway.prepare_application
    ActiveMessaging::Gateway._dispatch(message)
    ActiveMessaging::Gateway.reset_application
    ActiveMessaging.logger.debug "-------------------- ActiveMessaging:synch after dispath --------------------\n"

  end
  
end

#watch_processesObject



34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/activemessaging/adapters/synch.rb', line 34

def watch_processes
  while true
    begin
      pid = Process.wait(0, Process::WNOHANG)
      if m = processing_pids.delete(pid)
        ActiveMessaging.logger.debug "ActiveMessaging:synch - processing complete for pid (#{pid}):\n\t#{m}"
      end
      sleep(0.5)
    rescue
    end
  end
end