Module: BrB::Tunnel::Shared

Included in:
Handler
Defined in:
lib/brb/tunnel/shared.rb

Constant Summary collapse

SizeOfPackedInt =
[1].pack('N').size

Instance Method Summary collapse

Instance Method Details

#brb_send(r) ⇒ Object



25
26
27
28
29
30
31
32
33
# File 'lib/brb/tunnel/shared.rb', line 25

def brb_send(r)
  return nil if !@active
  s = Marshal::dump(r) rescue Marshal::dump(make_proxy(r))

  s = [s.size].pack('N') + s
  EM.schedule do
    send_data s
  end
end

#declare_callback(key, nb_out, &block) ⇒ Object

Declare a new callback to call for a given request Thread safe code



84
85
86
87
88
89
90
91
92
# File 'lib/brb/tunnel/shared.rb', line 84

def declare_callback(key, nb_out, &block)
  @callbacks_mutex.lock

  @callbacks[key] ||= {}
  @callbacks[key][nb_out] = block

ensure
  @callbacks_mutex.unlock
end

#get_callback(key, nb_out) ⇒ Object

Return associated callback if present And if present, delete the associate callback from the table Thread safe code



97
98
99
100
101
102
103
104
105
106
# File 'lib/brb/tunnel/shared.rb', line 97

def get_callback(key, nb_out)
  @callbacks_mutex.lock
  
  if @callbacks[key] and b = @callbacks[key].delete(nb_out)
    return b
  end

ensure
  @callbacks_mutex.unlock
end

#load_requestObject



37
38
39
40
41
42
43
44
45
46
47
# File 'lib/brb/tunnel/shared.rb', line 37

def load_request
  return nil if @buffer.size < SizeOfPackedInt
  len = @buffer.unpack('N').first + SizeOfPackedInt
  if @buffer.size < len
    return nil
  end

  obj =  Marshal::load(@buffer[SizeOfPackedInt, len])
  @buffer.slice!(0,len)
  return obj
end

#make_proxy(r) ⇒ Object



6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# File 'lib/brb/tunnel/shared.rb', line 6

def make_proxy(r)
  if r.is_a?(Array)
    t = []
    r.each do |obj|
      t << if obj.is_a? Array
        make_proxy(obj)
      elsif !obj.is_a?(Symbol) and !obj.is_a?(String) and obj and !(Marshal::dump(obj) rescue nil)
        #BrB.logger.debug "  - > Make proxy for : #{obj.class}"
        obj.to_s.to_sym
      else
        obj
      end
    end
    return t
  else
    return r.to_s
  end
end

#receive_data(data) ⇒ Object



49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# File 'lib/brb/tunnel/shared.rb', line 49

def receive_data(data)
  @buffer << data
  
  while obj = load_request
    if obj[0] == BrB::Request::ReturnCode

      # Return if we have a callback handling the return :
      next if treat_callback_return(obj[1], obj[2], obj[3])

      # No callback, so blocking thread is waiting :
      @replock.lock
      @responses[obj[2]] ||= Queue.new
      @replock.unlock
      @responses[obj[2]] << [obj[1], obj[3]]
    else
      @queue << obj

      EM.defer do
        treat_request(@queue.pop)
      end
      
    end
  end
end

#recv(key, nb_out) ⇒ Object

Blocking method that wait on the @responses table an answer



127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/brb/tunnel/shared.rb', line 127

def recv(key, nb_out)
  begin
    @replock.lock
    r = @responses[key] ||= Queue.new
    @replock.unlock
    while rep = r.pop
      if rep[1] == nb_out # On check ke c'est bien la réponse que l'on attend
        
        # Call the callback
        if block_given?
          yield(rep[0])
        end

        return rep[0]
      end
      if rep[1] > nb_out
        return nil
      end
    end
  rescue Exception => e
    if @close_after_timeout == true
      stop_service
      sleep 1
      raise e
    else
      raise e
    end
  end
end

#treat_callback_return(ret, key, nb_out) ⇒ Object

Call a callback if present, return true if exists Non blocking action, use EM.defer



110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# File 'lib/brb/tunnel/shared.rb', line 110

def treat_callback_return(ret, key, nb_out)

  if b = get_callback(key, nb_out)
    EM.defer do
      # With arity, handle multiple block arguments or no arguments
      b.arity == 1 ? b.call(ret) : (b.arity == 0 ? b.call : b.call(*ret)) 
    end

    # A callback has been found and called, return true
    return true
  end
  
  # No callback, do nothing
  return nil
end

#treat_request(obj) ⇒ Object



74
75
76
77
78
79
80
# File 'lib/brb/tunnel/shared.rb', line 74

def treat_request(obj)
  if obj.size == 2
    new_brb_in_request(obj[1])
  else
    new_brb_in_request(obj[1], *(obj.last))
  end
end