This code sample is powered by Claude 3.7 Sonnet (as of 3/19/2025).
This script efficiently converts aircraft position logs from a PiAware node into a format optimized for viewers like Google Earth.
The input consists of JSON data collected every second from a PiAware device, including fields such as ICAO ID, flight number, latitude, longitude, and altitude. Although the data comes from ADS-B Out, additional ADS-B fields are not yet collected, since they are secondary or not universally broadcast by all flights. Interestingly, Boeing and Airbus are beginning to transmit autopilot mode (when engaged) with their latest software updates.
Let’s review the input and output formats first before diving into the code.
The PiAware output could be self-describing JSON, but that is a heavy format for millions of CQs. This comma-separated format is the fastest to process, is restartable and appendable, imports without problems directly into the KML visualizers that I prefer, and is also very compressible with many common compression routines.
2025-03-07 15:12:59,A2B562,ASA406,41.41580,-87.71616,35000
2025-03-07 15:13:00,A235B7,SWA3138,41.26538,-86.76691,10125
2025-03-07 15:13:00,AC508B,DAL971,41.18523,-86.90582,34000
2025-03-07 15:13:00,A08BB4,SKW5565,41.31198,-87.55249,21600
2025-03-07 15:13:00,ADC65E,AAL3107,41.22230,-86.97308,15850
2025-03-07 15:13:00,A3962D,DAL716,41.42493,-86.62381,31975
2025-03-07 15:13:00,AA11BE,N748RE,41.47478,-87.86763,33075
2025-03-07 15:13:00,AA6533,RPA3549,41.18095,-87.53756,22300
2025-03-07 15:13:00,A3FBF8,DZR356,41.47939,-86.45603,41000
2025-03-07 15:13:00,A79EA0,JIA5514,41.41246,-87.54200,26000
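For reference, each record in this format can be pulled apart with Python's standard csv module. The field names below mirror the script later in this post; they are not part of the log itself:

```python
import csv
import io

# One record in the log format above:
# timestamp, ICAO hex code, callsign, latitude, longitude, altitude (ft)
line = "2025-03-07 15:13:00,A235B7,SWA3138,41.26538,-86.76691,10125"

timestamp, icao, callsign, lat, lon, alt = next(csv.reader(io.StringIO(line)))
print(timestamp, icao, callsign, float(lat), float(lon), float(alt))
```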
The speed of this version is 100x better than my first attempt, while still single-threaded with no parallelism. I will show parallelism when working with Betaflight logging in a future entry.
python aircraft_log_to_kml.py --max-aircraft 500000 --reset-checkpoint --max-points 25
Reading aircraft data from: C:\users\chris\aircraft_logs\aircraft_coordinates.log
Checkpoint file C:\users\chris\aircraft_logs\aircraft_log_checkpoint.pkl has been reset.
No checkpoint file found. Starting from scratch.
Processing log entries: 100%|████████████████████████████████████████████████▉| 9000/9012 [00:00<00:00, 428544.10entries/s]
Processed 9000 new log entries.
Generating KML content: 75%|███████████████████████████████████████▉ | 956/1267 [00:13<00:05, 52.45aircraft/s]
Warning: Not enough points for aircraft UJC56 (A3FB48), skipping
Warning: Not enough points for aircraft A93229 (A93229), skipping
……
Generating KML content: 75%|███████████████████████████████████████▉ | 956/1267 [00:19<00:06, 48.86aircraft/s]
KML file statistics:
Total aircraft processed: 956 (from 1267 total)
Total flights detected: 1576
Average flights per aircraft: 1.65
Total points in dataset: 9000
Points after sampling: 6646
Sampling rate: 1
Maximum points per aircraft: 25
Time gap for flight segmentation: 5 minutes
Saving KML file to: C:\users\chris\aircraft_logs\aircraft_tracks_20250319_095321.kml
KML file created successfully.
Checkpoint saved: Last processed timestamp = 2025-03-19 07:34:02
You are seeing the reception plot for a few days over the past month, as solar activity and local cell tower activity can overwhelm the 90-cent receiver chip on my SDR dongle. This particular receiver is shielded by the house from a cell tower to the east and uses a two-dollar antenna.
There is a zone with little reception within 10 miles of the SDR; this is due to the nature of dipole antennas. The null zone is at the zenith, and additional shielding comes from the metal roof on my house.
As you can see over my receiver, most of the lineup for Chicago approach has east-west through traffic going along the classic 270 radial (an airmail route established in the Lindbergh era) to/from Chicago Heights: perfect IFR ("I follow roads") along US-30/I-80. Many Midway arrivals end up lined up at 3 to 7 miles from runway 31C, depending on whether they are a commuter or a heavy. If you check the IFR map, it has the best fix name ever: GIPER.
On to the code…..
One item to highlight is the five-minute no-see timer, which ends a flight after the aircraft has not been reported for that period. I see the same Southwest Airlines plane bound for Midway up to six times a day, and the takeoff and landing of international flights as often as every 14 hours.
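The gap test itself boils down to: sort the points by timestamp, then start a new flight whenever two consecutive reports are more than five minutes apart. A minimal sketch of that logic (the sample timestamps are made up):

```python
from datetime import datetime

MAX_GAP_MINUTES = 5

def split_segments(timestamps, max_gap=MAX_GAP_MINUTES):
    """Split sorted timestamps into segments at gaps longer than max_gap minutes."""
    times = sorted(datetime.strptime(t, "%Y-%m-%d %H:%M:%S") for t in timestamps)
    segments, current = [], [times[0]]
    for prev, cur in zip(times, times[1:]):
        if (cur - prev).total_seconds() / 60.0 > max_gap:
            segments.append(current)  # gap exceeded: close out this flight
            current = [cur]
        else:
            current.append(cur)
    segments.append(current)
    return segments

# Two sightings of the same airframe, 40 minutes apart -> two flights
stamps = ["2025-03-07 15:13:00", "2025-03-07 15:13:01", "2025-03-07 15:53:00"]
print(len(split_segments(stamps)))  # 2
```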
The simplekml library works remarkably well compared to the other approaches I tried.
Overall, before AI this would have taken the better part of a day, would not have nearly as many comments, and I would have been fighting the KML Python library all the way. I am guessing I spent 3 hours and 200 queries with the AI, and another 10 hours fighting KML formats with Google Earth.
#!/usr/bin/env python3
"""
Aircraft Log to KML Converter
This script reads aircraft coordinate data from a log file and generates a KML file
for visualization in Google Earth or other KML-compatible applications.
Features:
- Batch processing for efficient handling of large log files
- Command-line arguments for customization
- Progress reporting during processing
- Option to update existing KML files
- Automatic backup of existing output files
- Checkpoint capability to save processing state
- Incremental processing (only process new data since last run)
- Point sampling to reduce KML file size
- Limits on number of points and aircraft to prevent memory issues
- Simplified output options for better performance
- Memory optimization for large datasets
- Flight segmentation based on time gaps (splitting tracks when time between points exceeds 5 minutes)
"""
import os
import sys
import random
import csv
import simplekml
import argparse
import json
import pickle
import shutil
import re
from datetime import datetime
from collections import defaultdict
from pathlib import Path
# Try to import tqdm for progress bars, but provide fallback if not available
try:
from tqdm import tqdm
TQDM_AVAILABLE = True
except ImportError:
TQDM_AVAILABLE = False
# Memory optimization
import gc
# Default file paths
DEFAULT_LOG_FILE = r"C:\users\chris\aircraft_logs\aircraft_coordinates.log"
DEFAULT_OUTPUT_DIR = os.path.dirname(DEFAULT_LOG_FILE)
TIMESTAMP = datetime.now().strftime("%Y%m%d_%H%M%S")
DEFAULT_OUTPUT_FILE = os.path.join(DEFAULT_OUTPUT_DIR, f"aircraft_tracks_{TIMESTAMP}.kml")
DEFAULT_CHECKPOINT_FILE = os.path.join(DEFAULT_OUTPUT_DIR, "aircraft_log_checkpoint.pkl")
DEFAULT_BATCH_SIZE = 1000
DEFAULT_MAX_POINTS_PER_AIRCRAFT = 5000
DEFAULT_MAX_AIRCRAFT = 100
DEFAULT_SAMPLE_RATE = 1 # Use every point by default
def generate_random_color():
"""Generate a random color in KML hex format (aabbggrr)."""
r = random.randint(0, 255)
g = random.randint(0, 255)
b = random.randint(0, 255)
return f"ff{b:02x}{g:02x}{r:02x}" # KML uses aabbggrr format
def parse_arguments():
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(description='Convert aircraft log data to KML format with flight segmentation.')
parser.add_argument('-i', '--input', default=DEFAULT_LOG_FILE,
help=f'Path to the input log file (default: {DEFAULT_LOG_FILE})')
parser.add_argument('-o', '--output', default=DEFAULT_OUTPUT_FILE,
help=f'Path to the output KML file (default: {DEFAULT_OUTPUT_FILE})')
parser.add_argument('-b', '--batch-size', type=int, default=DEFAULT_BATCH_SIZE,
help=f'Number of log entries to process in each batch (default: {DEFAULT_BATCH_SIZE})')
parser.add_argument('-c', '--checkpoint-file', default=DEFAULT_CHECKPOINT_FILE,
help=f'Path to the checkpoint file (default: {DEFAULT_CHECKPOINT_FILE})')
parser.add_argument('-u', '--update', action='store_true',
help='Update an existing KML file instead of creating a new one')
parser.add_argument('-n', '--no-checkpoint', action='store_true',
help='Disable checkpoint creation and incremental processing')
parser.add_argument('-a', '--all', action='store_true',
help='Process all entries, ignoring the checkpoint')
parser.add_argument('--reset-checkpoint', action='store_true',
help='Reset the checkpoint and start processing from scratch')
parser.add_argument('-v', '--verbose', action='store_true',
help='Enable verbose output')
# New arguments for handling large datasets
parser.add_argument('-s', '--sample-rate', type=int, default=DEFAULT_SAMPLE_RATE,
help=f'Sample rate for point reduction (1=use all points, 2=use every 2nd point, etc.) (default: {DEFAULT_SAMPLE_RATE})')
parser.add_argument('-m', '--max-points', type=int, default=DEFAULT_MAX_POINTS_PER_AIRCRAFT,
help=f'Maximum number of points per aircraft (default: {DEFAULT_MAX_POINTS_PER_AIRCRAFT})')
parser.add_argument('-ma', '--max-aircraft', type=int, default=DEFAULT_MAX_AIRCRAFT,
help=f'Maximum number of aircraft to include (default: {DEFAULT_MAX_AIRCRAFT})')
parser.add_argument('--lines-only', action='store_true',
help='Generate flight paths as lines only, without individual point placemarks')
parser.add_argument('--no-lines', action='store_true',
help='Generate individual point placemarks only, without flight path lines')
parser.add_argument('--sort-by', choices=['points', 'recency', 'callsign'], default='points',
help='Sort aircraft by number of points, recency, or callsign when limiting (default: points)')
# Backup handling options
parser.add_argument('--backup', action='store_true', default=True,
help='Backup existing output file instead of overwriting it (default: True)')
parser.add_argument('--no-backup', action='store_false', dest='backup',
help='Overwrite existing output file without creating a backup')
return parser.parse_args()
def load_checkpoint(checkpoint_file):
"""
Load processing checkpoint from file.
Returns the last processed timestamp and existing aircraft data.
"""
try:
if os.path.exists(checkpoint_file):
with open(checkpoint_file, 'rb') as f:
checkpoint_data = pickle.load(f)
print(f"Loaded checkpoint: Last processed timestamp = {checkpoint_data['last_timestamp']}")
return checkpoint_data['last_timestamp'], checkpoint_data['aircraft_data']
else:
print("No checkpoint file found. Starting from scratch.")
except Exception as e:
print(f"Error loading checkpoint: {e}")
return None, defaultdict(list)
def save_checkpoint(checkpoint_file, last_timestamp, aircraft_data):
"""Save processing checkpoint to file."""
try:
checkpoint_data = {
'last_timestamp': last_timestamp,
'aircraft_data': aircraft_data
}
with open(checkpoint_file, 'wb') as f:
pickle.dump(checkpoint_data, f)
print(f"Checkpoint saved: Last processed timestamp = {last_timestamp}")
except Exception as e:
print(f"Error saving checkpoint: {e}")
def count_log_entries(log_file):
"""Count the total number of lines in the log file for progress reporting."""
try:
with open(log_file, 'r') as f:
return sum(1 for _ in f)
except Exception as e:
print(f"Error counting log entries: {e}")
return 0
def process_log_batch(reader, batch_size, aircraft_data, last_timestamp=None, verbose=False):
"""
Process a batch of log entries.
Returns updated aircraft_data and the timestamp of the last processed entry.
"""
new_data_found = False
current_last_timestamp = last_timestamp
batch_count = 0
for row in reader:
if len(row) >= 6: # Ensure we have all expected fields
timestamp, icao, callsign, lat, lon, alt = row
# Skip entries that have already been processed
if last_timestamp and timestamp <= last_timestamp:
continue
new_data_found = True
try:
# Convert coordinates and altitude to float
lat_float = float(lat)
lon_float = float(lon)
alt_float = float(alt)
# Store the data
aircraft_data[icao].append({
'timestamp': timestamp,
'callsign': callsign,
'latitude': lat_float,
'longitude': lon_float,
'altitude': alt_float
})
# Update the last processed timestamp
if current_last_timestamp is None or timestamp > current_last_timestamp:
current_last_timestamp = timestamp
batch_count += 1
except ValueError:
if verbose:
print(f"Warning: Could not convert coordinates or altitude for row: {row}")
else:
if verbose:
print(f"Warning: Incomplete data in row: {row}")
# Check if we've reached the batch size
if batch_count >= batch_size:
break
return aircraft_data, current_last_timestamp, new_data_found, batch_count
def read_aircraft_data(log_file, batch_size=DEFAULT_BATCH_SIZE, last_timestamp=None, verbose=False):
"""
Read and parse aircraft data from the log file in batches.
Returns a dictionary with ICAO codes as keys and lists of coordinate data as values,
along with the timestamp of the last processed entry.
"""
aircraft_data = defaultdict(list)
current_last_timestamp = last_timestamp
total_processed = 0
new_data_found = False
try:
# Count total entries for progress reporting
total_entries = count_log_entries(log_file)
with open(log_file, 'r') as file:
reader = csv.reader(file)
# Create a progress bar if tqdm is available
if TQDM_AVAILABLE:
progress_bar = tqdm(total=total_entries, desc="Processing log entries", unit="entries")
else:
progress_bar = None
print(f"Processing log file with {total_entries} entries...")
while True:
# Process a batch of entries
aircraft_data, timestamp, new_data, batch_count = process_log_batch(
reader, batch_size, aircraft_data, current_last_timestamp, verbose
)
if new_data:
new_data_found = True
# Update progress and timestamps
total_processed += batch_count
if progress_bar:
progress_bar.update(batch_count)
elif total_processed % (batch_size * 10) == 0 and total_processed > 0:
print(f"Processed {total_processed} entries ({total_processed/total_entries:.1%})...")
if current_last_timestamp is None or (timestamp is not None and timestamp > current_last_timestamp):
current_last_timestamp = timestamp
# If we processed fewer entries than the batch size, we've reached the end of the file
if batch_count < batch_size:
break
if progress_bar:
progress_bar.close()
if not new_data_found and last_timestamp:
print("No new data found since last run. Using existing checkpoint data.")
else:
print(f"Processed {total_processed} new log entries.")
except Exception as e:
print(f"Error reading log file: {e}")
return aircraft_data, current_last_timestamp, new_data_found
def sort_and_limit_aircraft(aircraft_data, max_aircraft, sort_by):
"""
Sort aircraft data and limit to the specified maximum number.
Returns a list of (icao, points) tuples, sorted and limited as specified.
"""
if not aircraft_data:
return []
# Filter out aircraft with no points
aircraft_list = [(icao, points) for icao, points in aircraft_data.items() if points]
# Sort the aircraft according to the specified method
if sort_by == 'points':
# Sort by number of points (descending)
aircraft_list.sort(key=lambda x: len(x[1]), reverse=True)
elif sort_by == 'recency':
# Sort by most recent timestamp (descending)
aircraft_list.sort(key=lambda x: max(p['timestamp'] for p in x[1]), reverse=True)
elif sort_by == 'callsign':
# Sort by callsign (ascending)
aircraft_list.sort(key=lambda x: x[1][-1]['callsign'] if x[1][-1]['callsign'] else "")
# Limit to max_aircraft
return aircraft_list[:max_aircraft]
def sample_points(points, max_points, sample_rate):
"""
Sample points to reduce the total number.
First applies sampling rate, then limits to max_points.
Always includes the first and last point.
"""
if not points:
return []
# Always keep the first and last point
first_point = points[0]
last_point = points[-1]
# Apply sampling rate
if sample_rate > 1 and len(points) > 2:
sampled_points = [points[i] for i in range(1, len(points)-1, sample_rate)]
points = [first_point] + sampled_points + [last_point]
# Apply maximum points limit
if max_points > 0 and len(points) > max_points:
# Calculate stride to evenly distribute points
stride = max(1, (len(points) - 2) // (max_points - 2))
# Select points with the calculated stride, always keeping first and last
points = [first_point] + [points[i] for i in range(1, len(points)-1, stride)][:max_points-2] + [last_point]
return points
def split_into_flight_segments(points, max_gap_minutes=5):
"""
Split a list of points into separate flight segments based on time gaps.
Args:
points: List of point dictionaries with 'timestamp' field
max_gap_minutes: Maximum gap in minutes between points before splitting (default: 5)
Returns:
List of flight segments, where each segment is a list of points
"""
if not points:
return []
# Sort points by timestamp
sorted_points = sorted(points, key=lambda p: p['timestamp'])
flight_segments = []
current_segment = [sorted_points[0]]
for i in range(1, len(sorted_points)):
current_point = sorted_points[i]
previous_point = sorted_points[i-1]
# Calculate time difference in minutes
try:
current_time = datetime.strptime(current_point['timestamp'], "%Y-%m-%d %H:%M:%S")
previous_time = datetime.strptime(previous_point['timestamp'], "%Y-%m-%d %H:%M:%S")
time_diff = (current_time - previous_time).total_seconds() / 60.0
except ValueError:
# If timestamp format is invalid, assume it's a new segment
time_diff = max_gap_minutes + 1
# If time difference is greater than the maximum gap, start a new segment
if time_diff > max_gap_minutes:
flight_segments.append(current_segment)
current_segment = [current_point]
else:
current_segment.append(current_point)
# Add the last segment
if current_segment:
flight_segments.append(current_segment)
return flight_segments
def backup_existing_file(file_path):
"""
Create a backup of an existing file by renaming it with a timestamp or incremental number.
Returns the new backup filename.
"""
if not os.path.exists(file_path):
return None
# Get file components
dir_name = os.path.dirname(file_path)
base_name = os.path.basename(file_path)
name, ext = os.path.splitext(base_name)
# Try timestamp-based backup first
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = os.path.join(dir_name, f"{name}.{timestamp}{ext}")
# If timestamp-based file already exists (highly unlikely but possible),
# try incremental numbering
if os.path.exists(backup_file):
# Find existing backups with numeric suffixes
existing_backups = []
pattern = re.compile(rf"{re.escape(name)}\.(\d+){re.escape(ext)}")
for f in os.listdir(dir_name):
match = pattern.match(f)
if match:
existing_backups.append(int(match.group(1)))
# Find the next available number
next_number = 1
if existing_backups:
next_number = max(existing_backups) + 1
backup_file = os.path.join(dir_name, f"{name}.{next_number}{ext}")
# Create the backup
try:
shutil.copy2(file_path, backup_file)
print(f"Backup created: {backup_file}")
return backup_file
except Exception as e:
print(f"Warning: Failed to create backup: {e}")
return None
def create_kml_file(aircraft_data, output_file, max_points=DEFAULT_MAX_POINTS_PER_AIRCRAFT,
max_aircraft=DEFAULT_MAX_AIRCRAFT, sample_rate=DEFAULT_SAMPLE_RATE,
lines_only=False, no_lines=False, sort_by='points', backup=True):
"""Create a KML file from the aircraft data.
Each aircraft (identified by ICAO code) will have its own folder and a unique color.
Args:
aircraft_data: Dictionary mapping ICAO codes to lists of position data
output_file: Path to save the KML file
max_points: Maximum number of points per aircraft to include
max_aircraft: Maximum number of aircraft to include
sample_rate: Sampling rate for point reduction (1=use all, 2=use every 2nd, etc.)
lines_only: If True, only generate flight path lines without individual points
no_lines: If True, only generate individual points without flight path lines
sort_by: Method to sort aircraft when limiting ('points', 'recency', or 'callsign')
backup: Whether to create a backup of existing output file
"""
# Initialize KML document
kml = simplekml.Kml()
doc = kml.newdocument(name="Aircraft Tracks")
# Statistics counters
total_points = 0
total_aircraft = 0
sampled_points = 0
total_flights = 0
# Get a sorted and limited list of aircraft
aircraft_list = sort_and_limit_aircraft(aircraft_data, max_aircraft, sort_by)
original_aircraft_count = len(aircraft_data)
# Create progress tracking
if TQDM_AVAILABLE:
progress_bar = tqdm(total=len(aircraft_list), desc="Generating KML content", unit="aircraft")
else:
progress_bar = None
print(f"Generating KML content for {len(aircraft_list)} aircraft...")
try:
# Process each aircraft
for icao, points in aircraft_list:
# Memory management - use garbage collection
gc.collect()
# Skip empty data
if not points:
continue
# Get the callsign (use the most recent one)
callsign = points[-1]['callsign'] or f"Unknown-{icao}"
# Count total points before sampling
total_points += len(points)
# Sample points to reduce data size
points = sample_points(points, max_points, sample_rate)
sampled_points += len(points)
if len(points) < 2:
print(f"Warning: Not enough points for aircraft {callsign} ({icao}), skipping")
continue
# Create a folder for this aircraft
folder = doc.newfolder(name=f"{callsign} ({icao})")
# Split points into separate flight segments based on time gaps
flight_segments = split_into_flight_segments(points)
total_flights += len(flight_segments)
# Create flight path lines for each segment if requested
if not no_lines:
for segment_idx, segment in enumerate(flight_segments):
if len(segment) < 2:
continue
# Generate a unique color for each flight segment
color = generate_random_color()
# Format time range for the segment
start_time = segment[0]['timestamp'].split()[1] # Extract time part
end_time = segment[-1]['timestamp'].split()[1] # Extract time part
linestring = folder.newlinestring(
name=f"Flight {segment_idx+1}: {start_time} to {end_time}"
)
linestring.coords = [(p['longitude'], p['latitude'], p['altitude']) for p in segment]
linestring.extrude = 1
linestring.altitudemode = simplekml.AltitudeMode.absolute
linestring.style.linestyle.color = color
linestring.style.linestyle.width = 2
# Create individual points if requested
if not lines_only:
# Create points for each segment, with segment-specific color
for segment_idx, segment in enumerate(flight_segments):
# Use the same color as the corresponding line
segment_color = generate_random_color() if no_lines else None
for i, p in enumerate(segment):
# Create a placemark for this position
placemark = folder.newpoint(
name=f"{callsign} Flight {segment_idx+1}: {p['timestamp']}",
coords=[(p['longitude'], p['latitude'], p['altitude'])]
)
# Set placemark properties
# If lines are disabled, reuse one color for all points in this segment;
# otherwise a fresh random color is generated per point (note: this is
# not synchronized with the segment's line color)
if no_lines:
placemark.style.iconstyle.color = segment_color
else:
placemark.style.iconstyle.color = generate_random_color()
placemark.style.iconstyle.scale = 0.5
# Add description with details
placemark.description = (
f"ICAO: {icao}<br/>"
f"Callsign: {p['callsign']}<br/>"
f"Time: {p['timestamp']}<br/>"
f"Latitude: {p['latitude']:.6f}<br/>"
f"Longitude: {p['longitude']:.6f}<br/>"
f"Altitude: {p['altitude']} ft"
)
# Update progress
total_aircraft += 1
if progress_bar:
progress_bar.update(1)
elif total_aircraft % 10 == 0:
print(f"Processed {total_aircraft}/{len(aircraft_list)} aircraft...")
# Close progress bar
if progress_bar:
progress_bar.close()
# Print statistics
print(f"KML file statistics:")
print(f" Total aircraft processed: {total_aircraft} (from {original_aircraft_count} total)")
print(f" Total flights detected: {total_flights}")
print(f" Average flights per aircraft: {total_flights / max(total_aircraft, 1):.2f}")
print(f" Total points in dataset: {total_points}")
print(f" Points after sampling: {sampled_points}")
print(f" Sampling rate: {sample_rate}")
print(f" Maximum points per aircraft: {max_points}")
print(f" Time gap for flight segmentation: 5 minutes")
# Check if output file exists and create backup if needed
if os.path.exists(output_file) and backup:
backup_file = backup_existing_file(output_file)
if backup_file:
print(f"Backed up existing file to: {os.path.basename(backup_file)}")
# Save the KML file
print(f"Saving KML file to: {output_file}")
kml.save(output_file)
print(f"KML file created successfully.")
except Exception as e:
print(f"Error creating KML file: {e}")
import traceback
traceback.print_exc()
def main():
# Parse command-line arguments
args = parse_arguments()
print(f"Reading aircraft data from: {args.input}")
# Load checkpoint if not disabled
last_timestamp = None
aircraft_data = defaultdict(list)
loaded_from_checkpoint = False
# Reset checkpoint if requested
if args.reset_checkpoint and os.path.exists(args.checkpoint_file):
os.remove(args.checkpoint_file)
print(f"Checkpoint file {args.checkpoint_file} has been reset.")
if not args.no_checkpoint and not args.all:
last_timestamp, checkpoint_data = load_checkpoint(args.checkpoint_file)
if checkpoint_data:
aircraft_data = checkpoint_data
loaded_from_checkpoint = True
# Read the aircraft data
aircraft_data, last_timestamp, new_data_found = read_aircraft_data(
args.input,
batch_size=args.batch_size,
last_timestamp=last_timestamp,
verbose=args.verbose
)
# Check if we have any aircraft data to work with
if not aircraft_data:
print("No valid aircraft data found in log file or checkpoint.")
return
# If we have no new data but loaded from checkpoint, we can still create a KML file
if not new_data_found and last_timestamp and not args.all:
if loaded_from_checkpoint:
print("Using existing aircraft data from checkpoint to create KML file.")
else:
print("No new data found and no existing checkpoint data available.")
return
# Create or update the KML file
if args.update and os.path.exists(args.output):
print(f"Updating existing KML file: {args.output}")
# In a real implementation, we would merge with existing KML data here
# For this version, we'll just overwrite with new data
create_kml_file(
aircraft_data=aircraft_data,
output_file=args.output,
max_points=args.max_points,
max_aircraft=args.max_aircraft,
sample_rate=args.sample_rate,
lines_only=args.lines_only,
no_lines=args.no_lines,
sort_by=args.sort_by,
backup=args.backup
)
if not args.no_checkpoint:
save_checkpoint(args.checkpoint_file, last_timestamp, aircraft_data)
if __name__ == "__main__":
main()
