Class: CitrusRpc::RpcClient::MailStation

Inherits:
Object
  • Object
show all
Includes:
Utils::EventEmitter
Defined in:
lib/citrus-rpc/rpc-client/mailstation.rb

Overview

MailStation

Instance Attribute Summary collapse

Instance Method Summary collapse

Methods included from Utils::EventEmitter

#emit, #on, #once

Constructor Details

#initialize(args = {}) ⇒ MailStation

Create a new mail station

Parameters:

  • args (Hash) (defaults to: {})

    Options

Options Hash (args):

  • mailbox_class (Class)
  • pending_size (Integer)


27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 27

# Create a new mail station
#
# Only initializes in-memory bookkeeping and leaves the station in the
# :state_inited state.
#
# @param [Hash] args Options
#
# @option args [Class]   :mailbox_class stored in @mailbox_class
#   (falls back to WsMailBox when absent)
# @option args [Integer] :pending_size stored in @pending_size
#   (falls back to Constants::DefaultParams::PendingSize when absent)
def initialize args={}
  @args = args
  @servers = {}       # [Hash] server id => info
  @servers_map = {}   # [Hash] server type => servers array
  @onlines = {}       # [Hash] server id => true or false
  @mailbox_class = @args[:mailbox_class] || WsMailBox

  # filters
  # These must be Arrays: #before, #after and #filter append to them with
  # `<<` / `concat`, which a Hash does not respond to.
  @befores = []
  @afters = []

  # pending request queues
  @pendings = {}
  @pending_size = @args[:pending_size] || Constants::DefaultParams::PendingSize

  # connecting remote server mailbox map
  @connecting = {}

  # working mailbox map
  @mailboxes = {}

  @state = :state_inited
end

Instance Attribute Details

#mailbox_class ⇒ Object (readonly)

Returns the value of attribute mailbox_class.



19
20
21
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 19

# Reader for the mailbox class held in @mailbox_class.
#
# @return [Object] the current value of @mailbox_class
def mailbox_class; @mailbox_class; end

#servers ⇒ Object (readonly)

Returns the value of attribute servers.



19
20
21
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 19

# Reader for the server table held in @servers.
#
# @return [Object] the current value of @servers
def servers; @servers; end

Instance Method Details

#add_server(server_info) ⇒ Object

Add a new server info into the mail station

Parameters:

  • server_info (Hash)


82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 82

# Add a new server info into the mail station
#
# Stores the info under its id, marks the id as online, lists the id under
# its server type (at most once) and emits :add_server with the id.
# Does nothing when server_info is nil/false or carries no :id.
#
# @param [Hash] server_info read via the :id and :server_type keys
def add_server server_info
  return unless server_info && server_info[:id]

  id = server_info[:id]
  type = server_info[:server_type]

  @servers[id] = server_info
  @onlines[id] = true

  # Adding an already known server again must not list its id twice
  # under the same type.
  ids = (@servers_map[type] ||= [])
  ids << id unless ids.include? id

  emit :add_server, id
end

#add_servers(server_infos) ⇒ Object

Batch version for add new server info

Parameters:

  • server_infos (Array)


100
101
102
103
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 100

# Batch version of #add_server
#
# Calls #add_server once per element, in order. Returns early when the
# collection is nil/false or its length is not greater than zero.
#
# @param [Array] server_infos
def add_servers(server_infos)
  return unless server_infos
  return unless server_infos.length > 0

  server_infos.each do |info|
    add_server(info)
  end
end

#after(filter) ⇒ Object

Add after filter

