Class: BackgrounDRb::Connection

Inherits:
Object
  • Object
show all
Defined in:
lib/backgroundrb/bdrb_connection.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(ip, port, cluster_conn) ⇒ Connection

Returns a new instance of Connection.



5
6
7
8
9
10
11
# File 'lib/backgroundrb/bdrb_connection.rb', line 5

def initialize ip,port,cluster_conn
  @mutex = Mutex.new
  @server_ip = ip
  @server_port = port
  @cluster_conn = cluster_conn
  @connection_status = true
end

Instance Attribute Details

#cluster_connObject

Returns the value of attribute cluster_conn.



3
4
5
# File 'lib/backgroundrb/bdrb_connection.rb', line 3

def cluster_conn
  @cluster_conn
end

#connection_statusObject

Returns the value of attribute connection_status.



3
4
5
# File 'lib/backgroundrb/bdrb_connection.rb', line 3

def connection_status
  @connection_status
end

#server_ipObject

Returns the value of attribute server_ip.



3
4
5
# File 'lib/backgroundrb/bdrb_connection.rb', line 3

def server_ip
  @server_ip
end

#server_portObject

Returns the value of attribute server_port.



3
4
5
# File 'lib/backgroundrb/bdrb_connection.rb', line 3

def server_port
  @server_port
end

Instance Method Details

#all_worker_infoObject



109
110
111
112
113
114
115
116
117
# File 'lib/backgroundrb/bdrb_connection.rb', line 109

def all_worker_info
  p_data = { }
  p_data[:type] = :all_worker_info
  dump_object(p_data)
  bdrb_response = nil
  @mutex.synchronize { bdrb_response = read_from_bdrb() }
  close_connection
  bdrb_response
end

#ask_result(p_data) ⇒ Object



145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/backgroundrb/bdrb_connection.rb', line 145

def ask_result(p_data)
  if BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache'
    return_result_from_memcache(p_data)
  else
    p_data[:type] = :get_result
    dump_object(p_data)
    bdrb_response = nil
    @mutex.synchronize { bdrb_response = read_from_bdrb() }
    close_connection
    bdrb_response ? bdrb_response[:data] : nil
  end
end

#ask_work(p_data) ⇒ Object



84
85
86
87
88
89
90
91
# File 'lib/backgroundrb/bdrb_connection.rb', line 84

def ask_work p_data
  p_data[:type] = :async_invoke
  dump_object(p_data)
  bdrb_response = nil
  @mutex.synchronize { bdrb_response = read_from_bdrb() }
  close_connection
  bdrb_response
end

#close_connectionObject



79
80
81
82
# File 'lib/backgroundrb/bdrb_connection.rb', line 79

def close_connection
  @connection.close
  @connection = nil
end

#delete_worker(p_data) ⇒ Object



119
120
121
122
123
# File 'lib/backgroundrb/bdrb_connection.rb', line 119

def delete_worker p_data
  p_data[:type] = :delete_worker
  dump_object(p_data)
  close_connection
end

#dump_object(data) ⇒ Object



68
69
70
71
72
73
74
75
76
77
# File 'lib/backgroundrb/bdrb_connection.rb', line 68

def dump_object data
  establish_connection
  raise BackgrounDRb::BdrbConnError.new("Error while connecting to the backgroundrb server #{server_info}") unless @connection_status

  object_dump = Marshal.dump(data)
  dump_length = object_dump.length.to_s
  length_str = dump_length.rjust(9,'0')
  final_data = length_str + object_dump
  @mutex.synchronize { write_data(final_data) }
end

#establish_connectionObject



14
15
16
17
18
19
20
21
22
23
24
25
26
# File 'lib/backgroundrb/bdrb_connection.rb', line 14

