Class: BackgrounDRb::Connection
- Inherits:
-
Object
- Object
- BackgrounDRb::Connection
- Defined in:
- lib/backgroundrb/bdrb_connection.rb
Instance Attribute Summary collapse
-
#cluster_conn ⇒ Object
Returns the value of attribute cluster_conn.
-
#connection_status ⇒ Object
Returns the value of attribute connection_status.
-
#server_ip ⇒ Object
Returns the value of attribute server_ip.
-
#server_port ⇒ Object
Returns the value of attribute server_port.
Instance Method Summary collapse
- #all_worker_info ⇒ Object
- #ask_result(p_data) ⇒ Object
- #ask_work(p_data) ⇒ Object
- #close_connection ⇒ Object
- #delete_worker(p_data) ⇒ Object
- #dump_object(data) ⇒ Object
- #establish_connection ⇒ Object
- #flush_in_loop(data) ⇒ Object
- #gen_key(options) ⇒ Object
-
#initialize(ip, port, cluster_conn) ⇒ Connection
constructor
A new instance of Connection.
- #new_worker(p_data) ⇒ Object
- #read_from_bdrb(timeout = 3) ⇒ Object
- #read_object ⇒ Object
- #send_request(p_data) ⇒ Object
- #server_info ⇒ Object
- #worker_info(p_data) ⇒ Object
- #write_data(data) ⇒ Object
Constructor Details
#initialize(ip, port, cluster_conn) ⇒ Connection
Returns a new instance of Connection.
5 6 7 8 9 10 11 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 5 def initialize ip,port,cluster_conn @mutex = Mutex.new @server_ip = ip @server_port = port @cluster_conn = cluster_conn @connection_status = true end |
Instance Attribute Details
#cluster_conn ⇒ Object
Returns the value of attribute cluster_conn.
3 4 5 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 3 def cluster_conn @cluster_conn end |
#connection_status ⇒ Object
Returns the value of attribute connection_status.
3 4 5 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 3 def connection_status @connection_status end |
#server_ip ⇒ Object
Returns the value of attribute server_ip.
3 4 5 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 3 def server_ip @server_ip end |
#server_port ⇒ Object
Returns the value of attribute server_port.
3 4 5 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 3 def server_port @server_port end |
Instance Method Details
#all_worker_info ⇒ Object
109 110 111 112 113 114 115 116 117 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 109 def all_worker_info p_data = { } p_data[:type] = :all_worker_info dump_object(p_data) bdrb_response = nil @mutex.synchronize { bdrb_response = read_from_bdrb() } close_connection bdrb_response end |
#ask_result(p_data) ⇒ Object
145 146 147 148 149 150 151 152 153 154 155 156 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 145 def ask_result(p_data) if BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache' return_result_from_memcache(p_data) else p_data[:type] = :get_result dump_object(p_data) bdrb_response = nil @mutex.synchronize { bdrb_response = read_from_bdrb() } close_connection bdrb_response ? bdrb_response[:data] : nil end end |
#ask_work(p_data) ⇒ Object
84 85 86 87 88 89 90 91 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 84 def ask_work p_data p_data[:type] = :async_invoke dump_object(p_data) bdrb_response = nil @mutex.synchronize { bdrb_response = read_from_bdrb() } close_connection bdrb_response end |
#close_connection ⇒ Object
79 80 81 82 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 79 def close_connection @connection.close @connection = nil end |
#delete_worker(p_data) ⇒ Object
119 120 121 122 123 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 119 def delete_worker p_data p_data[:type] = :delete_worker dump_object(p_data) close_connection end |
#dump_object(data) ⇒ Object
68 69 70 71 72 73 74 75 76 77 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 68 def dump_object data establish_connection raise BackgrounDRb::BdrbConnError.new("Error while connecting to the backgroundrb server #{server_info}") unless @connection_status object_dump = Marshal.dump(data) dump_length = object_dump.length.to_s length_str = dump_length.rjust(9,'0') final_data = length_str + object_dump @mutex.synchronize { write_data(final_data) } end |
#establish_connection ⇒ Object
14 15 16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 14 def establish_connection begin timeout(3) do @connection = TCPSocket.open(server_ip, server_port) @connection.setsockopt(Socket::IPPROTO_TCP,Socket::TCP_NODELAY,1) end @connection_status = true rescue Timeout::Error @connection_status = false rescue Exception => e @connection_status = false end end |
#flush_in_loop(data) ⇒ Object
56 57 58 59 60 61 62 63 64 65 66 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 56 def flush_in_loop(data) t_length = data.length loop do break if t_length <= 0 written_length = @connection.write(data) raise "Error writing to socket" if written_length <= 0 result = @connection.flush data = data[written_length..-1] t_length = data.length end end |
#gen_key(options) ⇒ Object
136 137 138 139 140 141 142 143 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 136 def gen_key if BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache' key = [[:worker],[:worker_key],[:job_key]].compact.join('_') key else [:job_key] end end |
#new_worker(p_data) ⇒ Object
93 94 95 96 97 98 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 93 def new_worker p_data p_data[:type] = :start_worker dump_object(p_data) close_connection # RailsWorkerProxy.worker(p_data[:worker],p_data[:worker_key],self) end |
#read_from_bdrb(timeout = 3) ⇒ Object
158 159 160 161 162 163 164 165 166 167 168 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 158 def read_from_bdrb(timeout = 3) begin ret_val = select([@connection],nil,nil,timeout) return nil unless ret_val raw_response = read_object() master_response = Marshal.load(raw_response) return master_response rescue return nil end end |
#read_object ⇒ Object
125 126 127 128 129 130 131 132 133 134 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 125 def read_object begin = @connection.read(9) = .to_i = @connection.read() return rescue raise BackgrounDRb::BdrbConnError.new("Not able to connect #{server_info}") end end |
#send_request(p_data) ⇒ Object
170 171 172 173 174 175 176 177 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 170 def send_request(p_data) p_data[:type] = :sync_invoke dump_object(p_data) bdrb_response = nil @mutex.synchronize { bdrb_response = read_from_bdrb(nil) } close_connection bdrb_response end |
#server_info ⇒ Object
52 53 54 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 52 def server_info "#{server_ip}:#{server_port}" end |
#worker_info(p_data) ⇒ Object
100 101 102 103 104 105 106 107 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 100 def worker_info(p_data) p_data[:type] = :worker_info dump_object(p_data) bdrb_response = nil @mutex.synchronize { bdrb_response = read_from_bdrb() } close_connection bdrb_response end |
#write_data(data) ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# File 'lib/backgroundrb/bdrb_connection.rb', line 28 def write_data data begin flush_in_loop(data) rescue Errno::EAGAIN return rescue Errno::EPIPE establish_connection if @connection_status flush_in_loop(data) else @connection_status = false raise BackgrounDRb::BdrbConnError.new("Error while writing #{server_info}") end rescue establish_connection if @connection_status flush_in_loop(data) else @connection_status = false raise BackgrounDRb::BdrbConnError.new("Error while writing #{server_info}") end end end |