Class: CitrusRpc::RpcClient::MailStation
- Inherits: Object
- Includes: Utils::EventEmitter
- Defined in: lib/citrus-rpc/rpc-client/mailstation.rb
Overview
MailStation
Instance Attribute Summary
- #mailbox_class ⇒ Object (readonly): Returns the value of attribute mailbox_class.
- #servers ⇒ Object (readonly): Returns the value of attribute servers.
Instance Method Summary
- #add_server(server_info) ⇒ Object: Add a new server info to the mail station.
- #add_servers(server_infos) ⇒ Object: Batch version of #add_server.
- #after(filter) ⇒ Object: Add an after filter.
- #before(filter) ⇒ Object: Add a before filter.
- #clear_station ⇒ Object: Clear station information.
- #dispatch(server_id, msg, opts, block) ⇒ Object: Dispatch an rpc message to the mailbox.
- #filter(filter) ⇒ Object: Add a filter as both a before and an after filter.
- #initialize(args = {}) ⇒ MailStation (constructor): Create a new mail station.
- #remove_server(id) ⇒ Object: Remove a server info from the mail station and remove the mailbox instance associated with the server id.
- #remove_servers(ids) ⇒ Object: Batch version of #remove_server.
- #replace_servers(server_infos) ⇒ Object: Replace servers.
- #start ⇒ Object: Start station and connect all mailboxes to remote servers.
- #stop(force = false) ⇒ Object: Stop station and all its mailboxes.
Methods included from Utils::EventEmitter
Constructor Details
#initialize(args = {}) ⇒ MailStation
Create a new mail station
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 27
def initialize args={}
  @args = args

  @servers = {}     # [Hash] server id => info
  @servers_map = {} # [Hash] server type => servers array
  @onlines = {}     # [Hash] server id => true or false

  @mailbox_class = @args[:mailbox_class] || WsMailBox

  # filters (Arrays, because #before, #after and #filter use << and concat)
  @befores = []
  @afters = []

  # pending request queues
  @pendings = {}
  @pending_size = @args[:pending_size] || Constants::DefaultParams::PendingSize

  # connecting remote server mailbox map
  @connecting = {}
  # working mailbox map
  @mailboxes = {}

  @state = :state_inited
end
Instance Attribute Details
#mailbox_class ⇒ Object (readonly)
Returns the value of attribute mailbox_class.
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 19
def mailbox_class
  @mailbox_class
end
#servers ⇒ Object (readonly)
Returns the value of attribute servers.
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 19
def servers
  @servers
end
Instance Method Details
#add_server(server_info) ⇒ Object
Add a new server info into the mail station
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 82
def add_server server_info
  return unless server_info && server_info[:id]

  id = server_info[:id]
  type = server_info[:server_type]

  @servers[id] = server_info
  @onlines[id] = true

  @servers_map[type] ||= []
  @servers_map[type] << id

  emit :add_server, id
end
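The bookkeeping above can be tried in isolation. What follows is a minimal sketch, assuming a hypothetical ServerRegistry class that is not part of citrus-rpc; it mirrors the three hashes that add_server updates and leaves out the emit call.

```ruby
# Hypothetical stand-in for the bookkeeping in MailStation#add_server.
# ServerRegistry is not part of citrus-rpc; it only mirrors the three hashes.
class ServerRegistry
  attr_reader :servers, :servers_map, :onlines

  def initialize
    @servers = {}     # server id => info
    @servers_map = {} # server type => array of server ids
    @onlines = {}     # server id => true or false
  end

  # Register one server; infos without an :id are ignored.
  def add_server(server_info)
    return unless server_info && server_info[:id]

    id = server_info[:id]
    type = server_info[:server_type]

    @servers[id] = server_info
    @onlines[id] = true
    (@servers_map[type] ||= []) << id
  end
end

registry = ServerRegistry.new
registry.add_server({ id: 'chat-1', server_type: 'chat' })
registry.add_server({ id: 'chat-2', server_type: 'chat' })
registry.add_server({ server_type: 'chat' }) # ignored: no :id
p registry.servers_map
```

The server ids and the 'chat' type are illustrative values only. Grouping ids by type in servers_map is what lets a caller look up every server of a given type without scanning the servers hash.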
#add_servers(server_infos) ⇒ Object
Batch version of #add_server: adds each server info in turn.
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 100
def add_servers server_infos
  return unless server_infos && server_infos.length > 0
  server_infos.each { |server_info| add_server server_info }
end
#after(filter) ⇒ Object
Add an after filter
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 226
def after filter
  if filter.instance_of? Array
    @afters.concat filter
    return
  end
  @afters << filter
end
#before(filter) ⇒ Object
Add a before filter
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 215
def before filter
  if filter.instance_of? Array
    @befores.concat filter
    return
  end
  @befores << filter
end
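Both #before and #after accept either a single filter or an Array of filters. The sketch below shows that branching on its own, assuming the filter lists are Arrays; append_filters is a hypothetical helper and the symbols stand in for real filter objects.

```ruby
# Hypothetical helper mirroring how #before and #after extend a filter list.
# An Array argument is spliced in; anything else is appended as one filter.
def append_filters(list, filter)
  if filter.instance_of?(Array)
    list.concat(filter)
  else
    list << filter
  end
  list
end

befores = []
append_filters(befores, :log_filter)
append_filters(befores, [:auth_filter, :trace_filter])
p befores
```

Because the check uses instance_of? rather than is_a?, an instance of an Array subclass would be appended as a single filter instead of being spliced in.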
#clear_station ⇒ Object
Clear station information
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 134
def clear_station
  @onlines = {}
  @servers_map = {}
end
#dispatch(server_id, msg, opts, block) ⇒ Object
Dispatch rpc message to the mailbox
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 164
def dispatch server_id, msg, opts, block
  unless @state == :state_started
    block.call Exception.new 'client is not running now'
    return
  end

  args = [server_id, msg, opts, block]

  unless @mailboxes[server_id]
    # try to connect to the remote server if no mailbox instance exists yet
    unless lazy_connect server_id, @mailbox_class
      emit :error
    end
    # push request to the pending queue
    add_to_pending server_id, args
    return
  end

  # if the mailbox is still connecting to the remote server
  if @connecting[server_id]
    add_to_pending server_id, args
    return
  end

  # `next` leaves only the proc; `return` would raise LocalJumpError
  # once dispatch itself has already returned
  send = Proc.new { |err, server_id, msg, opts|
    next if err
    next unless mailbox = @mailboxes[server_id]
    mailbox.send(msg, opts, proc{ |*args|
      if send_err = args[0]
        emit :error
        next
      end
      args.shift
      do_filter nil, server_id, msg, opts, @afters, 0, 'after', proc{ |err, server_id, msg, opts|
        # after-filter errors are ignored; the reply is still delivered
        block.call *args
      }
    })
  }

  # run the before filters first, then send
  do_filter nil, server_id, msg, opts, @befores, 0, 'before', send
end
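The 'before' and 'after' labels in dispatch suggest the order: before filters, then the mailbox send, then after filters, then the caller's block. The private do_filter helper is not shown on this page, so the following is only a sketch of how such a callback-style chain could work. run_filters, the filter signature (server_id, msg, opts, next_proc) and the :method key in msg are all assumptions made for illustration.

```ruby
# Hypothetical sketch of a callback-style filter chain. run_filters stands in
# for the private do_filter helper, whose real implementation is not shown.
def run_filters(err, server_id, msg, opts, filters, index, done)
  # Stop at the end of the chain or as soon as a filter reports an error.
  if err || index >= filters.length
    done.call(err, server_id, msg, opts)
    return
  end

  # Each filter receives a continuation and calls it with an optional error.
  filters[index].call(server_id, msg, opts, proc { |filter_err|
    run_filters(filter_err, server_id, msg, opts, filters, index + 1, done)
  })
end

trace = []
befores = [
  proc { |_id, msg, _opts, nxt| trace << "before:#{msg[:method]}"; nxt.call(nil) }
]
afters = [
  proc { |_id, msg, _opts, nxt| trace << "after:#{msg[:method]}"; nxt.call(nil) }
]

# Stand-in for mailbox.send: record the call and answer immediately.
fake_send = proc { |err, server_id, msg, opts|
  next if err
  trace << "send:#{server_id}"
  run_filters(nil, server_id, msg, opts, afters, 0, proc { trace << 'reply' })
}

run_filters(nil, 'chat-1', { method: 'echo' }, {}, befores, 0, fake_send)
p trace # prints ["before:echo", "send:chat-1", "after:echo", "reply"]
```

Passing a continuation to each filter lets a filter finish asynchronously, and an error handed to the continuation skips the remaining filters and goes straight to the final callback.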
#filter(filter) ⇒ Object
Add a filter as both a before and an after filter
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 237
def filter filter
  @befores << filter
  @afters << filter
end
#remove_server(id) ⇒ Object
Remove a server info from the mail station and remove the mailbox instance associated with the server id.
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 109
def remove_server id
  @onlines[id] = false

  if @servers[id]
    type = @servers[id][:server_type]
    @servers_map[type].delete id
  end

  if mailbox = @mailboxes[id]
    mailbox.close
    @mailboxes.delete id
  end

  emit :remove_server, id
end
#remove_servers(ids) ⇒ Object
Batch version of #remove_server: removes each server by id.
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 128
def remove_servers ids
  return unless ids && ids.length > 0
  # pass the single id, not the whole ids array
  ids.each { |id| remove_server id }
end
#replace_servers(server_infos) ⇒ Object
Replace servers
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 142
def replace_servers server_infos
  clear_station
  return unless server_infos && server_infos.length > 0

  server_infos.each { |server_info|
    # use the same :id key that add_server reads
    id = server_info[:id]
    type = server_info[:server_type]

    @onlines[id] = true
    @servers[id] = server_info

    @servers_map[type] ||= []
    @servers_map[type] << id
  }
end
#start ⇒ Object
Start station and connect all mailboxes to remote servers
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 52
def start
  unless @state == :state_inited
    block_given? and yield Exception.new 'station has started'
    return
  end
  EM.next_tick { @state = :state_started; block_given? and yield }
end
#stop(force = false) ⇒ Object
Stop station and all its mailboxes
# File 'lib/citrus-rpc/rpc-client/mailstation.rb', line 63
def stop force=false
  unless @state == :state_started
    return
  end
  @state = :state_closed

  close_all = Proc.new {
    @mailboxes.each { |server_id, mailbox| mailbox.close }
  }
  if force
    close_all.call
  else
    EM.add_timer(Constants::DefaultParams::GraceTimeout) { close_all.call }
  end
end
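Taken together, #start and #stop move the station through three states: :state_inited, :state_started and :state_closed. The sketch below models only those transitions. StationLifecycle is a hypothetical class, and the EventMachine scheduling (EM.next_tick, EM.add_timer) is replaced by direct calls so the example runs without a reactor.

```ruby
# Hypothetical stand-in for the station lifecycle. StationLifecycle is not
# part of citrus-rpc, and the EventMachine calls are replaced by direct ones.
class StationLifecycle
  attr_reader :state

  def initialize
    @state = :state_inited
  end

  # Only an inited station may start; otherwise the block receives an error.
  def start
    unless @state == :state_inited
      yield StandardError.new('station has started') if block_given?
      return
    end
    @state = :state_started
    yield if block_given?
  end

  # Stopping is a no-op unless the station is currently running.
  def stop
    return unless @state == :state_started
    @state = :state_closed
  end
end

station = StationLifecycle.new
station.start { |err| puts(err ? err.message : 'started') } # prints "started"
station.start { |err| puts(err ? err.message : 'started') } # prints "station has started"
station.stop
puts station.state # prints "state_closed"
```

A closed station cannot be restarted in this model, because start only accepts :state_inited. That matches the guard in #start above.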