Class: OmfEc::Experiment

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin, Singleton
Defined in:
lib/omf_ec/experiment.rb

Overview

Experiment class to hold relevant state information

Defined Under Namespace

Classes: MetaData

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initializeExperiment

Returns a new instance of Experiment.



32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# File 'lib/omf_ec/experiment.rb', line 32

def initialize
  super
  @id = Time.now.utc.iso8601(3)
  @sliceID = nil
  @state ||= [] #TODO: we need to keep history of all the events and not ovewrite them
  @groups ||= []
  @nodes ||= []
  @events ||= []
  @app_definitions ||= Hash.new
  @sub_groups ||= []
  @cmdline_properties ||= Hash.new
  @show_graph = false
  @js_url = nil
  @job_url = nil
  @job_mps = {}
  @ss_url = nil
end

Instance Attribute Details

#app_definitionsObject

Returns the value of attribute app_definitions.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def app_definitions
  @app_definitions
end

#cmdline_propertiesObject

Returns the value of attribute cmdline_properties.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def cmdline_properties
  @cmdline_properties
end

#groupsObject (readonly)

Returns the value of attribute groups.



20
21
22
# File 'lib/omf_ec/experiment.rb', line 20

def groups
  @groups
end

#job_mpsObject

Returns the value of attribute job_mps.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def job_mps
  @job_mps
end

#job_urlObject

Returns the value of attribute job_url.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def job_url
  @job_url
end

#js_urlObject

Returns the value of attribute js_url.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def js_url
  @js_url
end

#nameObject

Returns the value of attribute name.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def name
  @name
end

#nodesObject

Returns the value of attribute nodes.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def nodes
  @nodes
end

#oml_uriObject

Returns the value of attribute oml_uri.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def oml_uri
  @oml_uri
end

#propertyObject

Returns the value of attribute property.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def property
  @property
end

#show_graphObject

Returns the value of attribute show_graph.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def show_graph
  @show_graph
end

#sliceIDObject

Returns the value of attribute sliceID.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def sliceID
  @sliceID
end

#ss_urlObject

Returns the value of attribute ss_url.



19
20
21
# File 'lib/omf_ec/experiment.rb', line 19

def ss_url
  @ss_url
end

#stateObject (readonly)

Returns the value of attribute state.



20
21
22
# File 'lib/omf_ec/experiment.rb', line 20

def state
  @state
end

#sub_groupsObject (readonly)

Returns the value of attribute sub_groups.



20
21
22
# File 'lib/omf_ec/experiment.rb', line 20

def sub_groups
  @sub_groups
end

Class Method Details

.disconnectObject



294
295
296
297
298
299
300
301
# File 'lib/omf_ec/experiment.rb', line 294

def disconnect
  info "Disconnecting in 5 sec from experiment: #{OmfEc.experiment.id}"
  info "Run the EC again to reattach"
  OmfCommon.el.after(5) do
    OmfCommon.comm.disconnect
    OmfCommon.eventloop.stop
  end
end

.doneObject

Disconnect communicator, try to delete any XMPP affiliations



267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# File 'lib/omf_ec/experiment.rb', line 267

def done
  info "Experiment: #{OmfEc.experiment.id} finished"
  info "Release applications and network interfaces"
  info "Exit in 15 seconds..."

  # Make sure that all defined events are removed
  OmfEc.experiment.clear_events

  OmfCommon.el.after(10) do
    allGroups do |g|
      g.resources[type: 'application'].release unless g.app_contexts.empty?
      g.resources[type: 'net'].release unless g.net_ifs.find_all { |v| v.conf[:type] == 'net' }.empty?
      g.resources[type: 'wlan'].release unless g.net_ifs.find_all { |v| v.conf[:type] == 'wlan' }.empty?
      g.resources.membership = { leave: g.address }
    end

    OmfCommon.el.after(4) do
      info "OMF Experiment Controller #{OmfEc::VERSION} - Exit."
      OmfCommon.el.after(1) do
        OmfCommon.comm.disconnect
        OmfCommon.eventloop.stop
      end
    end
  end
  OmfEc.experiment.("state", "finished")
end

.IDObject

Unique experiment id (Class method)



176
177
178
# File 'lib/omf_ec/experiment.rb', line 176

def self.ID
  instance.id
end

.leave_membershipsObject

Ask the resources which joined the groups I created to leave



323
324
325
326
327
# File 'lib/omf_ec/experiment.rb', line 323

def leave_memberships
  all_groups do |g|
    g.resources.membership = { leave: g.address }
  end
