Class: TacScribe::Daemon

Inherits:
Object
  • Object
show all
Defined in:
lib/tac_scribe/daemon.rb

Overview

Main entry point into Tac Scribe. Instantiate this class to configure the daemon, then call #start_processing to start processing events.

Instance Method Summary collapse

Constructor Details

#initialize(db_host:, db_port:, db_name:, db_user:, db_password:, tacview_host:, tacview_port:, tacview_password:, tacview_client_name:, verbose_logging:, thread_count:, populate_airfields:, whitelist: nil, cinc_enabled:, cinc_port:) ⇒ Daemon



16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# File 'lib/tac_scribe/daemon.rb', line 16

# Configures and connects the datastore, then builds the event queue and the
# clients the daemon uses. No threads are started here; see #start_processing.
#
# @param db_host host assigned to the Datastore configuration
# @param db_port port assigned to the Datastore configuration
# @param db_name database name assigned to the Datastore configuration
# @param db_user username assigned to the Datastore configuration
# @param db_password password assigned to the Datastore configuration
# @param tacview_host host passed to the Tacview client (and reused as the
#   host of the Cinc client)
# @param tacview_port port passed to the Tacview client
# @param tacview_password password passed to the Tacview client
# @param tacview_client_name client name passed to the Tacview client
# @param verbose_logging stored; #start_reporting_thread is a no-op when falsy
# @param thread_count stored in @thread_count
# @param populate_airfields stored; when truthy #start_processing calls
#   #populate_airfields
# @param whitelist optional path to a file of whitespace-separated entries;
#   when given, the entries are loaded into a Set
# @param cinc_enabled when truthy a Cinc client is created
# @param cinc_port port passed to the Cinc client
def initialize(db_host:, db_port:, db_name:, db_user:, db_password:,
               tacview_host:, tacview_port:, tacview_password:,
               tacview_client_name:, verbose_logging:, thread_count:,
               populate_airfields:, whitelist: nil, cinc_enabled:, cinc_port:)
  Datastore.instance.configure do |config|
    config.host = db_host
    config.port = db_port
    config.database = db_name
    config.username = db_user
    config.password = db_password
  end
  Datastore.instance.connect

  @event_queue = EventQueue.new

  @verbose_logging = verbose_logging

  @populate_airfields = populate_airfields
  @thread_count = thread_count
  @threads = {}
  # File.read rather than IO.read: IO.read treats a path that begins with "|"
  # as a command to spawn, File.read only ever opens a file.
  @whitelist = Set.new(File.read(whitelist).split) if whitelist

  @tacview_client = TacviewClient::Client.new(
    host: tacview_host,
    port: tacview_port,
    password: tacview_password,
    processor: @event_queue,
    client_name: tacview_client_name
  )

  if cinc_enabled
    # The Cinc client is pointed at the same host as the Tacview client.
    @cinc_client = Cinc::Client.new(
      host: tacview_host,
      port: cinc_port
    )
  end
end

Instance Method Details

#kill_threads ⇒ Object



94
95
96
97
98
99
100
101
# File 'lib/tac_scribe/daemon.rb', line 94

# Kills every thread registered in @threads and waits for each one to finish,
# announcing each step on standard output.
def kill_threads
  puts 'Killing Threads'
  @threads.each do |label, worker|
    puts "Killing #{label} thread"
    # Thread#kill returns the thread itself, so the join can be chained.
    worker.kill.join
  end
end

#populate_airfields ⇒ Object



178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
# File 'lib/tac_scribe/daemon.rb', line 178

# Reads data/airfields.json (resolved relative to this source file) and pushes
# one update_object call per entry onto the event queue. Entries receive
# consecutive string ids starting at "45000000" and are reported as neutral
# 'Ground+Static+Aerodrome' objects.
def populate_airfields
  data_path = File.join(File.dirname(__FILE__), '../../data/airfields.json')
  entries = JSON.parse(File.read(data_path))

  entries.each.with_index(45_000_000) do |entry, numeric_id|
    @event_queue.update_object(
      object_id: numeric_id.to_s,
      # Coordinates go through to_s so BigDecimal always parses a string.
      latitude: BigDecimal(entry['lat'].to_s),
      longitude: BigDecimal(entry['lon'].to_s),
      altitude: BigDecimal(entry['alt'].to_s),
      type: 'Ground+Static+Aerodrome',
      name: entry['name'],
      coalition: 0 # Neutral
    )
  end
end

#start_cinc_thread ⇒ Object



115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# File 'lib/tac_scribe/daemon.rb', line 115

# Starts the background thread that polls the Cinc client for airbases.
#
# Does nothing when no Cinc client was configured. Otherwise the thread
# connects, then once a minute forwards every airbase returned by
# get_airbases to the event queue with its keys converted to symbols. Any
# StandardError is printed and, after a 30 second pause, the thread
# reconnects and resumes polling.
#
# @return the started thread (also stored in @threads[:cinc]), or nil when
#   there is no Cinc client
def start_cinc_thread
  return unless @cinc_client

  cinc_thread = Thread.new do
    loop do
      @cinc_client.connect
      loop do
        @cinc_client.get_airbases.each do |airbase|
          symbolized = airbase.map { |key, value| [key.to_sym, value] }.to_h
          @event_queue.update_object(symbolized)
        end
        sleep 60
      end
    rescue StandardError => e
      puts 'Exception in cinc thread'
      puts e.message
      puts e.backtrace
      # Back off before the outer loop reconnects.
      sleep 30
    end
  end

  cinc_thread.name = 'Cinc Processor'
  @threads[:cinc] = cinc_thread
