Class: SelfSDK::MessagingClient

Inherits:
Object
  • Object
show all
Defined in:
lib/messaging.rb

Constant Summary collapse

DEFAULT_DEVICE =
"1"
DEFAULT_AUTO_RECONNECT =
true
DEFAULT_STORAGE_DIR =
"./.self_storage"
ON_DEMAND_CLOSE_CODE =
3999

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(url, client, storage_key, options = {}) ⇒ MessagingClient

MessagingClient initializer

Parameters:

  • url (string)

    self-messaging url

  • options (Hash)

    a customizable set of options

[View source]

35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# File 'lib/messaging.rb', line 35

# Builds the client's in-memory state, prepares its on-disk storage directory
# and either adopts a supplied websocket or calls #start.
#
# @param url [String] self-messaging url, stored in @url
# @param client [Object] must respond to #jwt; also handed to Crypto.new
# @param storage_key [Object] forwarded unchanged to Crypto.new
# @param options [Hash] optional settings:
#   :device_id      - device identifier, defaults to DEFAULT_DEVICE
#   :auto_reconnect - defaults to DEFAULT_AUTO_RECONNECT
#   :storage_dir    - base storage directory, defaults to DEFAULT_STORAGE_DIR
#   :no_crypto      - when this key is present (whatever its value) no Crypto
#                     client is created and @encryption_client stays unset
#   :ws             - when this key is present its value becomes @ws and
#                     #start is not called
def initialize(url, client, storage_key, options = {})
  @mon = Monitor.new
  @url = url
  # Bookkeeping hashes: pending responses, pending acks and registered observers.
  @messages = {}
  @acks = {}
  @type_observer = {}
  @uuid_observer = {}
  @jwt = client.jwt
  @client = client
  @ack_timeout = 60 # seconds
  @timeout = 120 # seconds
  @auth_id = SecureRandom.uuid
  @device_id = options.fetch(:device_id, DEFAULT_DEVICE)
  @auto_reconnect = options.fetch(:auto_reconnect, DEFAULT_AUTO_RECONNECT)
  @raw_storage_dir = options.fetch(:storage_dir, DEFAULT_STORAGE_DIR)
  # Storage is namespaced per app id and per device id under the base directory.
  @storage_dir = "#{@raw_storage_dir}/apps/#{@jwt.id}/devices/#{@device_id}"
  FileUtils.mkdir_p @storage_dir unless File.exist? @storage_dir
  @offset_file = "#{@storage_dir}/#{@jwt.id}:#{@device_id}.offset"
  # NOTE(review): read_offset and migrate_old_storage_format are defined
  # outside this view; presumably they rely on the paths set above — confirm
  # before reordering these lines.
  @offset = read_offset
  migrate_old_storage_format

  # Crypto setup is skipped as soon as the :no_crypto key exists, even if its
  # value is false or nil.
  unless options.include? :no_crypto
    crypto_path = "#{@storage_dir}/keys"
    FileUtils.mkdir_p crypto_path unless File.exist? crypto_path
    @encryption_client = Crypto.new(@client, @device_id, crypto_path, storage_key)
  end

  # A caller-supplied websocket replaces the call to #start.
  if options.include? :ws
    @ws = options[:ws]
  else
    start
  end
end

Instance Attribute Details

#ack_timeoutObject

Returns the value of attribute ack_timeout.


24
25
26
# File 'lib/messaging.rb', line 24

# Reader for @ack_timeout, which #initialize sets to 60 (seconds).
def ack_timeout
  @ack_timeout
end

#clientObject

Returns the value of attribute client.


24
25
26
# File 'lib/messaging.rb', line 24

# Reader for @client, the client object that was passed to #initialize.
def client
  @client
end

#device_idObject

Returns the value of attribute device_id.


24
25
26
# File 'lib/messaging.rb', line 24

# Reader for @device_id, taken from options[:device_id] in #initialize and
# falling back to DEFAULT_DEVICE.
def device_id
  @device_id
end

#encryption_clientObject

Returns the value of attribute encryption_client.


24
25
26
# File 'lib/messaging.rb', line 24

# Reader for @encryption_client, the Crypto instance built in #initialize.
# It is never assigned (so reads as nil) when the :no_crypto option is given.
def encryption_client
  @encryption_client
end

#jwtObject

Returns the value of attribute jwt.


24
25
26
# File 'lib/messaging.rb', line 24

# Reader for @jwt, which #initialize takes from client.jwt.
def jwt
  @jwt
end

#timeoutObject

Returns the value of attribute timeout.


24
25
26
# File 'lib/messaging.rb', line 24

# Reader for @timeout, which #initialize sets to 120 (seconds).
def timeout
  @timeout
end

#type_observerObject

Returns the value of attribute type_observer.


24
25
26
# File 'lib/messaging.rb', line 24

# Reader for @type_observer, the Hash that #subscribe fills with
# message type => { block: callback } entries.
def type_observer
  @type_observer
end

#uuid_observerObject

Returns the value of attribute uuid_observer.


24
25
26
# File 'lib/messaging.rb', line 24

# Reader for @uuid_observer, the Hash that #set_observer fills with
# message id => { original_message:, block:, timeout: } entries.
def uuid_observer
  @uuid_observer
end

Instance Method Details

#add_acl_rule(payload) ⇒ Object

Allows incoming messages from the given identity

[View source]

130
131
132
133
134
135
136
137
# File 'lib/messaging.rb', line 130

# Allows incoming messages from the given identity.
#
# Wraps the payload in an ACL message carrying the PERMIT command and a fresh
# random id, then hands it to #send_message.
#
# @param payload [Object] value placed in the ACL message's payload field
# @return [Object] whatever #send_message returns
def add_acl_rule(payload)
  permit_rule = Msgproto::AccessControlList.new(
    type: Msgproto::MsgType::ACL,
    id: SecureRandom.uuid,
    command: Msgproto::ACLCommand::PERMIT,
    payload: payload,
  )
  send_message(permit_rule)
end

#clean_observersObject

[View source]

229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
# File 'lib/messaging.rb', line 229

# Expires uuid observers whose :timeout lies before SelfSDK::Time.now.
#
# Each expired observer's block is called once with a fresh
# SelfSDK::Messages::Base whose status is "errored"; the observer is then
# dropped. Observers that have not expired are carried over into a new Hash,
# which replaces @uuid_observer.
#
# @return [Hash] the new @uuid_observer containing only unexpired observers
def clean_observers
  survivors = {}
  # Walk a copy so that deleting from @uuid_observer does not disturb the loop.
  @uuid_observer.clone.each_pair do |uuid, observer|
    unless observer[:timeout] < SelfSDK::Time.now
      survivors[uuid] = observer
      next
    end

    errored = SelfSDK::Messages::Base.new(self)
    errored.status = "errored"

    @uuid_observer[uuid][:block].call(errored)
    @uuid_observer.delete(uuid)
  end
  @uuid_observer = survivors
end

#closeObject

[View source]

79
80
81
# File 'lib/messaging.rb', line 79

# Closes the websocket held in @ws, passing ON_DEMAND_CLOSE_CODE and a fixed
# reason string so the closure is identifiable as client-initiated.
#
# @return [Object] whatever @ws.close returns
def close
  reason = "connection closed by the client"
  @ws.close(ON_DEMAND_CLOSE_CODE, reason)
end

#list_acl_rulesObject

Lists acl rules

[View source]

152
153
154
155
156
157
158
159
160
# File 'lib/messaging.rb', line 152

# Lists ACL rules.
#
# Registers a wait on the fixed id 'acl_list' via #wait_for and, inside that
# wait, sends an ACL message carrying the LIST command through #send_raw.
#
# @return [Object] whatever #wait_for returns for 'acl_list'
def list_acl_rules
  wait_for('acl_list') do
    list_request = Msgproto::AccessControlList.new(
      type: Msgproto::MsgType::ACL,
      id: SecureRandom.uuid,
      command: Msgproto::ACLCommand::LIST,
    )
    send_raw(list_request)
  end
end

#notify_observer(message) ⇒ Object

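Notifies the observer registered for the given message: the observer for the message id if there is one, otherwise the observer for the message type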

[View source]

246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
# File 'lib/messaging.rb', line 246

# Dispatches an incoming message to the matching observer on a new thread.
#
# An observer registered for the message id (see #set_observer) takes
# precedence: the message is validated against the stored original message
# when the observer has one, the observer's block is called, and the observer
# is removed afterwards. Otherwise the block subscribed for the message type
# (see #subscribe) is called, if any.
#
# @param message [Object] must respond to #id and #typ, and to #validate!
#   when the matching uuid observer carries an :original_message
# @return [nil, Thread] nil when a uuid observer handled the message or when
#   nothing is subscribed for its type; otherwise the thread running the
#   type observer's block
def notify_observer(message)
  if @uuid_observer.include? message.id
    observer = @uuid_observer[message.id]
    message.validate!(observer[:original_message]) if observer.include?(:original_message)
    Thread.new do
      # Use the observer captured above rather than reading @uuid_observer
      # again: #clean_observers may delete or replace the entry before this
      # thread runs, which would turn a second lookup into nil[:block].
      observer[:block].call(message)
      @uuid_observer.delete(message.id)
    end
    return
  end

  # Return if there is no observer setup for this kind of message
  return unless @type_observer.include? message.typ

  Thread.new do
    @type_observer[message.typ][:block].call(message)
  end
end

#remove_acl_rule(payload) ⇒ Object

Blocks incoming messages from specified identities

[View source]

142
143
144
145
146
147
148
149
# File 'lib/messaging.rb', line 142

# Blocks incoming messages from the specified identities.
#
# Wraps the payload in an ACL message carrying the REVOKE command and a fresh
# random id, then hands it to #send_message.
#
# @param payload [Object] value placed in the ACL message's payload field
# @return [Object] whatever #send_message returns
def remove_acl_rule(payload)
  revoke_rule = Msgproto::AccessControlList.new(
    type: Msgproto::MsgType::ACL,
    id: SecureRandom.uuid,
    command: Msgproto::ACLCommand::REVOKE,
    payload: payload,
  )
  send_message(revoke_rule)
end

#send_and_wait_for_response(msgs, original) ⇒ Object

Sends a message and waits for the response

[View source]

165
166
167
168
169
170
171
# File 'lib/messaging.rb', line 165

# Sends a batch of messages and waits for the response to the first one.
#
# The wait is registered (via #wait_for) on the id of the first message
# before anything is sent; every message is then delivered in order with
# #send_message.
#
# @param msgs [Array] messages to send; the first one's #id keys the wait
# @param original [Object] passed to #wait_for as the original message
# @return [Object] whatever #wait_for returns
def send_and_wait_for_response(msgs, original)
  wait_for(msgs.first.id, original) do
    msgs.each { |outgoing| send_message(outgoing) }
  end
end

#send_custom(recipient, request_body) ⇒ Object

Send custom message

Parameters:

  • recipient (string)

    selfID to be requested

  • type (string)

    message type

  • request (hash)

    original message requesting information

[View source]

103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
# File 'lib/messaging.rb', line 103

# Sends a custom message to every device of the recipient and then to every
# other device of the sending identity.
#
# Fixes two defects of the previous version: the own-device check used the
# misspelled `to-device` (parsed as `to - device`, raising NameError), and
# the copies meant for the sender's own devices were addressed with the
# recipient's identity instead of the sender's.
#
# @param recipient [String] selfID the message is addressed to
# @param request_body [Object] message body; run through @jwt.prepare to
#   produce each message's ciphertext field
# @return [Object] the device list returned by @client.devices(@jwt.id)
def send_custom(recipient, request_body)
  # Builds and sends one message addressed to identity:device. Each message
  # gets its own id and its own prepared ciphertext.
  deliver = lambda do |identity, device|
    send_message Msgproto::Message.new(
      type: Msgproto::MsgType::MSG,
      id: SecureRandom.uuid,
      sender: "#{@jwt.id}:#{@device_id}",
      recipient: "#{identity}:#{device}",
      ciphertext: @jwt.prepare(request_body),
    )
  end

  @client.devices(recipient).each do |to_device|
    deliver.call(recipient, to_device)
  end

  @client.devices(@jwt.id).each do |to_device|
    # The device sending the message does not need a copy of it.
    next if to_device == @device_id

    deliver.call(@jwt.id, to_device)
  end
end

#send_message(msg) ⇒ Object

Send a message through self network

[View source]

206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# File 'lib/messaging.rb', line 206

# Sends a message through the self network and blocks until it is
# acknowledged.
#
# A pending-ack entry is registered under the message id before the message
# is handed to #send_raw, so an acknowledgement cannot arrive for an unknown
# id. The method then waits on the entry's condition variable until its
# :waiting flag is cleared; the code that clears it is outside this method.
# NOTE(review): the wait here has no deadline of its own; the stored :timeout
# is presumably enforced by whatever clears the flag — confirm.
#
# The pending-ack entry is always removed on the way out, including when
# #send_raw raises. The previous `false` inside the ensure clause was dead
# code (an ensure clause's value is discarded) and has been dropped; the
# cleanup now runs under the same monitor as the other @acks accesses here.
#
# @param msg [Object] message responding to #id, forwarded to #send_raw
# @return [true] once the message has been acknowledged
def send_message(msg)
  uuid = msg.id
  @mon.synchronize do
    @acks[uuid] = {
      waiting_cond: @mon.new_cond,
      waiting: true,
      timeout: SelfSDK::Time.now + @ack_timeout,
    }
  end
  send_raw(msg)
  SelfSDK.logger.info "waiting for acknowledgement #{uuid}"
  @mon.synchronize do
    @acks[uuid][:waiting_cond].wait_while { @acks[uuid][:waiting] }
  end
  SelfSDK.logger.info "acknowledged #{uuid}"
  true
ensure
  @mon.synchronize { @acks.delete(uuid) }
end

#set_observer(original, options = {}, &block) ⇒ Object

[View source]

265
266
267
268
# File 'lib/messaging.rb', line 265

# Registers a one-off observer keyed by the id of the original message.
#
# @param original [Object] the outgoing message; its #id is the observer key
#   and the object itself is kept as :original_message
# @param options [Hash] :timeout - seconds until the observer expires,
#   defaults to @timeout
# @param block [Proc] callback stored under :block
# @return [Hash] the observer entry that was stored
def set_observer(original, options = {}, &block)
  ttl = options.fetch(:timeout, @timeout)
  entry = {
    original_message: original,
    block: block,
    timeout: SelfSDK::Time.now + ttl,
  }
  @uuid_observer[original.id] = entry
end

#share_information(recipient, recipient_device, request) ⇒ Object

Responds to an information request

Parameters:

  • recipient (string)

    selfID to be requested

  • recipient_device (string)

    device id for the selfID to be requested

  • request (string)

    original message requesting information

[View source]

88
89
90
91
92
93
94
95
96
# File 'lib/messaging.rb', line 88

# Responds to an information request by sending a single message to one
# specific device of the recipient.
#
# @param recipient [String] selfID the response is addressed to
# @param recipient_device [String] device id of that selfID
# @param request [Object] run through @jwt.prepare to produce the ciphertext
# @return [Object] whatever #send_message returns
def share_information(recipient, recipient_device, request)
  envelope = Msgproto::Message.new(
    type: Msgproto::MsgType::MSG,
    id: SecureRandom.uuid,
    sender: "#{@jwt.id}:#{@device_id}",
    recipient: "#{recipient}:#{recipient_device}",
    ciphertext: @jwt.prepare(request),
  )
  send_message(envelope)
end

#stopObject

[View source]

69
70
71
72
73
74
75
76
77
# File 'lib/messaging.rb', line 69

# Releases everything that is currently being waited on.
#
# Every pending ack id is passed to mark_as_acknowledged, and every pending
# message id is passed to mark_as_acknowledged and then mark_as_arrived.
#
# @return [Hash] @messages
def stop
  @acks.each_key { |uuid| mark_as_acknowledged(uuid) }
  @messages.each_key do |uuid|
    mark_as_acknowledged(uuid)
    mark_as_arrived(uuid)
  end
end

#subscribe(type, &block) ⇒ Object

[View source]

270
271
272
273
# File 'lib/messaging.rb', line 270

# Registers the callback to run for incoming messages of the given type,
# replacing any callback previously registered for it.
#
# @param type [Symbol, Object] message type; a Symbol is first translated
#   with SelfSDK::message_type, anything else is used as the key as-is
# @param block [Proc] callback stored under :block
# @return [Hash] the stored entry, { block: block }
def subscribe(type, &block)
  key = type.is_a?(Symbol) ? SelfSDK::message_type(type) : type
  @type_observer[key] = { block: block }
end

#wait_for(uuid, msg = nil) ⇒ Object

Executes the given block and waits for the message id specified on the uuid.

[View source]

177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
# File 'lib/messaging.rb', line 177

# Executes the given block and waits for the response to the message id
# given as uuid.
#
# A pending entry is stored in @messages under uuid before the block runs.
# After the block returns, the method waits on the entry's condition variable
# until its :waiting flag is cleared, then returns the entry's :response.
# The entry is removed from @messages on the way out, also when the block
# raises.
#
# @param uuid [Object] id the response is expected under
# @param msg [Object, nil] stored in the entry as :original_message
# @yield the work that triggers the response (typically sending the message)
# @return [Object] the :response value found in the entry
def wait_for(uuid, msg = nil)
  SelfSDK.logger.info "sending #{uuid}"
  @mon.synchronize do
    pending = {
      waiting_cond: @mon.new_cond,
      waiting: true,
      timeout: SelfSDK::Time.now + @timeout,
      original_message: msg,
    }
    @messages[uuid] = pending
  end

  yield

  SelfSDK.logger.info "waiting for client to respond #{uuid}"
  @mon.synchronize do
    @messages[uuid][:waiting_cond].wait_while { @messages[uuid][:waiting] }
  end

  SelfSDK.logger.info "response received for #{uuid}"
  @messages[uuid][:response]
ensure
  @messages.delete(uuid)
end