end

.sliceIDObject

Unique slice id (Class method)



181
182
183
# File 'lib/omf_ec/experiment.rb', line 181

def self.sliceID
  instance.sliceID
end

.startObject



303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
# File 'lib/omf_ec/experiment.rb', line 303

def start
  info "Experiment: #{OmfEc.experiment.id} starts"
  info "Slice: #{OmfEc.experiment.sliceID}" unless OmfEc.experiment.sliceID.nil?
  OmfEc.experiment.("state", "running")

  allGroups do |g|
    g.members.each do |key, value|
      OmfEc.subscribe_and_monitor(key) do |res|
        info "Configure '#{key}' to join '#{g.name}'"
        g.synchronize do
          g.members[key] = res.address
        end
        res.configure(membership: g.address, :res_index => OmfEc.experiment.nodes.index(key))
      end
    end
  end
end

Instance Method Details

#add_event(name, opts, trigger) ⇒ Object



152
153
154
155
156
157
158
159
# File 'lib/omf_ec/experiment.rb', line 152

def add_event(name, opts, trigger)
  self.synchronize do
    warn "Event '#{name}' has already been defined. Overwriting it now." if event(name)
    @events.delete_if { |e| e[:name] == name }
    @events << { name: name, trigger: trigger, aliases: [] }.merge(opts)
    add_periodic_event(event(name)) if opts[:every]
  end
end

#add_group(group) ⇒ Object



129
130
131
132
133
134
# File 'lib/omf_ec/experiment.rb', line 129

def add_group(group)
  self.synchronize do
    raise ArgumentError, "Expect Group object, got #{group.inspect}" unless group.kind_of? OmfEc::Group
    @groups << group unless group(group.name)
  end
end

#add_or_update_resource_state(name, opts = {}) ⇒ Object Also known as: add_resource



70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/omf_ec/experiment.rb', line 70

def add_or_update_resource_state(name, opts = {})
  self.synchronize do
    res = resource_state(name)
    if res
      opts.each do |key, value|
        if value.class == Array
          # Merge array values
          res[key] ||= []
          res[key] += value
          res[key].uniq!
        elsif value.kind_of? Hash
          # Merge hash values
          res[key] ||= Hashie::Mash.new
          res[key].merge!(value)
        else
          # Overwrite otherwise
          res[key] = value
        end
      end
    else
      info "Newly discovered resource >> #{name}"
      res = Hashie::Mash.new({ address: name }).merge(opts)
      @state << res

      # Re send membership configure
      planned_groups = groups_by_res(res[:address])

      unless planned_groups.empty?
        OmfEc.subscribe_and_monitor(name) do |res|
          info "Config #{name} to join #{planned_groups.map(&:name).join(', ')}"
          res.configure(membership: planned_groups.map(&:address))
        end
      end
    end
  end
end

#add_periodic_event(event) ⇒ Object



194
195
196
197
198
199
200
# File 'lib/omf_ec/experiment.rb', line 194

def add_periodic_event(event)
  event[:periodic_timer] = OmfCommon.el.every(event[:every]) do
    self.synchronize do
      eval_trigger(event)
    end
  end
end

#add_property(name, value = nil, description = nil) ⇒ Object



54
55
56
57
58
# File 'lib/omf_ec/experiment.rb', line 54

def add_property(name, value = nil, description = nil)
  override_value = @cmdline_properties[name.to_s.to_sym]
  value = override_value unless override_value.nil?
  ExperimentProperty.create(name, value, description)
end

#add_sub_group(name) ⇒ Object



119
120
121
122
123
# File 'lib/omf_ec/experiment.rb', line 119

def add_sub_group(name)
  self.synchronize do
    @sub_groups << name unless @sub_groups.include?(name)
  end
end

#all_groups?(&block) ⇒ Boolean

Returns:

  • (Boolean)


144
145
146
# File 'lib/omf_ec/experiment.rb', line 144

def all_groups?(&block)
  !groups.empty? && groups.all? { |g| block ? block.call(g) : g }
end

#archive_oedl(script_name) ⇒ Object

Archive OEDL content to OML db



232
233
234
235
236
237
238
# File 'lib/omf_ec/experiment.rb', line 232

def archive_oedl(script_name)
  (
    script_name,
    Base64.encode64(Zlib::Deflate.deflate(File.read(script_name))),
    "oedl_content"
  )
end

#clear_eventsObject



161
162
163
164
165
166
167
168
# File 'lib/omf_ec/experiment.rb', line 161

