Class: TacScribe::Daemon
- Inherits:
-
Object
- Object
- TacScribe::Daemon
- Defined in:
- lib/tac_scribe/daemon.rb
Overview
Main entry-point into Tac Scribe. Instantiate this class to start processing events.
Instance Method Summary collapse
-
#initialize(db_host:, db_port:, db_name:, db_user:, db_password:, tacview_host:, tacview_port:, tacview_password:, tacview_client_name:, verbose_logging:, thread_count:, populate_airfields:, whitelist: nil, cinc_enabled:, cinc_port:) ⇒ Daemon
constructor
A new instance of Daemon.
- #kill_threads ⇒ Object
- #populate_airfields ⇒ Object
- #start_cinc_thread ⇒ Object
- #start_db_sync_thread ⇒ Object
-
#start_processing ⇒ Object
Starts processing and reconnects if the client was disconnected.
- #start_processing_threads ⇒ Object
- #start_reporting_thread ⇒ Object
Constructor Details
#initialize(db_host:, db_port:, db_name:, db_user:, db_password:, tacview_host:, tacview_port:, tacview_password:, tacview_client_name:, verbose_logging:, thread_count:, populate_airfields:, whitelist: nil, cinc_enabled:, cinc_port:) ⇒ Daemon
16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
# File 'lib/tac_scribe/daemon.rb', line 16
#
# Configures and connects the datastore, then builds the event queue and the
# Tacview client (plus the Cinc client when enabled). No threads are started
# here.
#
# @param db_host assigned to the datastore configuration's host
# @param db_port assigned to the datastore configuration's port
# @param db_name assigned to the datastore configuration's database
# @param db_user assigned to the datastore configuration's username
# @param db_password assigned to the datastore configuration's password
# @param tacview_host host passed to the Tacview client; also used as the
#   host of the Cinc client
# @param tacview_port port passed to the Tacview client
# @param tacview_password password passed to the Tacview client
# @param tacview_client_name client name passed to the Tacview client
# @param verbose_logging stored in @verbose_logging
# @param thread_count stored in @thread_count
# @param populate_airfields stored in @populate_airfields
# @param whitelist optional path of a file whose whitespace-separated tokens
#   form the whitelist set; when nil no whitelist is loaded
# @param cinc_enabled when truthy a Cinc client is created
# @param cinc_port port passed to the Cinc client
def initialize(db_host:, db_port:, db_name:, db_user:, db_password:,
               tacview_host:, tacview_port:, tacview_password:,
               tacview_client_name:, verbose_logging:, thread_count:,
               populate_airfields:, whitelist: nil, cinc_enabled:,
               cinc_port:)
  Datastore.instance.configure do |config|
    config.host = db_host
    config.port = db_port
    config.database = db_name
    config.username = db_user
    config.password = db_password
  end
  Datastore.instance.connect

  @event_queue = EventQueue.new
  @verbose_logging = verbose_logging
  @populate_airfields = populate_airfields
  @thread_count = thread_count
  @threads = {}
  # File.read rather than IO.read: IO.read treats a path that begins with
  # "|" as a command to spawn, which must never happen for a
  # configuration-supplied file name.
  @whitelist = Set.new(File.read(whitelist).split) if whitelist

  @tacview_client = TacviewClient::Client.new(
    host: tacview_host,
    port: tacview_port,
    password: tacview_password,
    processor: @event_queue,
    client_name: tacview_client_name
  )

  if cinc_enabled
    @cinc_client = Cinc::Client.new(
      host: tacview_host,
      port: cinc_port
    )
  end
end
Instance Method Details
#kill_threads ⇒ Object
94 95 96 97 98 99 100 101 |
# File 'lib/tac_scribe/daemon.rb', line 94
#
# Kills every thread registered in @threads and waits for each one to
# finish.
#
# @return [Hash] the @threads hash
def kill_threads
  puts 'Killing Threads'
  @threads.each_pair do |key, thread|
    puts "Killing #{key} thread"
    thread.kill
    begin
      thread.join
    rescue StandardError => e
      # Thread#join re-raises the exception of a thread that terminated
      # abnormally. Letting it propagate would leave the remaining threads
      # running and abort the caller, so report it and carry on.
      puts "#{key} thread had terminated with an exception: #{e.message}"
    end
  end
end
#populate_airfields ⇒ Object
178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
# File 'lib/tac_scribe/daemon.rb', line 178
#
# Reads data/airfields.json (resolved relative to this file) and queues one
# object update per airfield on the event queue. Object ids are assigned
# sequentially starting at 45_000_000.
def populate_airfields
  data_path = File.join(File.dirname(__FILE__), '../../data/airfields.json')
  entries = JSON.parse(File.read(data_path))

  entries.each.with_index(45_000_000) do |entry, id|
    @event_queue.update_object(
      object_id: id.to_s,
      latitude: BigDecimal(entry['lat'].to_s),
      longitude: BigDecimal(entry['lon'].to_s),
      altitude: BigDecimal(entry['alt'].to_s),
      type: 'Ground+Static+Aerodrome',
      name: entry['name'],
      coalition: 0 # Neutral
    )
  end
