Class: AgentXmpp::MessagePipe

Inherits:
Object
  • Object
show all
Defined in:
lib/agent_xmpp/client/message_pipe.rb

Overview


Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(connection) ⇒ MessagePipe

.….….….….….….….….….….….….….….….….….….….….….….….….….….



15
16
17
18
19
20
21
# File 'lib/agent_xmpp/client/message_pipe.rb', line 15

# Builds a message pipe bound to +connection+.
#
# The pipe starts with status :offline, MessageDelegate as its only delegate,
# and an empty table of pending response callbacks with its own Mutex.
#
# connection - transport object stored in @connection; other methods of this
#              class call #error? and #send_data on it.
def initialize(connection)
  @connection = connection
  @connection_status = :offline
  @delegates = [MessageDelegate]
  @responder_list = {}
  @responder_list_mutex = Mutex.new
end

Instance Attribute Details

#connection ⇒ Object (readonly)




8
9
10
# File 'lib/agent_xmpp/client/message_pipe.rb', line 8

# Reader for the connection object that was passed to the constructor.
def connection
  @connection
end

#connection_status ⇒ Object (readonly)




8
9
10
# File 'lib/agent_xmpp/client/message_pipe.rb', line 8

# Reader for the status symbol held in @connection_status
# (initialised to :offline by the constructor).
def connection_status
  @connection_status
end

#delegates ⇒ Object (readonly)




8
9
10
# File 'lib/agent_xmpp/client/message_pipe.rb', line 8

# Reader for the array of registered delegates.
def delegates
  @delegates
end

#responder_list ⇒ Object (readonly)




8
9
10
# File 'lib/agent_xmpp/client/message_pipe.rb', line 8

# Reader for the hash of pending response callbacks, keyed by stanza id.
# Each value is a hash with :blk (the callback) and :created_at (a Time).
def responder_list
  @responder_list
end

#responder_list_mutex ⇒ Object (readonly)




8
9
10
# File 'lib/agent_xmpp/client/message_pipe.rb', line 8

# Reader for the Mutex created in the constructor; used to serialise
# modifications of @responder_list.
def responder_list_mutex
  @responder_list_mutex
end

#stream_features ⇒ Object (readonly)




8
9
10
# File 'lib/agent_xmpp/client/message_pipe.rb', line 8

# Reader for @stream_features.
# NOTE(review): not assigned in the constructor or any method shown here, so it
# is nil until set elsewhere — presumably during stream negotiation; confirm.
def stream_features
  @stream_features
end

#stream_mechanisms ⇒ Object (readonly)




8
9
10
# File 'lib/agent_xmpp/client/message_pipe.rb', line 8

# Reader for @stream_mechanisms.
# NOTE(review): not assigned in the constructor or any method shown here, so it
# is nil until set elsewhere — presumably during stream negotiation; confirm.
def stream_mechanisms
  @stream_mechanisms
end

Instance Method Details

#add_delegate(delegate) ⇒ Object

.….….….….….….….….….….….….….….….….….….….….….….….….….….



24
25
26
# File 'lib/agent_xmpp/client/message_pipe.rb', line 24

# Registers +delegate+ unless it is already registered.
#
# Returns the delegates array when the delegate was appended, nil when it was
# already present.
def add_delegate(delegate)
  return if @delegates.include?(delegate)

  @delegates << delegate
end

#add_to_responder_list(stanza_id, &blk) ⇒ Object

.….….….….….….….….….….….….….….….….….….….….….….….….….….



68
69
70
71
72
# File 'lib/agent_xmpp/client/message_pipe.rb', line 68

# Records +blk+ as the pending response callback for +stanza_id+.
#
# The entry is written while holding responder_list_mutex and carries the
# callback under :blk and the registration time under :created_at.
#
# Returns the stored entry hash.
def add_to_responder_list(stanza_id, &blk)
  responder_list_mutex.synchronize do
    entry = { :blk => blk, :created_at => Time.now }
    @responder_list.store(stanza_id, entry)
  end
end

#broadcast_to_delegates(method, *args) ⇒ Object

.….….….….….….….….….….….….….….….….….….….….….….….….….….



39
40
41
# File 'lib/agent_xmpp/client/message_pipe.rb', line 39

# Invokes +method+ with +args+ on every delegate that responds to it.
#
# Delegates are visited in registration order; each one is checked with
# respond_to? immediately before being called.
#
# Returns the collected return values passed through Array#smash.
# NOTE(review): #smash is not a core Array method; it is defined elsewhere in
# the project — presumably flatten + compact, confirm.
def broadcast_to_delegates(method, *args)
  results = []
  delegates.each do |delegate|
    next unless delegate.respond_to?(method)

    results.push(delegate.send(method, *args))
  end
  results.smash
end

#connected? ⇒ Boolean

.….….….….….….….….….….….….….….….….….….….….….….….….….….

Returns:

  • (Boolean)


63
64
65
# File 'lib/agent_xmpp/client/message_pipe.rb', line 63

# Reports whether the pipe has a connection that is not in an error state.
#
# Always returns true or false: false when there is no connection object,
# otherwise the negation of connection.error?.
def connected?
  conn = connection
  conn ? !conn.error? : false
end

#connection_completed ⇒ Object

.….….….….….….….….….….….….….….….….….….….….….….….….….….



96
97
98
99
100
# File 'lib/agent_xmpp/client/message_pipe.rb', line 96

