Class: SelfSDK::MessagingClient

Inherits:
Object
  • Object
show all
Defined in:
lib/messaging.rb

Constant Summary collapse

DEFAULT_DEVICE =
"1"
DEFAULT_AUTO_RECONNECT =
true
DEFAULT_STORAGE_DIR =
"./.self_storage"
ON_DEMAND_CLOSE_CODE =
3999

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url, client, storage_key, storage_folder, options = {}) ⇒ MessagingClient

MessagingClient initializer

Parameters:

  • url (string)

    self-messaging url

  • options (Hash)

    a customizable set of options

[View source]

33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# File 'lib/messaging.rb', line 33

# Sets up the client state and, unless a websocket is injected, calls #start.
#
# @param url [String] self-messaging url
# @param client [Object] object responding to +jwt+; stored and also handed to Crypto.new
# @param storage_key [Object] passed through to Crypto.new
# @param storage_folder [Object] passed through to Crypto.new
# @param options [Hash] a customizable set of options
# @option options [Object] :device_id device id, DEFAULT_DEVICE when absent
# @option options [Object] :auto_reconnect DEFAULT_AUTO_RECONNECT when absent
# @option options [String] :storage_dir directory of the offset file, DEFAULT_STORAGE_DIR when absent
# @option options [Object] :no_crypto presence of the key (any value) skips building the Crypto client
# @option options [Object] :ws presence of the key stores the value as the websocket and skips #start
def initialize(url, client, storage_key, storage_folder, options = {})
  # Synchronisation primitive and bookkeeping tables.
  @mon = Monitor.new
  @url = url
  @messages = {}
  @acks = {}
  @type_observer = {}
  @uuid_observer = {}

  @jwt = client.jwt
  @client = client

  # Both timeouts are expressed in seconds.
  @ack_timeout = 60
  @timeout = 120

  @device_id = options.fetch(:device_id, DEFAULT_DEVICE)
  @auto_reconnect = options.fetch(:auto_reconnect, DEFAULT_AUTO_RECONNECT)
  @storage_dir = options.fetch(:storage_dir, DEFAULT_STORAGE_DIR)

  # The offset is read before the storage directory is (possibly) created;
  # this order is kept exactly as it was.
  @offset_file = "#{@storage_dir}/#{@jwt.id}:#{@device_id}.offset"
  @offset = read_offset

  FileUtils.mkdir_p(@storage_dir) unless File.exist?(@storage_dir)

  crypto_enabled = !options.include?(:no_crypto)
  @encryption_client = Crypto.new(@client, @device_id, storage_folder, storage_key) if crypto_enabled

  # Without an injected websocket the client starts itself.
  return start unless options.include?(:ws)

  @ws = options[:ws]
end

Instance Attribute Details

#ack_timeout ⇒ Object

Returns the value of attribute ack_timeout.


22
23
24
# File 'lib/messaging.rb', line 22

# Reader for @ack_timeout, which #initialize sets to 60 (seconds).
def ack_timeout
  @ack_timeout
end

#client ⇒ Object

Returns the value of attribute client.


22
23
24
# File 'lib/messaging.rb', line 22

# Reader for @client, the client object passed to #initialize.
def client
  @client
end

#device_id ⇒ Object

Returns the value of attribute device_id.


22
23
24
# File 'lib/messaging.rb', line 22

# Reader for @device_id (options[:device_id], or DEFAULT_DEVICE when not given).
def device_id
  @device_id
end

#encryption_client ⇒ Object

Returns the value of attribute encryption_client.


22
23
24
# File 'lib/messaging.rb', line 22

# Reader for @encryption_client, the Crypto instance built in #initialize;
# it is never assigned (so reads as nil) when the :no_crypto option key was given.
def encryption_client
  @encryption_client
end

#jwt ⇒ Object

Returns the value of attribute jwt.


22
23
24
# File 'lib/messaging.rb', line 22

# Reader for @jwt, taken from client.jwt in #initialize.
def jwt
  @jwt
end

#timeout ⇒ Object

Returns the value of attribute timeout.


22
23
24
# File 'lib/messaging.rb', line 22

