Class: ActiveMessaging::Adapters::Synch::Connection
- Inherits:
-
BaseConnection
- Object
- BaseConnection
- ActiveMessaging::Adapters::Synch::Connection
- Defined in:
- lib/activemessaging/adapters/synch.rb
Instance Attribute Summary collapse
-
#configuration ⇒ Object
configurable params.
-
#max_process ⇒ Object
configurable params.
-
#processing_pids ⇒ Object
configurable params.
-
#use_fork ⇒ Object
configurable params.
Attributes inherited from BaseConnection
Instance Method Summary collapse
-
#initialize(cfg) ⇒ Connection
constructor
Generic init method needed by a13g (ActiveMessaging).
- #send(destination_name, message_body, message_headers = {}) ⇒ Object
- #watch_processes ⇒ Object
Methods inherited from BaseConnection
#disconnect, #receive, #received, #subscribe, #unreceive, #unsubscribe
Methods included from ActiveMessaging::Adapter
Constructor Details
#initialize(cfg) ⇒ Connection
Generic init method needed by a13g (ActiveMessaging).
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
# File 'lib/activemessaging/adapters/synch.rb', line 16
#
# Generic init method needed by a13g. Stores the adapter configuration,
# sets up the process-tracking state and, when forking is enabled, starts a
# background thread running #watch_processes.
#
# @param cfg [#[]] adapter configuration; only the :use_fork key is read here
def initialize(cfg)
  ActiveMessaging.logger.debug "ActiveMessaging::Adapters::Synch::Connection.initialize: #{cfg.inspect}"

  @configuration = cfg
  # Coerce whatever was configured into a strict true/false flag.
  @use_fork = @configuration[:use_fork] ? true : false
  # Upper bound on forked dispatches considered acceptable at once.
  @max_process = 10
  # pid => description of the message being processed by that child.
  @processing_pids = {}

  Thread.new { watch_processes } if use_fork
end
Instance Attribute Details
#configuration ⇒ Object
configurable params
13 14 15 |
# File 'lib/activemessaging/adapters/synch.rb', line 13
#
# Returns the configuration object stored in @configuration.
def configuration
  @configuration
end
#max_process ⇒ Object
configurable params
13 14 15 |
# File 'lib/activemessaging/adapters/synch.rb', line 13
#
# Returns the value stored in @max_process.
def max_process
  @max_process
end
#processing_pids ⇒ Object
configurable params
13 14 15 |
# File 'lib/activemessaging/adapters/synch.rb', line 13
#
# Returns the value stored in @processing_pids.
def processing_pids
  @processing_pids
end
#use_fork ⇒ Object
configurable params
13 14 15 |
# File 'lib/activemessaging/adapters/synch.rb', line 13
#
# Returns the value stored in @use_fork.
def use_fork
  @use_fork
end
Instance Method Details
#send(destination_name, message_body, message_headers = {}) ⇒ Object
47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
# File 'lib/activemessaging/adapters/synch.rb', line 47
#
# Wraps the payload in a Message and dispatches it through
# ActiveMessaging::Gateway, either inline or in a forked child process
# depending on +use_fork+.
#
# @param destination_name [Object] destination passed to Message.new
# @param message_body [Object] message payload
# @param message_headers [Hash] optional headers (defaults to an empty hash)
def send(destination_name, message_body, message_headers = {})
  message = Message.new(message_body, 'id', message_headers, destination_name, 'MESSAGE')

  if use_fork
    # Throttle: when more pids are tracked than max_process allows, pause
    # once (this is a single delay, not a wait-until-free loop).
    if processing_pids.size > max_process
      ActiveMessaging.logger.debug "ActiveMessaging:synch too many processes: #{processing_pids.size} > #{max_process}"
      sleep(0.5)
    end

    pid = fork {
      ActiveMessaging.logger.debug "\n-------------------- ActiveMessaging:synch start fork dispath (#{Process.pid}) --------------------"
      ActiveMessaging::Gateway.prepare_application
      ActiveMessaging::Gateway._dispatch(message)
      ActiveMessaging::Gateway.reset_application
      ActiveMessaging.logger.debug "-------------------- ActiveMessaging:synch end fork dispath (#{Process.pid})--------------------\n"
    }

    Process.detach(pid)
    # NOTE(review): the interpolated expression was lost in this listing;
    # message_body is assumed - confirm against the original source file.
    processing_pids[pid] = "Destination: #{destination_name}, Message: #{message_body}"
  else
    ActiveMessaging.logger.debug "\n-------------------- ActiveMessaging:synch before dispath --------------------"
    ActiveMessaging::Gateway.prepare_application
    ActiveMessaging::Gateway._dispatch(message)
    ActiveMessaging::Gateway.reset_application
    ActiveMessaging.logger.debug "-------------------- ActiveMessaging:synch after dispath --------------------\n"
  end
end
#watch_processes ⇒ Object
34 35 36 37 38 39 40 41 42 43 44 45 |
# File 'lib/activemessaging/adapters/synch.rb', line 34
#
# Polls forever for finished child processes. Each reaped pid is removed
# from +processing_pids+ and the description stored for it is logged.
#
# Never returns; #initialize runs it on a background thread when +use_fork+
# is enabled.
def watch_processes
  loop do
    begin
      # WNOHANG: return nil instead of blocking when no child has exited yet.
      pid = Process.wait(0, Process::WNOHANG)
      if pid && (description = processing_pids.delete(pid))
        ActiveMessaging.logger.debug "ActiveMessaging:synch - processing complete for pid (#{pid}):\n\t#{description}"
      end
    rescue StandardError
      # Best-effort watcher: Process.wait raises (e.g. Errno::ECHILD) when
      # there is nothing to wait for; keep polling regardless.
      # NOTE(review): #send also calls Process.detach on each child, which
      # may reap it before this wait does - confirm pids cannot linger here.
    end

    # Pause outside the begin/rescue so a failing wait cannot skip the delay
    # and turn this loop into a CPU-burning spin.
    sleep(0.5)
  end
end