# Connection callback run once the connection has been established.
#
# In order: gives Boot the chance to run :call_after_connected, notifies the
# delegates with :on_connect, then passes every item returned by
# init_connection to #send.
#
# Returns the array of #send results.
def connection_completed
  Boot.call_if_implemented(:call_after_connected, self)
  broadcast_to_delegates(:on_connect, self)
  init_connection.map { |item| send(item) }
end

#delegates_respond_to?(method) ⇒ Boolean

.….….….….….….….….….….….….….….….….….….….….….….….….….….

Returns:

  • (Boolean)


34
35
36
# File 'lib/agent_xmpp/client/message_pipe.rb', line 34

# Reports whether at least one registered delegate responds to +method+.
#
# Stops at the first delegate that responds instead of counting all of them.
#
# Returns true or false.
def delegates_respond_to?(method)
  delegates.any? { |delegate| delegate.respond_to?(method) }
end

#receive(stanza) ⇒ Object


connection callbacks .….….….….….….….….….….….….….….….….….….….….….….….….….….



84
85
86
87
88
89
90
91
92
93
# File 'lib/agent_xmpp/client/message_pipe.rb', line 84

# Handles one incoming stanza.
#
# If +stanza+ is an Xmpp::Stanza with an id that has a pending response
# callback, that callback is removed from the responder list and invoked with
# the stanza; otherwise the stanza goes to process_stanza. Whatever either
# path returns is handed to send_resp.
#
# The lookup and removal happen as one step while holding
# responder_list_mutex, the same lock add_to_responder_list writes under.
# The callback itself runs after the lock has been released.
#
# Returns the result of send_resp.
def receive(stanza)
  AgentXmpp.logger.info "RECV: #{stanza}"
  callback_info = nil
  if stanza.kind_of?(Xmpp::Stanza) && stanza.id
    callback_info = responder_list_mutex.synchronize { responder_list.delete(stanza.id) }
  end
  result = callback_info ? callback_info[:blk].call(stanza) : process_stanza(stanza)
  send_resp(result)
end

#remove_delegate(delegate) ⇒ Object

.….….….….….….….….….….….….….….….….….….….….….….….….….….



29
30
31
# File 'lib/agent_xmpp/client/message_pipe.rb', line 29

# Unregisters +delegate+ by deleting it from the delegates array.
# Returns whatever Array#delete returns: the removed delegate, or nil when it
# was not registered.
def remove_delegate(delegate)
  @delegates.delete(delegate)
end

#remove_from_responder_list(stanza_id) ⇒ Object

.….….….….….….….….….….….….….….….….….….….….….….….….….….



75
76
77
78
79
# File 'lib/agent_xmpp/client/message_pipe.rb', line 75

# Drops the pending response callback registered for +stanza_id+, if any.
#
# The hash is only touched while holding responder_list_mutex, so the
# existence check and the removal are a single locked delete.
#
# Returns the removed entry hash, or nil when nothing was registered.
def remove_from_responder_list(stanza_id)
  responder_list_mutex.synchronize { @responder_list.delete(stanza_id) }
end

#send(data, &blk) ⇒ Object

.….….….….….….….….….….….….….….….….….….….….….….….….….….

Raises:

  • (AgentXmppError) — when the pipe is not connected



44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/agent_xmpp/client/message_pipe.rb', line 44

# Writes +data+ to the connection.
#
# When a block is given and +data+ is an Xmpp::Stanza, the block is registered
# as the response callback under the stanza's id; an id is generated first if
# the stanza has none. The outgoing data is logged, passed to Message.update,
# and its string form is handed to the connection's send_data.
#
# Raises AgentXmppError when the pipe is not connected.
# Returns the result of send_data.
def send(data, &blk)
  raise AgentXmppError, 'not connected' unless connected?

  if blk && data.kind_of?(Xmpp::Stanza)
    data.id = Xmpp::IdGenerator.generate_id if data.id.nil?
    add_to_responder_list(data.id, &blk)
  end

  AgentXmpp.logger.info "SEND: #{data.to_s}"
  Message.update(data)
  @connection.send_data(data.to_s)
end

#send_resp(resp) ⇒ Object

.….….….….….….….….….….….….….….….….….….….….….….….….….….



58
59
60
# File 'lib/agent_xmpp/client/message_pipe.rb', line 58

# Sends every AgentXmpp::Response found in +resp+.
#
# +resp+ may be a single value or an arbitrarily nested array; it is wrapped
# and flattened first. Each Response has its message passed to #send with its
# responds_with as the block; any other element is passed through untouched.
#
# Returns an array holding the #send result or the untouched element for each
# flattened item.
def send_resp(resp)
  [resp].flatten.map do |item|
    if item.kind_of?(AgentXmpp::Response)
      send(item.message, &item.responds_with)
    else
      item
    end
  end
end

#send_to_method ⇒ Object




11
# File 'lib/agent_xmpp/client/message_pipe.rb', line 11

# Keeps the inherited #send (dynamic method dispatch) reachable under the name
# #send_to_method, because this class defines its own #send(data, &blk) for
# writing to the connection.
# NOTE(review): relies on this alias being evaluated before `def send`
# (source line 11 vs. line 44) — confirm the ordering is preserved.
alias_method :send_to_method, :send

#unbind ⇒ Object

.….….….….….….….….….….….….….….….….….….….….….….….….….….



103
104
105
106
# File 'lib/agent_xmpp/client/message_pipe.rb', line 103

# Connection callback run when the connection goes away.
#
# Resets the status to :offline — the same symbol the constructor uses for the
# initial state — and then notifies the delegates with :on_disconnect.
#
# Returns the result of broadcast_to_delegates.
def unbind
  @connection_status = :offline
  broadcast_to_delegates(:on_disconnect, self)
end