Class: Cheezmiz::Broker
- Inherits:
-
Object
- Object
- Cheezmiz::Broker
- Defined in:
- lib/broker.rb
Constant Summary collapse
- STX =
"\x02"
- ETX =
"\x03"
- MIN_THREADS =
5
- MAX_THREADS =
15
Instance Method Summary collapse
- #callbacks ⇒ Object
- #connect(params = {}) ⇒ Object
-
#initialize(params = {}) ⇒ Broker
constructor
A new instance of Broker.
- #register_callback(operation, options = {:threaded => true}, &proc) ⇒ Object
- #send(message, params = {}) ⇒ Object
- #start ⇒ Object
Constructor Details
#initialize(params = {}) ⇒ Broker
Returns a new instance of Broker.
31 32 33 34 35 36 37 38 |
# File 'lib/broker.rb', line 31 def initialize(params = {}) @sequence_number = 1 @callbacks = Hash.new { |hash, key| hash[key] = [] } @pool = ActionPool::Pool.new( :min_threads => (params[:min_threads] || MIN_THREADS), :max_threads => (params[:max_threads] || MAX_THREADS) ) end |
Instance Method Details
#callbacks ⇒ Object
44 45 46 |
# File 'lib/broker.rb', line 44 def callbacks @callbacks end |
#connect(params = {}) ⇒ Object
55 56 57 58 59 60 61 |
# File 'lib/broker.rb', line 55 def connect(params = {}) @socket = params[:socket] || begin params[:host] ||= 'ctp-a.chikka.com' params[:port] ||= 6301 TCPSocket.open(params[:host], params[:port]) end end |
#register_callback(operation, options = {:threaded => true}, &proc) ⇒ Object
40 41 42 |
# File 'lib/broker.rb', line 40 def register_callback(operation, = {:threaded => true}, &proc) @callbacks[operation] << .merge({ :callback => proc }) end |
#send(message, params = {}) ⇒ Object
48 49 50 51 52 53 |
# File 'lib/broker.rb', line 48 def send(, params = {}) raw = encode({ :message => }.merge(params)) result = @socket.send(raw, 0) process_callbacks(@callbacks[:outgoing], ) return raw, result end |
#start ⇒ Object
63 64 65 66 67 |
# File 'lib/broker.rb', line 63 def start @pool.process do @socket.while_reading { |buffer| (buffer) } end end |