def clear_events
  self.synchronize do
    @events.each do |e|
      e[:periodic_timer].cancel if e[:periodic_timer]
    end
    @events = []
  end
end

#create_jobObject

If EC is launched with –job-service setup, then it needs to create a job entry for this experiment trial Do nothing if:

  • a JobService URL has not been provided, i.e. EC runs without needs to contact JS

  • we already have a Job URL, i.e. the job entry has already been created



245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
# File 'lib/omf_ec/experiment.rb', line 245

def create_job
  return unless @job_url.nil?
  return if @js_url.nil?
  require 'json'
  require 'net/http'
  begin
    job = { name: self.id }
    u = URI.parse(@js_url+'/jobs')
    req = Net::HTTP::Post.new(u.path, {'Content-Type' =>'application/json'})
    req.body = JSON.pretty_generate(job)
    res = Net::HTTP.new(u.host, u.port).start {|http| http.request(req) }
    raise "Could not create a job for this experiment trial\n"+
          "Response #{res.code} #{res.message}:\n#{res.body}" unless res.kind_of? Net::HTTPSuccess
    job = JSON.parse(res.body)
    raise "No valid URL received for the created job for this experiment trial" if job['href'].nil?
    @job_url = job['href']
  end
end

#each_group(&block) ⇒ Object



136
137
138
139
140
141
142
# File 'lib/omf_ec/experiment.rb', line 136

def each_group(&block)
  if block
    groups.each { |g| block.call(g) }
  else
    groups
  end
end

#eval_trigger(event) ⇒ Object



202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
# File 'lib/omf_ec/experiment.rb', line 202

def eval_trigger(event)
  if event[:callbacks] && !event[:callbacks].empty? && event[:trigger].call(@state)
    # Periodic check event
    event[:periodic_timer].cancel if event[:periodic_timer] && event[:consume_event]

    @events.delete(event) if event[:consume_event]
    event_names = ([event[:name]] + event[:aliases]).join(', ')
    info "Event triggered: '#{event_names}'"

    # Last in first serve callbacks
    event[:callbacks].reverse.each do |callback|
      callback.call
    end
  end
end

#event(name) ⇒ Object



148
149
150
# File 'lib/omf_ec/experiment.rb', line 148

def event(name)
  @events.find { |v| v[:name] == name || v[:aliases].include?(name) }
end

#group(name) ⇒ Object



125
126
127
# File 'lib/omf_ec/experiment.rb', line 125

def group(name)
  groups.find { |v| v.name == name }
end

#groups_by_res(res_addr) ⇒ Object

Find all groups a given resource belongs to



111
112
113
# File 'lib/omf_ec/experiment.rb', line 111

def groups_by_res(res_addr)
  groups.find_all { |g| g.members.values.include?(res_addr) }
end

#idObject

Unique experiment id



171
172
173
# File 'lib/omf_ec/experiment.rb', line 171

def id
  @name || @id
end

#log_metadata(key, value, domain = 'sys') ⇒ Object



226
227
228
229
# File 'lib/omf_ec/experiment.rb', line 226

def (key, value, domain = 'sys')
  #MetaData.inject_metadata(key.to_s, value.to_s)
  MetaData.inject(domain.to_s, key.to_s, value.to_s)
end

#mp_table_namesObject



218
219
220
221
222
223
224
# File 'lib/omf_ec/experiment.rb', line 218

def mp_table_names
  {}.tap do |m_t_n|
    groups.map(&:app_contexts).flatten.map(&:mp_table_names).each do |v|
      m_t_n.merge!(v)
    end
  end
end

#process_eventsObject

Parsing user defined events, checking conditions against internal state, and execute callbacks if triggered



186
187
188
189
190
191
192
# File 'lib/omf_ec/experiment.rb', line 186

def process_events
  self.synchronize do
    @events.find_all { |v| v[:every].nil? }.each do |event|
      eval_trigger(event)
    end
  end
end

#resource_by_hrn(hrn) ⇒ Object



66
67
68
# File 'lib/omf_ec/experiment.rb', line 66

def resource_by_hrn(hrn)
  @state.find { |v| v[:hrn].to_s == hrn.to_s }
end

#resource_state(address) ⇒ Object Also known as: resource



60
61
62
# File 'lib/omf_ec/experiment.rb', line 60

def resource_state(address)
  @state.find { |v| v[:address].to_s == address.to_s }
end

#sub_group(name) ⇒ Object



115
116
117
# File 'lib/omf_ec/experiment.rb', line 115

def sub_group(name)
  @sub_groups.find { |v| v == name }
end