Class: Wamp::Worker::Proxy::Dispatcher

Inherits:
Base
  • Object
show all
Defined in:
lib/wamp/worker/proxy/dispatcher.rb

Constant Summary collapse

TIMEOUT =

We want to time out every few seconds so that higher-level code can check for a shutdown.

2

Instance Attribute Summary collapse

Attributes inherited from Base

#name, #queue, #ticker, #uuid

Instance Method Summary collapse

Methods inherited from Base

#background_res_queue, #command_req_queue, #unique_command_resp_queue

Constructor Details

#initialize(name, session = nil, uuid: nil) ⇒ Dispatcher

Constructor



16
17
18
19
# File 'lib/wamp/worker/proxy/dispatcher.rb', line 16

# Builds the dispatcher and records the session it will drive.
#
# @param name forwarded unchanged to the Base constructor
# @param session the session to dispatch against; may be omitted (nil)
# @param uuid forwarded to the Base constructor as the +uuid+ keyword
def initialize(name, session = nil, uuid: nil)
  super(name, uuid: uuid)
  self.session = session
end

Instance Attribute Details

#session ⇒ Object

Returns the value of attribute session.



8
9
10
# File 'lib/wamp/worker/proxy/dispatcher.rb', line 8

# Reader for the session attribute.
#
# @return the current value of @session (nil when none has been assigned)
def session
  @session
end

Instance Method Details

#check_queues ⇒ Object

Check the queues



29
30
31
# File 'lib/wamp/worker/proxy/dispatcher.rb', line 29

# Checks the command request queue and the background response queue.
#
# Both queue identifiers are handed to +check_queue+ as a single array,
# command request queue first.
#
# @return whatever +check_queue+ returns
def check_queues
  check_queue([command_req_queue, background_res_queue])
end

#increment_ticker ⇒ Object

Increments the ticker



23
24
25
# File 'lib/wamp/worker/proxy/dispatcher.rb', line 23

# Advances the ticker by delegating to its +increment+ method.
#
# @return whatever the ticker's +increment+ returns
def increment_ticker
  ticker.increment
end

#process(descriptor) ⇒ Object

Executes the request

Parameters:

  • descriptor (Descriptor)
    • The request

Raises:

  • (RuntimeError)


36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
# File 'lib/wamp/worker/proxy/dispatcher.rb', line 36

# Runs the command carried by a descriptor against the session.
#
# The commands :call, :publish and :yield are forwarded to the session
# method of the same name. Any other command is answered straight away by
# pushing a CallError response onto the queue.
#
# @param descriptor [Descriptor, nil] the request to execute; nil is ignored
# @return nil when +descriptor+ is nil, otherwise the value of the branch
#   that handled the command
# @raise [RuntimeError] if a descriptor is given while no session is set
def process(descriptor)
  return if descriptor.nil?

  if session.nil?
    raise RuntimeError, "must have a session to process a descriptor"
  end

  # Responder handed to the session: it pushes the outcome onto the queue
  # under the descriptor's own handle and command.
  respond = lambda do |result, error, details|
    # The session entry is stripped from the details before they are pushed
    details&.delete(:session)

    queue.push(descriptor.handle, descriptor.command,
               { result: result, error: error, details: details })
  end

  case descriptor.command
  when :call
    procedure, args, kwargs, options =
      descriptor.params.values_at(:procedure, :args, :kwargs, :options)

    session.call(procedure, args, kwargs, options, &respond)
  when :publish
    topic, args, kwargs, options =
      descriptor.params.values_at(:topic, :args, :kwargs, :options)

    session.publish(topic, args, kwargs, options, &respond)
  when :yield
    request, options, check_defer, result_hash =
      descriptor.params.values_at(:request, :options, :check_defer, :result)

    # Rebuild the response object from its hash form (empty hash if absent)
    result = Wamp::Client::Response.from_hash(result_hash || {})

    session.yield(request, result, options, check_defer)
  else
    # Unknown command: report the failure through the same responder
    unsupported = Wamp::Client::Response::CallError.new(
      Wamp::Client::Response::DEFAULT_ERROR,
      ["unsupported proxy command '#{descriptor.command}'"]
    )
    respond.call(nil, unsupported.to_hash, {})
  end
end