Class: ASS::Server
- Inherits:
-
Object
- Object
- ASS::Server
- Defined in:
- lib/ass/server.rb
Instance Attribute Summary collapse
-
#name ⇒ Object
readonly
Returns the value of attribute name.
Instance Method Summary collapse
- #call(name, method, data, opts = {}, meta = nil) ⇒ Object
- #cast(name, method, data, opts = {}, meta = nil) ⇒ Object
- #exchange ⇒ Object
-
#initialize(name, opts = {}) ⇒ Server
constructor
A new instance of Server.
- #inspect ⇒ Object
- #queue(opts = {}) ⇒ Object
-
#react(_callback = nil, _opts = nil, &_block) ⇒ Object
takes options available to MQ::Queue# takes options available to MQ::Queue#subscribe.
-
#stop(&block) ⇒ Object
unsuscribe from the queue.
Constructor Details
Instance Attribute Details
#name ⇒ Object (readonly)
Returns the value of attribute name.
2 3 4 |
# File 'lib/ass/server.rb', line 2 def name @name end |
Instance Method Details
#call(name, method, data, opts = {}, meta = nil) ⇒ Object
127 128 129 130 131 132 133 134 135 |
# File 'lib/ass/server.rb', line 127 def call(name,method,data,opts={},=nil) reply_to = opts[:reply_to] || self.name ASS.call(name, method, data, opts.merge(:reply_to => reply_to), ) end |
#cast(name, method, data, opts = {}, meta = nil) ⇒ Object
137 138 139 140 141 142 143 144 |
# File 'lib/ass/server.rb', line 137 def cast(name,method,data,opts={},=nil) reply_to = nil # the remote server will not reply ASS.call(name, method, data, opts.merge(:reply_to => nil), ) end |
#exchange ⇒ Object
10 11 12 |
# File 'lib/ass/server.rb', line 10 def exchange @exchange end |
#inspect ⇒ Object
146 147 148 |
# File 'lib/ass/server.rb', line 146 def inspect "#<#{self.class} #{self.name}>" end |
#queue(opts = {}) ⇒ Object
14 15 16 17 18 19 20 |
# File 'lib/ass/server.rb', line 14 def queue(opts={}) unless @queue @queue ||= ASS.mq.queue(self.name,opts) @queue.bind(self.exchange) end self end |
#react(_callback = nil, _opts = nil, &_block) ⇒ Object
takes options available to MQ::Queue# takes options available to MQ::Queue#subscribe
23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/ass/server.rb', line 23 def react(_callback=nil,_opts=nil,&_block) if _block _opts = _callback _callback = _block end _opts = {} if _opts.nil? # second call would just swap out the callback. @factory = ASS::CallbackFactory.new(_callback) return(self) if @subscribed @subscribed = true @ack = _opts[:ack] self.queue unless @queue # yikes!! potential for scary bugs @queue.subscribe(_opts) do |info,payload| payload = ASS.serializer.load(payload) #p [info,payload] callback_object = @factory.callback_for(self,info,payload) proc { #|callback_object=prepare_callback(@callback,info,payload)| operation = proc { with_handlers do callback_object.send(:on_call,payload["data"]) end } done = proc { |result| # the client MUST exist, otherwise it's an error. ## FIXME it's bad if the server dies b/c ## the client isn't there. It's bad that ## this can cause the server to fail. ## ## I am not sure what happens if message ## is unroutable. I think it's just ## silently dropped unless the mandatory ## option is given. case status = result[0] when :ok if info.reply_to data = result[1] # respond with cast (we don't want # to get a response to our response, # then respond to the response of # this response, and so on.) ASS.cast(info.reply_to, payload["method"], data, { :routing_key => info.routing_key, :message_id => info.}, payload["meta"]) end info.ack if @ack when :resend # resend the same message ASS.call(self.name, payload["method"], payload["data"], { :reply_to => info.reply_to, # this could be nil for cast :routing_key => info.routing_key, :message_id => info.}, payload["meta"]) info.ack if @ack when :discard # no response back to client info.ack if @ack when :error # programmatic error. don't ack error = result[1] if callback_object.respond_to?(:on_error) begin callback_object.on_error(error,payload["data"]) info.ack if @ack # successful error handling rescue => more_error $stderr.puts more_error $stderr.puts more_error.backtrace ASS.stop end else # unhandled error $stderr.puts error $stderr.puts error.backtrace ASS.stop end # don't ack. end } EM.defer operation, done }.call end self end |
#stop(&block) ⇒ Object
unsuscribe from the queue
118 119 120 121 122 123 124 125 |
# File 'lib/ass/server.rb', line 118 def stop(&block) # allows callback if block @queue.unsubscribe(&block) else @queue.unsubscribe end @subscribed = false end |