# Reader for @timeout, which #initialize sets to 120 (seconds).
def timeout
  @timeout
end

#type_observer ⇒ Object

Returns the value of attribute type_observer.


22
23
24
# File 'lib/messaging.rb', line 22

# Reader for @type_observer, the Hash of observers keyed by message type
# that #subscribe writes to.
def type_observer
  @type_observer
end

#uuid_observer ⇒ Object

Returns the value of attribute uuid_observer.


22
23
24
# File 'lib/messaging.rb', line 22

# Reader for @uuid_observer, the Hash of observers keyed by message id
# that #set_observer writes to.
def uuid_observer
  @uuid_observer
end

Instance Method Details

#add_acl_rule(payload) ⇒ Object

Allows incoming messages from the given identity

[View source]

110
111
112
113
114
115
116
117
# File 'lib/messaging.rb', line 110

# Sends an access-control message carrying the PERMIT command.
#
# @param payload [Object] forwarded unchanged as the message payload
# @return [Object] whatever #send_message returns
def add_acl_rule(payload)
  permit_rule = Msgproto::AccessControlList.new(
    type: Msgproto::MsgType::ACL,
    id: SecureRandom.uuid,
    command: Msgproto::ACLCommand::PERMIT,
    payload: payload
  )
  send_message(permit_rule)
end

#clean_observers ⇒ Object

[View source]

207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# File 'lib/messaging.rb', line 207

# Fires and removes every per-message observer whose deadline has passed.
#
# The block of each expired observer is called with a
# SelfSDK::Messages::Base whose status is "errored".
#
# @return [Hash] the observer table, with the expired entries removed
def clean_observers
  # Walk a snapshot so the live table can be modified while looping.
  @uuid_observer.clone.each do |id, observer|
    next unless observer[:timeout] < SelfSDK::Time.now

    # Claim the entry by deleting it before running the callback. If it is
    # already gone (e.g. #notify_observer removed it after delivering the
    # response) there is nothing left to time out, and dereferencing the
    # missing entry would raise.
    expired = @uuid_observer.delete(id)
    next unless expired

    message = SelfSDK::Messages::Base.new(self)
    message.status = "errored"
    expired[:block].call(message)
  end

  # Entries are deleted in place instead of swapping in a rebuilt hash, so
  # observers registered while the callbacks above ran are not thrown away.
  @uuid_observer
end

#close ⇒ Object

[View source]

72
73
74
# File 'lib/messaging.rb', line 72

# Closes the websocket with the ON_DEMAND_CLOSE_CODE close code.
#
# @return [Object] whatever the websocket's #close returns
def close
  reason = "connection closed by the client"
  @ws.close(ON_DEMAND_CLOSE_CODE, reason)
end

#list_acl_rules ⇒ Object

Lists acl rules

[View source]

132
133
134
135
136
137
138
139
140
# File 'lib/messaging.rb', line 132

# Requests the ACL rules and waits for the 'acl_list' response.
#
# The LIST request is built and sent with #send_raw from inside the block
# handed to #wait_for.
#
# @return [Object] whatever #wait_for returns for 'acl_list'
def list_acl_rules
  wait_for('acl_list') do
    list_request = Msgproto::AccessControlList.new(
      type: Msgproto::MsgType::ACL,
      id: SecureRandom.uuid,
      command: Msgproto::ACLCommand::LIST
    )
    send_raw(list_request)
  end
end

#notify_observer(message) ⇒ Object

Notify the type observer for the given message

[View source]

224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
# File 'lib/messaging.rb', line 224

# Dispatches an incoming message to the observer registered for it.
#
# An observer registered for the message id wins: the message is validated
# against the observer's original message (when the entry has that key), the
# observer block runs on a new thread, and the entry is removed afterwards.
# Otherwise the observer registered for the message type, if any, runs on a
# new thread.
#
# @param message [#id, #typ, #validate!] the incoming message
# @return [Thread, nil] the spawned thread for a type observer; nil when an
#   id observer handled the message or when no observer matched
def notify_observer(message)
  # Look the entry up once and keep the reference: the table can lose this id
  # (e.g. through #clean_observers) before the thread below gets to run, and
  # a second lookup would then dereference nil.
  observer = @uuid_observer[message.id]
  if observer
    message.validate!(observer[:original_message]) if observer.include?(:original_message)
    Thread.new do
      observer[:block].call(message)
      @uuid_observer.delete(message.id)
    end
    return
  end

  # Return if there is no observer setup for this kind of message
  subscription = @type_observer[message.typ]
  return unless subscription

  Thread.new { subscription[:block].call(message) }
