Class: Processor

Inherits:
Object
  • Object
show all
Defined in:
lib/wattics-api-client/processor.rb

Instance Method Summary collapse

Constructor Details

#initialize(agent) ⇒ Processor

Returns a new instance of Processor.



5
6
7
8
9
10
11
12
13
# File 'lib/wattics-api-client/processor.rb', line 5

# Sets up the processor's queue, synchronisation primitives and fallback logger.
#
# @param agent [Object, nil] stored as-is in @agent; may be nil
def initialize(agent)
  @agent = agent

  # Guards @is_sending (and the queue pop) against concurrent readers.
  @mutex = Mutex.new
  @is_sending = false

  # The semaphore starts with zero permits; permits are added as work is queued.
  @semaphore = Concurrent::Semaphore.new(0)
  @measurements_with_config = PriorityBlockingQueue.new

  # Logger writing to STDOUT that only emits WARN and above.
  @logger = Logger.new(STDOUT).tap { |log| log.level = Logger::WARN }
end

Instance Method Details

#is_idle? ⇒ Boolean

Returns:

  • (Boolean)


24
25
26
27
28
# File 'lib/wattics-api-client/processor.rb', line 24

# Tells whether the processor currently has nothing to do.
#
# The check runs under @mutex so the queue state and the @is_sending flag
# are read together.
#
# @return [Boolean] true when the queue reports empty and no send is in flight
def is_idle?
  @mutex.synchronize do
    queue_drained = @measurements_with_config.is_empty?
    queue_drained && !@is_sending
  end
end

#process(measurement_with_config) ⇒ Object



15
16
17
18
19
20
21
22
# File 'lib/wattics-api-client/processor.rb', line 15

# Queues work and adds permits to the semaphore for it.
#
# The argument is appended to the queue as given. When it is an Array the
# semaphore is released once per element (by the array's size); otherwise
# a single permit is released.
#
# @param measurement_with_config [Object, Array] item (or batch of items) to queue
# @return [Object] whatever the semaphore's release call returns
def process(measurement_with_config)
  @measurements_with_config << measurement_with_config

  case measurement_with_config
  when Array
    @semaphore.release(measurement_with_config.size)
  else
    @semaphore.release
  end
end

#run ⇒ Object



30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# File 'lib/wattics-api-client/processor.rb', line 30

# Worker loop: waits on the semaphore, pops one queued entry at a time and
# sends it through a client obtained from ClientFactory.
#
# The loop only ends when a StandardError is raised outside the per-entry
# retry logic (creating the client, acquiring the semaphore, popping, or
# reading the entry's measurement/config). That error is logged as
# "Thread stopped unexpectedly" and the method returns.
#
# @return [Object] the return value of the final logging call
def run
  client = ClientFactory.get_instance.create_client
  loop do
    @semaphore.acquire
    entry = @mutex.synchronize do
      popped = @measurements_with_config.pop
      @is_sending = true
      popped
    end
    begin
      send_with_retry(client, entry.measurement, entry.config)
    ensure
      # Clear the flag even when the entry is malformed and the loop is about
      # to be aborted; otherwise is_idle? could never report true again.
      @mutex.synchronize { @is_sending = false }
    end
  end
rescue StandardError => e
  log_failure("Thread stopped unexpectedly: #{e.message}")
end

# Sends one measurement, retrying every 60 seconds for as long as the attempt
# raises a StandardError.
#
# The response is only inspected when an agent is attached: the agent is
# always told about the response, and responses with a code of 400 or above
# are additionally logged. An HTTP error response is NOT retried.
#
# NOTE(review): the agent callback runs inside the retried section, so an
# error raised by the agent also causes the measurement to be sent again.
#
# @param client [Object] object responding to send(measurement, config)
# @param measurement [Object] payload handed to the client and the agent
# @param config [Object] configuration handed to the client
private def send_with_retry(client, measurement, config)
  loop do
    begin
      response = client.send(measurement, config)
      unless @agent.nil?
        # Evaluate the status before notifying the agent so an unusable
        # response code aborts the attempt before any callback runs.
        failed = response.code >= 400
        @agent.report_sent_measurement(measurement, response)
        log_failure("Could not send #{measurement}, Server Response: #{response.body}") if failed
      end
      break
    rescue StandardError => e
      log_failure("Could not send #{measurement}, Error: #{e}")
      sleep 60
    end
  end
end

# Logs an error message: through Rails.logger (and echoed with puts) when
# Rails is defined, otherwise through the processor's own @logger.
#
# @param message [String] text to log
private def log_failure(message)
  if defined?(Rails)
    Rails.logger.error(message)
    puts message
  else
    @logger.error(message)
  end
end