def establish_connection
  begin
    timeout(3) do
      @connection = TCPSocket.open(server_ip, server_port)
      @connection.setsockopt(Socket::IPPROTO_TCP,Socket::TCP_NODELAY,1)
    end
    @connection_status = true
  rescue Timeout::Error
    @connection_status = false
  rescue Exception => e
    @connection_status = false
  end
end

#flush_in_loop(data) ⇒ Object



56
57
58
59
60
61
62
63
64
65
66
# File 'lib/backgroundrb/bdrb_connection.rb', line 56

def flush_in_loop(data)
  t_length = data.length
  loop do
    break if t_length <= 0
    written_length = @connection.write(data)
    raise "Error writing to socket" if written_length <= 0
    result = @connection.flush
    data = data[written_length..-1]
    t_length = data.length
  end
end

#gen_key(options) ⇒ Object



136
137
138
139
140
141
142
143
# File 'lib/backgroundrb/bdrb_connection.rb', line 136

def gen_key options
  if BDRB_CONFIG[:backgroundrb][:result_storage] == 'memcache'
    key = [options[:worker],options[:worker_key],options[:job_key]].compact.join('_')
    key
  else
    options[:job_key]
  end
end

#new_worker(p_data) ⇒ Object



93
94
95
96
97
98
# File 'lib/backgroundrb/bdrb_connection.rb', line 93

def new_worker p_data
  p_data[:type] = :start_worker
  dump_object(p_data)
  close_connection
  # RailsWorkerProxy.worker(p_data[:worker],p_data[:worker_key],self)
end

#read_from_bdrb(timeout = 3) ⇒ Object



158
159
160
161
162
163
164
165
166
167
168
# File 'lib/backgroundrb/bdrb_connection.rb', line 158

def read_from_bdrb(timeout = 3)
  begin
    ret_val = select([@connection],nil,nil,timeout)
    return nil unless ret_val
    raw_response = read_object()
    master_response = Marshal.load(raw_response)
    return master_response
  rescue
    return nil
  end
end

#read_objectObject



125
126
127
128
129
130
131
132
133
134
# File 'lib/backgroundrb/bdrb_connection.rb', line 125

def read_object
  begin
    message_length_str = @connection.read(9)
    message_length = message_length_str.to_i
    message_data = @connection.read(message_length)
    return message_data
  rescue
    raise BackgrounDRb::BdrbConnError.new("Not able to connect #{server_info}")
  end
end

#send_request(p_data) ⇒ Object



170
171
172
173
174
175
176
177
# File 'lib/backgroundrb/bdrb_connection.rb', line 170

def send_request(p_data)
  p_data[:type] = :sync_invoke
  dump_object(p_data)
  bdrb_response = nil
  @mutex.synchronize { bdrb_response = read_from_bdrb(nil) }
  close_connection
  bdrb_response
end

#server_infoObject



52
53
54
# File 'lib/backgroundrb/bdrb_connection.rb', line 52

def server_info
  "#{server_ip}:#{server_port}"
end

#worker_info(p_data) ⇒ Object



100
101
102
103
104
105
106
107
# File 'lib/backgroundrb/bdrb_connection.rb', line 100

def worker_info(p_data)
  p_data[:type] = :worker_info
  dump_object(p_data)
  bdrb_response = nil
  @mutex.synchronize { bdrb_response = read_from_bdrb() }
  close_connection
  bdrb_response
end

#write_data(data) ⇒ Object



28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# File 'lib/backgroundrb/bdrb_connection.rb', line 28

def write_data data
  begin
    flush_in_loop(data)
  rescue Errno::EAGAIN
    return
  rescue Errno::EPIPE
    establish_connection
    if @connection_status
      flush_in_loop(data)
    else
      @connection_status = false
      raise BackgrounDRb::BdrbConnError.new("Error while writing #{server_info}")
    end
  rescue
    establish_connection
    if @connection_status
      flush_in_loop(data)
    else
      @connection_status = false
      raise BackgrounDRb::BdrbConnError.new("Error while writing #{server_info}")
    end
  end
end