end

#start_db_sync_thread ⇒ Object



139
140
141
142
143
144
145
146
147
148
149
150
151
# File 'lib/tac_scribe/daemon.rb', line 139

# Starts the background thread that flushes the cache to the datastore.
#
# Once a second the thread hands all cached values to
# Datastore#write_objects and removes from the cache every id that call
# returns. A StandardError raised during a pass is printed (message and
# backtrace) and the loop carries on with the next pass, so a failing write
# is visible in the output instead of being dropped silently.
#
# @return the started thread (also stored in @threads[:database])
def start_db_sync_thread
  db_write_thread = Thread.new do
    loop do
      sleep 1
      deleted = Datastore.instance.write_objects(Cache.instance.data.values)
      deleted.each { |id| Cache.instance.data.delete(id) }
    rescue StandardError => e
      # Same reporting format as the cinc thread and the processing loop.
      puts 'Exception in database sync thread'
      puts e.message
      puts e.backtrace
    end
  end
  db_write_thread.name = 'Database Writing'
  @threads[:database] = db_write_thread
end

#start_processing ⇒ Object

Starts processing and reconnects if the client was disconnected. Because connecting to Tacview always gives an initial unit dump, we truncate the table each time we reconnect. This makes sure there are no ghost units hanging around, for example after a server restart.



59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/tac_scribe/daemon.rb', line 59

# Runs the connect / process / reconnect cycle without end.
#
# Each pass clears the event queue, truncates the datastore table and clears
# the cache, starts the worker threads (seeding the airfields when enabled)
# and then calls connect on the Tacview client. After connect returns, or
# after a StandardError is raised, the threads are killed and the cycle
# restarts following a pause: 10 seconds after a plain disconnect, 30 seconds
# after an exception.
def start_processing
  loop do
    puts 'Starting processing loop'

    # Start every pass from a clean slate.
    @event_queue.clear
    Datastore.instance.truncate_table
    Cache.instance.clear

    start_processing_threads
    start_cinc_thread
    start_db_sync_thread
    start_reporting_thread
    populate_airfields if @populate_airfields
    @threads.each_key { |name| puts "#{name} thread started" }

    @tacview_client.connect

    # Getting here means the connection ended without an exception. Pause
    # anyway to avoid a retry storm, just for a shorter time.
    puts 'Disconnected from Tacview'
    kill_threads
    sleep 10
  rescue StandardError => error
    # The broad rescue is deliberate: reliably enumerating network errors is
    # impractical, so anything recoverable is reported and retried. See
    # https://stackoverflow.com/questions/5370697/what-s-the-best-way-to-handle-exceptions-from-nethttp
    puts 'Exception in processing loop'
    puts error.message
    puts error.backtrace
    kill_threads
    sleep 30
  end
end

#start_processing_threads ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
# File 'lib/tac_scribe/daemon.rb', line 103

# Builds a fresh EventProcessor wired to the cache, the datastore, the event
# queue and the whitelist, and runs its #start method on a dedicated thread.
#
# @return the started thread (also stored in @threads[:processing])
def start_processing_threads
  @event_processor = EventProcessor.new(
    cache: Cache.instance,
    datastore: Datastore.instance,
    event_queue: @event_queue,
    whitelist: @whitelist
  )

  worker = Thread.new { @event_processor.start }
  worker.name = 'Event Processor'
  @threads[:processing] = worker
end

#start_reporting_thread ⇒ Object



153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# File 'lib/tac_scribe/daemon.rb', line 153

# Starts the statistics thread when verbose logging is enabled.
#
# After an initial 5 second delay the thread prints, once a second, a
# tab-separated line with the current timestamp and the event-queue,
# event-processor and datastore counters, then resets those counters to zero.
#
# @return the started thread (also stored in @threads[:reporting]), or nil
#   when verbose logging is off
def start_reporting_thread
  return unless @verbose_logging

  reporter = Thread.new do
    sleep 5
    loop do
      fields = [
        Time.now.strftime('%FT%T'),
        "Events Incoming: #{@event_queue.events_written}",
        "Processed: #{@event_processor.events_processed}",
        "Ignored: #{@event_processor.events_ignored}",
        "Queue Size: #{@event_queue.size}",
        "Objects Written: #{Datastore.instance.written}",
        "Deleted: #{Datastore.instance.deleted}"
      ]
      puts fields.join("\t")

      # Counters are per reporting interval, so zero them after each line.
      @event_queue.events_written = 0
      @event_processor.events_processed = 0
      @event_processor.events_ignored = 0
      Datastore.instance.written = 0
      Datastore.instance.deleted = 0
      sleep 1
    end
  end

  reporter.name = 'Reporting'
  @threads[:reporting] = reporter
end