Class: Collective::Messager
- Inherits:
- Object
- Defined in:
- lib/collective/messager.rb
Overview
Messager is used to send messages between processes, and receive responses.
Messager messages are asynchronous and not ordered.
Defined Under Namespace
Classes: Counter, Message, NoMatch
Instance Attribute Summary
- #callbacks ⇒ Object (readonly)
  Returns the value of attribute callbacks.
- #my_address ⇒ Object (readonly)
  Returns the value of attribute my_address.
- #storage ⇒ Object (readonly)
  Returns the value of attribute storage.
- #to_address ⇒ Object (readonly)
  Returns the value of attribute to_address.
Class Method Summary
- .stringify(map) ⇒ Object
  Utilities.
- .symbolize(map) ⇒ Object
Instance Method Summary
- #expect(match, &callback) ⇒ Object
  Registers a handler for a given id. The handler is removed when it is called.
- #expect_reply(src_id, &reply_block) ⇒ Object
- #initialize(storage, options = {}) ⇒ Messager (constructor)
  Returns a new instance of Messager.
- #receive ⇒ Object
  Reads from my queue: checks whether there are any messages and dispatches them.
- #reply(body, options) ⇒ Object
  Sends a new message to the source of the original message, with reply_to_id set to the original message's id.
- #send(body, options = {}) ⇒ Object
  Writes a message to another queue.
Constructor Details
#initialize(storage, options = {}) ⇒ Messager
Returns a new instance of Messager.
    # File 'lib/collective/messager.rb', line 22
    def initialize( storage, options = {} )
      @callbacks = {}
      @storage = storage
      @to_address = options[:to_address]
      @my_address = options[:my_address] or raise "must specify my address"
      # type checking
      storage.get("test")
    end
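The constructor only checks that `storage` responds to `get`, while the `send` and `receive` listings on this page also call `queue_add` and `queue_pop` on it. The following is a minimal sketch of an in-memory object with that shape, useful for experimenting without a real backend. `MemoryStorage` is a hypothetical name, and the rule that `queue_pop` returns only entries due at or before `now` is an assumption that this page does not confirm.

```ruby
# Hypothetical in-memory stand-in for the storage object; not part of Collective.
class MemoryStorage
  def initialize
    @values = {}
    @queues = Hash.new { |hash, name| hash[name] = [] }
  end

  # Messager#initialize calls storage.get("test") as a basic type check.
  def get(key)
    @values[key]
  end

  # Store a blob together with the time at which it becomes deliverable.
  def queue_add(queue_name, blob, deliver_at)
    @queues[queue_name] << [deliver_at, blob]
    nil
  end

  # Return the earliest blob that is due at or before `now`, or nil.
  # (Assumed semantics: the real storage may behave differently.)
  def queue_pop(queue_name, now)
    queue = @queues[queue_name]
    due = queue.select { |deliver_at, _| deliver_at <= now }.min_by(&:first)
    return nil unless due
    queue.delete_at(queue.index(due))
    due.last
  end
end

storage = MemoryStorage.new
storage.queue_add("worker-1", '{"body":"hello"}', 100)
storage.queue_pop("worker-1", 99)   # nil, nothing is due yet
storage.queue_pop("worker-1", 100)  # returns the stored blob
```

Keeping the stand-in duck-typed mirrors the constructor, which performs no class check beyond calling `get`.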
Instance Attribute Details
#callbacks ⇒ Object (readonly)
Returns the value of attribute callbacks.
    # File 'lib/collective/messager.rb', line 15
    def callbacks
      @callbacks
    end
#my_address ⇒ Object (readonly)
Returns the value of attribute my_address.
    # File 'lib/collective/messager.rb', line 17
    def my_address
      @my_address
    end
#storage ⇒ Object (readonly)
Returns the value of attribute storage.
    # File 'lib/collective/messager.rb', line 16
    def storage
      @storage
    end
#to_address ⇒ Object (readonly)
Returns the value of attribute to_address.
    # File 'lib/collective/messager.rb', line 18
    def to_address
      @to_address
    end
Class Method Details
.stringify(map) ⇒ Object
Utilities
    # File 'lib/collective/messager.rb', line 128
    def self.stringify(map)
      Hash[ map.map { |k,v| [ k.to_s, v ] } ]
    end
.symbolize(map) ⇒ Object
    # File 'lib/collective/messager.rb', line 132
    def self.symbolize(map)
      Hash[ map.map { |k,v| [ k.to_sym, v ] } ]
    end
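Both utilities convert only the top-level keys of a hash. As a small illustration, the sketch below copies the two expressions from the listings into stand-alone lambdas (so it runs without the library) and shows that nested hashes are left as they are. The `options` hash is made-up sample data.

```ruby
# Stand-alone copies of the two expressions from the listings above.
stringify = ->(map) { Hash[ map.map { |k, v| [ k.to_s, v ] } ] }
symbolize = ->(map) { Hash[ map.map { |k, v| [ k.to_sym, v ] } ] }

# Sample data, purely illustrative.
options = { to: "worker-1", meta: { retries: 3 } }

stringified = stringify.call(options)
stringified.keys          # => ["to", "meta"]
stringified["meta"].keys  # => [:retries]  (nested keys are not converted)

# Converting back restores the original top-level keys.
symbolize.call(stringified) == options  # => true
```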
Instance Method Details
#expect(match, &callback) ⇒ Object
Registers a handler for a given id. The handler is removed when it is called.
    # File 'lib/collective/messager.rb', line 47
    def expect( match, &callback )
      @callbacks[match] = callback
      self
    end
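To make the one-shot behaviour concrete, here is a minimal stand-alone sketch of a handler table that stores a block per key, returns `self` for chaining as `#expect` does, and deletes the handler when it fires. `OneShotHandlers` and `dispatch` are hypothetical names. The exact-key lookup is an assumption, since this page does not show how Messager finds and removes callbacks.

```ruby
# Hypothetical illustration of a one-shot handler table; not Collective's code.
class OneShotHandlers
  def initialize
    @callbacks = {}
  end

  # Same shape as Messager#expect: store the block and return self for chaining.
  def expect(match, &callback)
    @callbacks[match] = callback
    self
  end

  # Assumed lookup rule: exact key match. The handler is deleted before it
  # runs, so it fires at most once.
  def dispatch(key, payload)
    callback = @callbacks.delete(key)
    return false unless callback
    callback.call(payload)
    true
  end
end

seen = []
handlers = OneShotHandlers.new
handlers.expect("ping") { |payload| seen << payload }
        .expect("pong") { |payload| seen << payload.upcase }

handlers.dispatch("ping", "first")   # => true
handlers.dispatch("ping", "second")  # => false, the handler was already consumed
handlers.dispatch("pong", "third")   # => true
seen                                 # => ["first", "THIRD"]
```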
#expect_reply(src_id, &reply_block) ⇒ Object
    # File 'lib/collective/messager.rb', line 61
    def expect_reply( src_id, &reply_block )
      raise
    end
#receive ⇒ Object
Reads from my queue: checks whether there are any messages and dispatches them.
    # File 'lib/collective/messager.rb', line 68
    def receive()
      now = Time.now.to_i
      json = storage.queue_pop( queue_name, now )
      if json then
        message = Message.parse(json)
        callback = find_callback( message )
        callback.call( message )
        true
      else
        false
      end
    end
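Because `#receive` handles at most one message per call and returns `true` or `false`, a caller that wants to empty its queue has to loop. The sketch below shows one way to do that with a safety cap. `drain` and `FakeMessager` are hypothetical helpers written for this example. The fake only imitates the return values of `#receive` and does no dispatching.

```ruby
# Call `receive` until it reports that nothing was dispatched, with a cap so a
# constantly refilled queue cannot keep the caller busy forever.
def drain(messager, limit = 100)
  handled = 0
  handled += 1 while handled < limit && messager.receive
  handled
end

# Hypothetical stand-in that answers `receive` the way the listing above does:
# true while there is something to dispatch, false afterwards.
class FakeMessager
  def initialize(pending)
    @pending = pending
  end

  def receive
    return false if @pending.empty?
    @pending.shift
    true
  end
end

drain(FakeMessager.new(%w[a b c]))     # => 3
drain(FakeMessager.new(%w[a b c]), 2)  # => 2
```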
#reply(body, options) ⇒ Object
Sends a new message to the source of the original message, with reply_to_id set to the original message's id.
    # File 'lib/collective/messager.rb', line 55
    def reply( body, options )
      original = options[:to] or raise "must reply to: message"
      send( body, to: original.from, reply_to_id: original.id )
    end
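The addressing rule in `#reply` (send to the original's `from`, carry the original's `id` as `reply_to_id`) can be sketched without the library as follows. `Envelope` and `reply_envelope` are hypothetical. The real `Message` class is not shown on this page, so the field set here is an assumption, and ids are passed in by hand because this section does not document how they are generated.

```ruby
# Hypothetical envelope type standing in for Collective's Message class.
Envelope = Struct.new(:id, :to, :from, :reply_to_id, :body, keyword_init: true)

# Address a reply the way Messager#reply does: back to the original sender,
# tagged with the original message's id.
def reply_envelope(original, body, id:, from:)
  Envelope.new(id: id, to: original.from, from: from,
               reply_to_id: original.id, body: body)
end

request  = Envelope.new(id: "m1", to: "worker", from: "client", body: "ping")
response = reply_envelope(request, "pong", id: "m2", from: "worker")

response.to           # => "client"
response.reply_to_id  # => "m1"

# The requester can pair a response with its outstanding request by id.
pending = { request.id => request }
matched = pending.delete(response.reply_to_id)
matched.body          # => "ping"
```

Since messages are asynchronous and unordered, correlating on `reply_to_id` rather than on arrival order is what lets a requester tell responses apart.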
#send(body, options = {}) ⇒ Object
Writes a message to another queue.
    # File 'lib/collective/messager.rb', line 34
    def send( body, options = {} )
      to = options[:to] || to_address or raise "must specify to address"
      from = options[:from] || my_address or raise "must specify from address"
      now = options[:at] || Time.now
      message = Message.new( options.merge( to: to, from: my_address, at: now, body: body ) )
      blob = message.to_json
      storage.queue_add( queue_name(to), blob, now.to_i )
      message.id
    end