Class: Agent

Inherits:
Object
  • Object
show all
Defined in:
lib/wattics-api-client/agent.rb

Constant Summary collapse

@@mutex =
Mutex.new

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(maximum_parallel_senders = 0) ⇒ Agent

Returns a new instance of Agent.



6
7
8
9
10
11
12
13
14
15
# File 'lib/wattics-api-client/agent.rb', line 6

def initialize(maximum_parallel_senders = 0)
  @agent_thread_group = ThreadGroup.new
  @processor_pool = ProcessorPool.new(self, @agent_thread_group, maximum_parallel_senders)
  @enqueued_measurements_with_config = Hash.new { |h, k| h[k] = [] }
  @sent_measurements_with_context = BlockingQueue.new
  @measurement_sent_handler_list = Concurrent::Array.new
  start_processor_feeder
  start_measurement_sent_handler_dispatcher
  @wait_semaphore = Concurrent::Semaphore.new(0)
end

Instance Attribute Details

#threadObject (readonly)

Returns the value of attribute thread.



5
6
7
# File 'lib/wattics-api-client/agent.rb', line 5

def thread
  @thread
end

Class Method Details

.disposeObject



24
25
26
27
28
29
30
31
# File 'lib/wattics-api-client/agent.rb', line 24

def self.dispose
  @@mutex.synchronize do
    unless @@instance.nil?
      @@instance.agent_thread_group.list.each(&:kill)
      @@instance = nil
    end
  end
end

.get_instance(maximum_parallel_senders = 0) ⇒ Object



18
19
20
21
22
# File 'lib/wattics-api-client/agent.rb', line 18

def self.get_instance(maximum_parallel_senders = 0)
  @@mutex.synchronize do
    @@instance ||= new(maximum_parallel_senders)
  end
end

Instance Method Details

#add_measurement_sent_handlerObject



111
112
113
# File 'lib/wattics-api-client/agent.rb', line 111

def add_measurement_sent_handler
  @measurement_sent_handler_list << yield
end

#report_sent_measurement(measurement, response) ⇒ Object



105
106
107
108
109
# File 'lib/wattics-api-client/agent.rb', line 105

def report_sent_measurement(measurement, response)
  #Dont need to send to the measurement handler, <=400 erros are reported on the processor
  @sent_measurements_with_context << [measurement, response] if response.code <= 400
  @wait_semaphore.acquire
end

#send(measurement, config) ⇒ Object



80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# File 'lib/wattics-api-client/agent.rb', line 80

def send(measurement, config)
  if measurement.is_a?(Array)
    @wait_semaphore.release(measurement.size)
    measurement_groups = measurement.group_by(&:id)
    measurement_groups.each do |channel_id, measurements_for_channel_id|
      measurements_with_config = measurements_for_channel_id.map { |measurement| MeasurementWithConfig.new(measurement, config) }
      @processor_already_bound_to_channel_id = @processor_pool.get_processor(channel_id)
      if @processor_already_bound_to_channel_id.nil?
        @enqueued_measurements_with_config[channel_id] += measurements_with_config
      else
        @processor_already_bound_to_channel_id.process(measurements_with_config)
      end
    end
  else
    @wait_semaphore.release
    measurement_with_config = MeasurementWithConfig.new(measurement, config)
    @processor_already_bound_to_channel_id = @processor_pool.get_processor(measurement.id)
    if @processor_already_bound_to_channel_id.nil?
      @enqueued_measurements_with_config[measurement.id] << measurement_with_config
    else
      @processor_already_bound_to_channel_id.process(measurement_with_config)
    end
  end
end

#sleep_fixObject



76
77
78
# File 'lib/wattics-api-client/agent.rb', line 76

def sleep_fix
  sleep 0.1
end

#start_measurement_sent_handler_dispatcherObject



61
62
63
64
65
66
67
68
69
70
71
72
73
74
# File 'lib/wattics-api-client/agent.rb', line 61

def start_measurement_sent_handler_dispatcher
  @agent_thread_group.add(Thread.new do
    begin
      loop do
        array = @sent_measurements_with_context.pop
        next if array.nil?
        measurement = array[0]
        response = array[1]
        @measurement_sent_handler_list.each { |handler| handler.call(measurement, response) }
      end
    rescue ThreadError
    end
  end)
end

#start_processor_feederObject



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# File 'lib/wattics-api-client/agent.rb', line 39

def start_processor_feeder
  @agent_thread_group.add(Thread.new do
    begin
      loop do
        key, values = @enqueued_measurements_with_config.first
        if @enqueued_measurements_with_config.empty?
          sleep_fix
          next
        end
        processor = @processor_pool.get_processor(key)
        if processor.nil?
          sleep_fix
          next
        end
        @enqueued_measurements_with_config.delete(key)
        processor.process(values)
      end
    rescue ThreadError
    end
  end)
end

#wait_until_lastObject



33
34
35
36
37
# File 'lib/wattics-api-client/agent.rb', line 33

def wait_until_last
  Thread.new do
    sleep 0.01 while @wait_semaphore.available_permits != 0
  end.join
end