Class: Observation
- Inherits:
-
Object
- Object
- Observation
- Defined in:
- lib/MINT-core/mapping/observation/observation.rb
Direct Known Subclasses
Instance Method Summary collapse
- #call_subscribed_callbacks ⇒ Object
- #check_true_at_startup(cb) ⇒ Object
- #element ⇒ Object
- #fail(cb) ⇒ Object
- #id ⇒ Object
-
#initialize(parameters) ⇒ Observation
constructor
A new instance of Observation.
- #is_continuous? ⇒ Boolean
- #is_instant? ⇒ Boolean
- #is_onchange? ⇒ Boolean
-
#is_subscribed_callback(&block) ⇒ Object
This callback is used to inform that the observation has been successfully subscribed.
- #name ⇒ Object
- #result(r) ⇒ Object
- #resultName ⇒ Object
- #start(observations_results, cb) ⇒ Object
- #states ⇒ Object
- #stop ⇒ Object
Constructor Details
#initialize(parameters) ⇒ Observation
Returns a new instance of Observation.
3 4 5 6 7 8 9 10 11 |
# File 'lib/MINT-core/mapping/observation/observation.rb', line 3 def initialize(parameters) @observation = parameters # state of subscription, to ensure that an observation only subscribes once @cb_observation_has_subscribed = false # defines if the ongoing subscription should act or has been temporary disabled using stop @should_listen = false end |
Instance Method Details
#call_subscribed_callbacks ⇒ Object
211 212 213 214 215 216 217 |
# File 'lib/MINT-core/mapping/observation/observation.rb', line 211 def call_subscribed_callbacks @subscribed_callbacks ||= [] while cb = @subscribed_callbacks.pop cb.call(self) end @subscribed_callbacks.clear if @subscribed_callbacks end |
#check_true_at_startup(cb) ⇒ Object
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
# File 'lib/MINT-core/mapping/observation/observation.rb', line 69 def check_true_at_startup(cb) # check if observation is already true at startup model = MINT::Interactor.class_from_channel_name(element) results = nil if (name) # if a name is specified, query directly otherwise select by state and return the first one found results = [] r =model.get(model.getModel,name) return false if r.nil? # handles the case that the named interactor does not exist! results << r else results = model.all end # if no states variable is set, there is no need to filter states if states and states.length >0 and results.length > 0 r = results.find_all { |e| ((e.states.map(&:to_s) & states).length>0) or ((e.abstract_states.split('|') & states).length>0) } else r = results end if r.length > 0 if r.length ==1 cb.call element, true, result(MultiJson.decode r[0].to_json(:only => r[0].class::PUBLISH_ATTRIBUTES)),id else res = [] r.each do |e| res << MultiJson.decode(e.to_json(:only => e.class::PUBLISH_ATTRIBUTES)) end cb.call element, true, result(res),id end return true else return false end end |
#element ⇒ Object
30 31 32 |
# File 'lib/MINT-core/mapping/observation/observation.rb', line 30 def element @observation[:element] end |
#fail(cb) ⇒ Object
118 119 120 |
# File 'lib/MINT-core/mapping/observation/observation.rb', line 118 def fail(cb) cb.call element, :fail , nil, id end |
#id ⇒ Object
26 27 28 |
# File 'lib/MINT-core/mapping/observation/observation.rb', line 26 def id @observation[:id] end |
#is_continuous? ⇒ Boolean
13 14 15 |
# File 'lib/MINT-core/mapping/observation/observation.rb', line 13 def is_continuous? @observation[:process].nil? or @observation[:process] == :continuous end |
#is_instant? ⇒ Boolean
17 18 19 |
# File 'lib/MINT-core/mapping/observation/observation.rb', line 17 def is_instant? @observation[:process] == :instant end |
#is_onchange? ⇒ Boolean
21 22 23 |
# File 'lib/MINT-core/mapping/observation/observation.rb', line 21 def is_onchange? @observation[:process] == :onchange end |
#is_subscribed_callback(&block) ⇒ Object
This callback is used to inform that the observation has been successfully subscribed
200 201 202 203 204 205 206 207 208 209 |
# File 'lib/MINT-core/mapping/observation/observation.rb', line 200 def is_subscribed_callback(&block) return unless block if @cb_observation_has_subscribed block.call(self) else @subscribed_callbacks ||= [] @subscribed_callbacks.unshift block # << block end end |
#name ⇒ Object
50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
# File 'lib/MINT-core/mapping/observation/observation.rb', line 50 def name if @observation[:name] =~ /\./ # check if refers to variable used in sync mappings selector = @observation[:name].split "." variable = selector.shift attributes = selector if (@observation_results and @observation_results.has_key? variable) interactor_data= @observation_results[variable] interactor = MINT::Interactor.get(interactor_data["mint_model"],interactor_data["name"]) attributes.each do |attr| interactor = interactor.method(attr).call end return interactor end else return @observation[:name] end end |
#result(r) ⇒ Object
42 43 44 45 46 47 48 |
# File 'lib/MINT-core/mapping/observation/observation.rb', line 42 def result(r) if resultName and r {resultName => r } else {} end end |
#resultName ⇒ Object
38 39 40 |
# File 'lib/MINT-core/mapping/observation/observation.rb', line 38 def resultName @observation[:result] end |
#start(observations_results, cb) ⇒ Object
122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
# File 'lib/MINT-core/mapping/observation/observation.rb', line 122 def start(observations_results,cb) @observation_results=observations_results if @cb_observation_has_subscribed # restart!! @should_listen = true if is_instant? fail(cb) if not check_true_at_startup(cb) elsif is_continuous? check_true_at_startup(cb) end return self end @proc_observation_wo_name = Proc.new { |key, | if @should_listen found=MultiJson.decode if name.nil? or name.eql? found["name"] if found.has_key? "new_states" if (found["new_states"] & states).length>0 # checks if both arrays share at least one element cb.call element, true , result(found),id else if (found["states"] & states).length == 0 cb.call element, false, {},id end end end end end } @proc_observation = Proc.new { || if @should_listen found=MultiJson.decode if found.has_key? "new_states" if (found["new_states"] & states).length>0 # checks if both arrays share at least one element cb.call element, true , result(found),id else if (found["states"] & states).length == 0 cb.call element, false, {},id end end end end } res = false res = check_true_at_startup(cb) if is_continuous? or is_instant? if not res and is_instant? # instant checks do not require a subscribe and directly fail if false fail(cb) return self end if not @cb_observation_has_subscribed @should_listen = true redis = RedisConnector.redis if (name) redis.pubsub.subscribe("#{element}.#{name}",@proc_observation).callback { |count| @cb_observation_has_subscribed = true call_subscribed_callbacks } else redis.pubsub.psubscribe("#{element}*",@proc_observation_wo_name).callback { |count| @cb_observation_has_subscribed = true call_subscribed_callbacks } end end self end |
#states ⇒ Object
34 35 36 |
# File 'lib/MINT-core/mapping/observation/observation.rb', line 34 def states @observation[:states].map &:to_s end |
#stop ⇒ Object
111 112 113 114 115 116 |
# File 'lib/MINT-core/mapping/observation/observation.rb', line 111 def stop # RedisConnector.redis.pubsub.unsubscribe_proc("#{element}",@proc_observation) @should_listen = false #@cb_observation_has_subscribed = false @subscribed_callbacks = [] end |