end
#start_cinc_thread ⇒ Object
115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
# File 'lib/tac_scribe/daemon.rb', line 115
#
# Starts the thread that polls the Cinc client for airbases every 60 seconds
# and queues each one as an object update (with symbolized keys). Does
# nothing when no Cinc client was configured.
#
# On any StandardError the exception is logged and, after a 30 second pause,
# the client is connected again.
#
# @return [Thread, nil] the registered thread, or nil without a Cinc client
def start_cinc_thread
  return unless @cinc_client

  cinc_thread = Thread.new do
    loop do
      @cinc_client.connect
      loop do
        @cinc_client.get_airbases.each do |airbase|
          @event_queue.update_object(airbase.map { |k, v| [k.to_sym, v] }.to_h)
        end
        sleep 60
      end
    rescue StandardError => e
      puts 'Exception in cinc thread'
      puts e.message
      puts e.backtrace
      # Pause before the outer loop reconnects to avoid a retry storm.
      sleep 30
    end
  end
  cinc_thread.name = 'Cinc Processor'
  @threads[:cinc] = cinc_thread
end
#start_db_sync_thread ⇒ Object
139 140 141 142 143 144 145 146 147 148 149 150 151 |
# File 'lib/tac_scribe/daemon.rb', line 139
#
# Starts the thread that, once per second, writes the cached objects to the
# datastore and removes from the cache the ids the datastore reports back.
#
# A StandardError raised during a cycle is logged and the loop continues
# with the next cycle.
#
# @return [Thread] the registered thread
def start_db_sync_thread
  db_write_thread = Thread.new do
    loop do
      sleep 1
      deleted = Datastore.instance.write_objects(Cache.instance.data.values)
      deleted.each { |id| Cache.instance.data.delete(id) }
    rescue StandardError => e
      # Keep syncing, but do not hide the failure.
      puts 'Exception in database sync thread'
      puts e.message
    end
  end
  db_write_thread.name = 'Database Writing'
  @threads[:database] = db_write_thread
end
#start_processing ⇒ Object
Starts processing and reconnects if the client was disconnected. Because connecting to Tacview always gives an initial unit dump, we truncate the table each time we reconnect. This makes sure there are no ghost units hanging around after, for example, a server restart.
59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
# File 'lib/tac_scribe/daemon.rb', line 59
#
# Starts processing and reconnects if the client was disconnected. Each pass
# clears the event queue, truncates the datastore table and clears the cache
# before the worker threads are started, because connecting to Tacview
# always gives an initial unit dump. The call to the Tacview client's
# connect blocks until the connection ends.
#
# This method loops forever.
def start_processing
  loop do
    puts 'Starting processing loop'
    @event_queue.clear
    Datastore.instance.truncate_table
    Cache.instance.clear
    start_processing_threads
    start_cinc_thread
    start_db_sync_thread
    start_reporting_thread
    populate_airfields if @populate_airfields
    @threads.each_pair do |key, _value|
      puts "#{key} thread started"
    end
    @tacview_client.connect
    # If this code is executed it means we have been disconnected without
    # exceptions. We will still sleep to stop a retry storm. Just not as
    # long.
    puts 'Disconnected from Tacview'
    kill_threads
    sleep 10
  # Rescuing reliably from Net::HTTP is a complete bear so rescue
  # StandardError. It ain't pretty but it is reliable. We will puts
  # the exception just in case
  # https://stackoverflow.com/questions/5370697/what-s-the-best-way-to-handle-exceptions-from-nethttp
  rescue StandardError => e
    puts 'Exception in processing loop'
    puts e.message
    puts e.backtrace
    kill_threads
    sleep 30
  end
end
#start_processing_threads ⇒ Object
103 104 105 106 107 108 109 110 111 112 113 |
# File 'lib/tac_scribe/daemon.rb', line 103
#
# Builds a fresh event processor wired to the cache, the datastore, the
# event queue and the whitelist, then runs it on its own thread.
#
# @return [Thread] the registered thread
def start_processing_threads
  @event_processor = EventProcessor.new(
    cache: Cache.instance,
    datastore: Datastore.instance,
    event_queue: @event_queue,
    whitelist: @whitelist
  )

  worker = Thread.new { @event_processor.start }
  worker.name = 'Event Processor'
  @threads[:processing] = worker
end
#start_reporting_thread ⇒ Object
153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 |
# File 'lib/tac_scribe/daemon.rb', line 153
#
# Starts the thread that prints a tab-separated statistics line once per
# second (after an initial 5 second delay) and resets the counters it
# reported. Does nothing unless verbose logging is enabled.
#
# @return [Thread, nil] the registered thread, or nil when verbose logging
#   is off
def start_reporting_thread
  return unless @verbose_logging

  reporter = Thread.new do
    sleep 5
    loop do
      fields = [
        Time.now.strftime('%FT%T'),
        "Events Incoming: #{@event_queue.events_written}",
        "Processed: #{@event_processor.events_processed}",
        "Ignored: #{@event_processor.events_ignored}",
        "Queue Size: #{@event_queue.size}",
        "Objects Written: #{Datastore.instance.written}",
        "Deleted: #{Datastore.instance.deleted}"
      ]
      puts fields.join("\t")

      # Counters are per reporting interval, so zero them after each line.
      @event_queue.events_written = 0
      @event_processor.events_processed = 0
      @event_processor.events_ignored = 0
      Datastore.instance.written = 0
      Datastore.instance.deleted = 0
      sleep 1
    end
  end
  reporter.name = 'Reporting'
  @threads[:reporting] = reporter
end