end

#remove_acl_rule(payload) ⇒ Object

Blocks incoming messages from specified identities

[View source]

122
123
124
125
126
127
128
129
# File 'lib/messaging.rb', line 122

# Sends an access-control message carrying the REVOKE command.
#
# @param payload [Object] forwarded unchanged as the message payload
# @return [Object] whatever #send_message returns
def remove_acl_rule(payload)
  revoke_rule = Msgproto::AccessControlList.new(
    type: Msgproto::MsgType::ACL,
    id: SecureRandom.uuid,
    command: Msgproto::ACLCommand::REVOKE,
    payload: payload
  )
  send_message(revoke_rule)
end

#send_and_wait_for_response(msg, original) ⇒ Object

Sends a message and waits for the response

[View source]

145
146
147
148
149
# File 'lib/messaging.rb', line 145

# Sends +msg+ and waits for the response registered under its id.
#
# @param msg [#id] message handed to #send_message from inside #wait_for
# @param original [Object] passed to #wait_for as the original message
# @return [Object] whatever #wait_for returns
def send_and_wait_for_response(msg, original)
  wait_for(msg.id, original) { send_message(msg) }
end

#send_custom(recipient, request_body) ⇒ Object

Sends a custom message

Parameters:

  • recipient (string)

    selfID to be requested

  • type (string)

    message type

  • request_body (hash)

    original message requesting information

[View source]

96
97
98
99
100
101
102
103
104
105
# File 'lib/messaging.rb', line 96

# Sends a custom message to the first device listed for the recipient.
#
# @param recipient [String] selfID of the identity to message
# @param request_body [Object] body handed to @jwt.prepare to build the ciphertext
# @return [Object] whatever #send_message returns
def send_custom(recipient, request_body)
  # NOTE(review): @to_device is only read in this method as far as this
  # listing shows; it stays an instance variable in case other code reads it.
  @to_device = @client.devices(recipient).first
  send_message Msgproto::Message.new(
    type: Msgproto::MsgType::MSG,
    id: SecureRandom.uuid,
    sender: "#{@jwt.id}:#{@device_id}",
    recipient: "#{recipient}:#{@to_device}",
    ciphertext: @jwt.prepare(request_body),
  )
end

#send_message(msg) ⇒ Object

Send a message through self network

[View source]

184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# File 'lib/messaging.rb', line 184

# Sends a message through the self network and blocks until it is acknowledged.
#
# An entry keyed by the message id is stored in @acks before the message is
# handed to #send_raw. The calling thread then waits on that entry's
# condition variable until its :waiting flag is cleared. The entry is removed
# on the way out, whether the method returns or raises.
#
# @param msg [#id] message to send; its id keys the pending acknowledgement
# @return [true] once the entry's :waiting flag has been cleared
# @raise [StandardError] anything raised by #send_raw or while waiting propagates
def send_message(msg)
  uuid = msg.id
  @mon.synchronize do
    @acks[uuid] = {
      waiting_cond: @mon.new_cond,
      waiting: true,
      timeout: SelfSDK::Time.now + @ack_timeout,
    }
  end
  send_raw(msg)
  SelfSDK.logger.info "waiting for acknowledgement #{uuid}"
  @mon.synchronize do
    @acks[uuid][:waiting_cond].wait_while do
      @acks[uuid][:waiting]
    end
  end
  SelfSDK.logger.info "acknowledged #{uuid}"
  true
ensure
  # Cleanup only. The value of an ensure clause is discarded, so a trailing
  # `false` here never was the return value; failures surface as exceptions.
  @acks.delete(uuid)
end