Parameters:

  • filter (#call)


226
227
228
229
230
231
232
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 226

# Add after filter
#
# A single filter is appended to @afters; an Array (exactly an Array, not a
# subclass) has all of its elements appended instead.
#
# @param [#call] filter
def after(filter)
  return @afters << filter unless filter.instance_of?(Array)

  @afters.concat(filter)
  nil
end

#before(filter) ⇒ Object

Add a before filter

Parameters:

  • filter (#call)


215
216
217
218
219
220
221
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 215

# Add a before filter
#
# A single filter is appended to @befores; an Array (exactly an Array, not a
# subclass) has all of its elements appended instead.
#
# @param [#call] filter
def before(filter)
  return @befores << filter unless filter.instance_of?(Array)

  @befores.concat(filter)
  nil
end

#clear_station ⇒ Object

Clear station information



134
135
136
137
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 134

# Clear station information
#
# Replaces the online table and the type => ids map with fresh empty
# hashes. @servers is not touched here.
def clear_station
  @onlines = Hash.new
  @servers_map = Hash.new
end

#dispatch(server_id, msg, opts, block) ⇒ Object

Dispatch rpc message to the mailbox

Parameters:

  • server_id (String)
  • msg (Hash)
  • opts (Hash)
  • block (#call)


164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 164

# Dispatch rpc message to the mailbox
#
# Flow, as far as this method is concerned:
# * station not in :state_started => block is called with an Exception
# * no mailbox for server_id yet  => lazy_connect is attempted (:error is
#   emitted when it returns a falsy value) and the request is handed to
#   add_to_pending
# * mailbox still connecting      => the request is handed to add_to_pending
# * otherwise @befores is passed to do_filter tagged 'before'; its callback
#   sends the message through the mailbox, and the mailbox reply is passed
#   through do_filter with @afters tagged 'after' before block is called
#
# NOTE(review): lazy_connect, add_to_pending and do_filter are defined
# outside this method; their exact contracts should be confirmed there.
#
# @param [String] server_id
# @param [Hash] msg
# @param [Hash] opts
# @param [#call] block called with the mailbox reply minus its leading
#   error slot, or with an Exception when the station is not running
def dispatch server_id, msg, opts, block
  unless @state == :state_started
    block.call Exception.new 'client is not running now'
    return
  end

  args = [server_id, msg, opts, block]

  unless @mailboxes[server_id]
    # try to connect remote server if mailbox instance not exist yet
    unless lazy_connect server_id, @mailbox_class
      emit :error
    end
    # push request to the pending queue
    add_to_pending server_id, args
    return
  end

  # if the mailbox is connecting to remote server
  if @connecting[server_id]
    add_to_pending server_id, args
    return
  end

  # The callbacks below leave early with `next`, not `return`: `return`
  # inside a proc returns from #dispatch itself and raises LocalJumpError
  # once #dispatch has already finished, which is the normal case for
  # callbacks fired later.
  deliver = Proc.new { |err, target_id, request, request_opts|
    # a failing before filter aborts the request silently
    next if err

    # the mailbox may have been removed while the filters were running
    mailbox = @mailboxes[target_id]
    next unless mailbox

    mailbox.send(request, request_opts, proc { |*results|
      if results[0]
        emit :error
        next
      end
      # drop the (empty) error slot, the rest goes to the caller
      results.shift
      # errors reported by the after filters are ignored on purpose, the
      # caller still receives the reply
      do_filter nil, target_id, request, request_opts, @afters, 0, 'after', proc { |_err, _id, _msg, _opts|
        block.call(*results)
      }
    })
  }

  do_filter nil, server_id, msg, opts, @befores, 0, 'before', deliver
end

#filter(filter) ⇒ Object

Add before and after filter

Parameters:

  • filter (#call)


237
238
239
240
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 237

# Add before and after filter
#
# Appends the same filter object to both @befores and @afters.
#
# @param [#call] filter
def filter(filter)
  @befores.push(filter)
  @afters.push(filter)
end

#remove_server(id) ⇒ Object

Remove a server info from the mail station and remove the mailbox instance associated with the server id.

Parameters:

  • id (String)


109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 109

# Remove a server from the mail station and remove the mailbox instance
# associated with the server id.
#
# Marks the id offline, drops it from its server type list, closes and
# forgets its mailbox (when one exists) and emits :remove_server with the
# id. The entry in @servers itself is left in place.
#
# @param [String] id
def remove_server id
  @onlines[id] = false

  if info = @servers[id]
    # The type list can be missing: #clear_station resets @servers_map
    # while @servers keeps its entries.
    ids = @servers_map[info[:server_type]]
    ids.delete id if ids
  end

  if mailbox = @mailboxes[id]
    mailbox.close
    @mailboxes.delete id
  end

  emit :remove_server, id
end

#remove_servers(ids) ⇒ Object

Batch version for remove remote servers

Parameters:

  • ids (Array)


128
129
130
131
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 128

# Batch version for remove remote servers
#
# Calls #remove_server once per id, in order. Does nothing when ids is
# nil/false or empty.
#
# @param [Array] ids
def remove_servers ids
  return unless ids && ids.length > 0
  # pass each single id on, not the whole collection
  ids.each { |id| remove_server id }
end

#replace_servers(server_infos) ⇒ Object

Replace servers

Parameters:

  • server_infos (Array)


142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 142

# Replace servers
#
# Clears the station via #clear_station, then registers every given info:
# the id is marked online, the info is stored in @servers and the id is
# listed under its server type. No event is emitted.
#
# The id is read from :id, the key #add_server uses; :server_id is still
# accepted as a fallback for callers that relied on the previous key.
# Entries that are nil or carry no id are skipped, as in #add_server.
#
# @param [Array] server_infos
def replace_servers server_infos
  clear_station
  return unless server_infos && server_infos.length > 0

  server_infos.each { |server_info|
    next unless server_info

    id = server_info[:id] || server_info[:server_id]
    next unless id

    type = server_info[:server_type]

    @onlines[id] = true
    @servers[id] = server_info

    @servers_map[type] ||= []
    @servers_map[type] << id
  }
end

#start ⇒ Object

Start station and connect all mailboxes to remote servers



52
53
54
55
56
57
58
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 52

# Start station
#
# When the station is in :state_inited, schedules a block through
# EM.next_tick that switches the state to :state_started and then yields
# (without arguments) to the caller's block, if one was given.
# In any other state the caller's block, if given, is yielded an Exception
# and nil is returned.
def start
  if @state == :state_inited
    EM.next_tick do
      @state = :state_started
      block_given? && yield
    end
  else
    yield Exception.new('station has started') if block_given?
    nil
  end
end

#stop(force = false) ⇒ Object

Stop station and all its mailboxes

Parameters:

  • force (Boolean) (defaults to: false)


63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 63

# Stop station and all its mailboxes
#
# Only acts when the station is in :state_started; the state becomes
# :state_closed right away. With force the mailboxes are closed at once,
# otherwise closing is handed to EM.add_timer with
# Constants::DefaultParams::GraceTimeout.
#
# @param [Boolean] force
def stop(force = false)
  return unless @state == :state_started

  @state = :state_closed
  shutdown = lambda { @mailboxes.each_value { |box| box.close } }
  return shutdown.call if force

  EM.add_timer(Constants::DefaultParams::GraceTimeout) { shutdown.call }
end