#set_observer(original, options = {}, &block) ⇒ Object

[View source]

243
244
245
246
# File 'lib/messaging.rb', line 243

# Registers a block to run for the response to +original+.
#
# The entry is stored in @uuid_observer under original.id together with the
# original message and a deadline of SelfSDK::Time.now plus the timeout.
#
# @param original [#id] the message whose response is awaited
# @param options [Hash] :timeout overrides @timeout for this observer
# @return [Hash] the stored observer entry
def set_observer(original, options = {}, &block)
  request_timeout = options.fetch(:timeout, @timeout)
  observer = {
    original_message: original,
    block: block,
    timeout: SelfSDK::Time.now + request_timeout,
  }
  @uuid_observer[original.id] = observer
end

#share_information(recipient, recipient_device, request) ⇒ Object

Responds to an information request

Parameters:

  • recipient (string)

    selfID to be requested

  • recipient_device (string)

    device id for the selfID to be requested

  • request (string)

    original message requesting information

[View source]

81
82
83
84
85
86
87
88
89
# File 'lib/messaging.rb', line 81

# Sends +request+ to a specific device of the recipient.
#
# @param recipient [String] selfID of the identity to message
# @param recipient_device [String] device id of that identity
# @param request [Object] body handed to @jwt.prepare to build the ciphertext
# @return [Object] whatever #send_message returns
def share_information(recipient, recipient_device, request)
  envelope = Msgproto::Message.new(
    type: Msgproto::MsgType::MSG,
    id: SecureRandom.uuid,
    sender: "#{@jwt.id}:#{@device_id}",
    recipient: "#{recipient}:#{recipient_device}",
    ciphertext: @jwt.prepare(request)
  )
  send_message(envelope)
end

#stop ⇒ Object

[View source]

62
63
64
65
66
67
68
69
70
# File 'lib/messaging.rb', line 62

# Marks every pending entry as done: each id in @acks is marked as
# acknowledged, and each id in @messages is marked as acknowledged and as
# arrived.
#
# @return [Hash] the @messages table
def stop
  @acks.each_key { |id| mark_as_acknowledged(id) }
  @messages.each_key do |id|
    mark_as_acknowledged(id)
    mark_as_arrived(id)
  end
end

#subscribe(type, &block) ⇒ Object

[View source]

248
249
250
251
# File 'lib/messaging.rb', line 248

# Registers the block as the observer for a message type, replacing any
# observer already stored for it.
#
# @param type [Symbol, Object] a Symbol is translated with
#   SelfSDK.message_type first; any other value is used as the key as-is
# @return [Hash] the stored observer entry
def subscribe(type, &block)
  observer_key = type.is_a?(Symbol) ? SelfSDK.message_type(type) : type
  @type_observer[observer_key] = { block: block }
end

#wait_for(uuid, msg = nil) ⇒ Object

Executes the given block and waits for the message id specified on the uuid.

[View source]

155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
# File 'lib/messaging.rb', line 155

# Runs the given block, then blocks until the entry registered under +uuid+
# stops waiting, and returns that entry's :response.
#
# The entry is stored in @messages before the block runs and is removed on
# the way out, whether the method returns or raises.
#
# @param uuid [Object] key of the awaited message
# @param msg [Object, nil] stored in the entry as :original_message
# @yield the action that triggers the awaited response
# @return [Object] the :response value of the entry
def wait_for(uuid, msg = nil)
  SelfSDK.logger.info "sending #{uuid}"
  @mon.synchronize do
    deadline = SelfSDK::Time.now + @timeout
    @messages[uuid] = { waiting_cond: @mon.new_cond, waiting: true, timeout: deadline, original_message: msg }
  end

  yield

  SelfSDK.logger.info "waiting for client to respond #{uuid}"
  @mon.synchronize do
    # The entry is looked up again on every check, as the flag is cleared elsewhere.
    @messages[uuid][:waiting_cond].wait_until { !@messages[uuid][:waiting] }
  end

  SelfSDK.logger.info "response received for #{uuid}"
  @messages[uuid][:response]
ensure
  @messages.delete(uuid)
end