Cloud & Scalable Computing for Geospatial AI
This cheatsheet covers cloud computing strategies, distributed training, and scalable deployment for geospatial foundation models.
Cloud Computing Fundamentals
Key Cloud Platforms for Geospatial AI
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
import rasterio
from rasterio.windows import Window
import numpy as np
from pathlib import Path
import os
import dask
import dask.array as da
from dask.distributed import Client
import xarray as xr
# Cloud platform configurations
cloud_configs = {
    'gcp': {
        'compute': ['n1-highmem-32', 'n1-standard-96'],
        'gpu': ['nvidia-tesla-v100', 'nvidia-tesla-t4', 'nvidia-tesla-a100'],
        'storage': 'gs://bucket-name/',
        'earth_engine': True
    },
    'aws': {
        'compute': ['m5.24xlarge', 'c5.24xlarge'],
        'gpu': ['p3.16xlarge', 'p4d.24xlarge'],
        'storage': 's3://bucket-name/',
        'sagemaker': True
    },
    'azure': {
        'compute': ['Standard_D64s_v3', 'Standard_M128s'],
        'gpu': ['Standard_NC24rs_v3', 'Standard_ND40rs_v2'],
        'storage': 'https://account.blob.core.windows.net/',
        'machine_learning': True
    }
}
print("Cloud Platform Comparison:")
for platform, config in cloud_configs.items():
    print(f"{platform.upper()}: {config['compute'][0]} | {config['gpu'][0]}")
Distributed Data Loading
class DistributedGeospatialDataset(torch.utils.data.Dataset):
"""Dataset for distributed training with large geospatial tiles"""
def __init__(self, image_paths, rank=0, world_size=1, tile_size=256):
self.image_paths = image_paths
self.rank = rank
self.world_size = world_size
self.tile_size = tile_size
# Distribute files across processes
= len(image_paths) // world_size
files_per_rank = rank * files_per_rank
start_idx = start_idx + files_per_rank if rank < world_size - 1 else len(image_paths)
end_idx self.local_paths = image_paths[start_idx:end_idx]
def __len__(self):
return len(self.local_paths) * 4 # 4 tiles per image
def __getitem__(self, idx):
= idx // 4
file_idx = idx % 4
tile_idx
with rasterio.open(self.local_paths[file_idx]) as src:
= src.height, src.width
height, width
# Calculate tile position
= (tile_idx // 2) * (height // 2)
row = (tile_idx % 2) * (width // 2)
col
= Window(col, row, self.tile_size, self.tile_size)
window = src.read(window=window)
tile
return torch.from_numpy(tile.astype(np.float32))
# Usage example
def setup_distributed_training():
    """Initialize distributed training environment"""
    if 'RANK' in os.environ:
        rank = int(os.environ['RANK'])
        world_size = int(os.environ['WORLD_SIZE'])
        local_rank = int(os.environ['LOCAL_RANK'])

        torch.distributed.init_process_group('nccl', rank=rank, world_size=world_size)
        torch.cuda.set_device(local_rank)
        return rank, world_size, local_rank
    else:
        return 0, 1, 0

rank, world_size, local_rank = setup_distributed_training()
print(f"Process {rank}/{world_size} on device {local_rank}")
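The pieces above slot into a standard DDP loop. A minimal sketch, assuming a `model` and a list of `image_paths` defined elsewhere and one GPU per process; because DistributedGeospatialDataset already shards files by rank, a plain DataLoader is used here (DistributedSampler, imported above, would take over that job for an unsharded dataset):
# Minimal DDP wiring (assumes `model`, `image_paths` and a GPU per process)
dataset = DistributedGeospatialDataset(image_paths, rank=rank, world_size=world_size)
loader = torch.utils.data.DataLoader(dataset, batch_size=8, num_workers=4, pin_memory=True)

model = model.to(local_rank)
if world_size > 1:
    model = DDP(model, device_ids=[local_rank])

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)
for epoch in range(10):
    for batch in loader:
        optimizer.zero_grad()
        loss = model(batch.to(local_rank)).mean()  # placeholder loss for illustration
        loss.backward()
        optimizer.step()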
Dask for Large-Scale Processing
Chunked Array Operations
# Large raster processing with Dask
def process_large_raster_dask(file_path, chunk_size=1024):
    """Process large rasters using Dask arrays"""
    with rasterio.open(file_path) as src:
        # Create dask array from a delayed raster read
        dask_array = da.from_delayed(
            dask.delayed(src.read)(),
            shape=(src.count, src.height, src.width),
            dtype=src.dtypes[0]
        )

        # Rechunk for optimal processing
        dask_array = dask_array.rechunk((1, chunk_size, chunk_size))

        # Normalize per chunk
        normalized = (dask_array - dask_array.mean()) / dask_array.std()

        # Compute NDVI if sufficient bands
        if src.count >= 4:  # Assuming NIR is band 4, Red is band 3
            nir = dask_array[3].astype('float32')
            red = dask_array[2].astype('float32')
            ndvi = (nir - red) / (nir + red)
            return ndvi.compute()

        return normalized.compute()
# Distributed client setup
def setup_dask_cluster():
    """Setup Dask distributed cluster"""
    # Local cluster
    from dask.distributed import LocalCluster
    cluster = LocalCluster(n_workers=4, threads_per_worker=2, memory_limit='4GB')
    client = Client(cluster)

    # Or cloud cluster (example for GCP)
    # from dask_kubernetes import KubeCluster
    # cluster = KubeCluster.from_yaml('dask-worker-spec.yaml')
    # cluster.scale(10)  # Scale to 10 workers

    print(f"Dask dashboard: {client.dashboard_link}")
    return client

client = setup_dask_cluster()
Parallel Model Inference
def distributed_model_inference(model, data_paths, client):
"""Run model inference across distributed workers"""
def inference_task(path_batch):
"""Single worker inference task"""
import torch
= []
results
for path in path_batch:
with rasterio.open(path) as src:
= src.read()
data = torch.from_numpy(data).unsqueeze(0).float()
tensor
with torch.no_grad():
= model(tensor)
output
results.append(output.numpy())
return results
# Distribute paths across workers
= len(client.scheduler_info()['workers'])
n_workers = len(data_paths) // n_workers
batch_size
= []
futures for i in range(0, len(data_paths), batch_size):
= data_paths[i:i+batch_size]
batch = client.submit(inference_task, batch)
future
futures.append(future)
# Gather results
= client.gather(futures)
results return np.concatenate([r for batch in results for r in batch])
Google Earth Engine Integration
Scalable Earth Engine Processing
import ee
# Initialize Earth Engine
ee.Initialize()
def large_scale_ee_processing():
"""Large-scale processing using Earth Engine"""
# Define region of interest (e.g., entire continent)
= ee.Geometry.Polygon([
region -180, -60], [180, -60], [180, 60], [-180, 60]]
[[
])
# Load Sentinel-2 collection
= (ee.ImageCollection('COPERNICUS/S2_SR')
collection '2023-01-01', '2023-12-31')
.filterDate(
.filterBounds(region)filter(ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', 20)))
.
# Create composite
= collection.median().clip(region)
composite
# Calculate NDVI
= composite.normalizedDifference(['B8', 'B4']).rename('NDVI')
ndvi
# Export to Cloud Storage
= ee.batch.Export.image.toCloudStorage(
task =ndvi,
image='global_ndvi_2023',
description='your-gcs-bucket',
bucket=10,
scale=region,
region=1e13,
maxPixels=256
shardSize
)
task.start()print(f"Task started: {task.status()}")
return task
# Batch processing function
def batch_ee_export(regions, collection_name):
"""Export multiple regions in batch"""
= []
tasks
for i, region in enumerate(regions):
= (ee.ImageCollection(collection_name)
collection
.filterBounds(region)'2023-01-01', '2023-12-31'))
.filterDate(
= collection.median().clip(region)
composite
= ee.batch.Export.image.toCloudStorage(
task =composite,
image=f'region_{i:03d}',
description='your-processing-bucket',
bucket=10,
scale=region,
region=1e9
maxPixels
)
task.start()
tasks.append(task)
return tasks
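Earth Engine exports run asynchronously, so batch jobs are usually polled until every task reaches a terminal state. A minimal polling sketch (the state names are assumed from the EE batch task API; the 30-second interval is arbitrary):
import time

def wait_for_tasks(tasks, poll_seconds=30):
    """Poll Earth Engine export tasks until all reach a terminal state"""
    while True:
        states = [task.status().get('state', 'UNKNOWN') for task in tasks]
        print(f"Task states: {states}")
        if all(s in ('COMPLETED', 'FAILED', 'CANCELLED') for s in states):
            return states
        time.sleep(poll_seconds)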
Model Optimization Strategies
Model Quantization and Pruning
import torch.quantization as quantization
from torch.nn.utils import prune
class OptimizedGeoModel(torch.nn.Module):
"""Optimized model for deployment"""
def __init__(self, base_model):
super().__init__()
self.backbone = base_model.backbone
self.classifier = base_model.classifier
def forward(self, x):
= self.backbone(x)
features return self.classifier(features)
def optimize_model_for_deployment(model, sample_input):
"""Apply optimization techniques for deployment"""
# 1. Pruning (remove 30% of weights)
= []
parameters_to_prune for module in model.modules():
if isinstance(module, torch.nn.Conv2d):
'weight'))
parameters_to_prune.append((module,
prune.global_unstructured(
parameters_to_prune,=prune.L1Unstructured,
pruning_method=0.3
amount
)
# Make pruning permanent
for module, param_name in parameters_to_prune:
prune.remove(module, param_name)
# 2. Quantization
eval()
model.
# Post-training quantization
= quantization.quantize_dynamic(
quantized_model =torch.qint8
model, {torch.nn.Linear, torch.nn.Conv2d}, dtype
)
# 3. TorchScript compilation
= torch.jit.trace(quantized_model, sample_input)
traced_model
return traced_model
# Example usage
sample_input = torch.randn(1, 4, 256, 256)  # Batch, Channels, Height, Width
optimized_model = optimize_model_for_deployment(model, sample_input)

# Save optimized model
torch.jit.save(optimized_model, 'optimized_geo_model.pt')

# Rough size comparison: parameter memory of the original model vs. serialized optimized model
model_size_mb = sum(p.numel() * p.element_size() for p in model.parameters()) / 1e6
optimized_size_mb = Path('optimized_geo_model.pt').stat().st_size / 1e6
print(f"Model size reduced from {model_size_mb:.1f}MB to {optimized_size_mb:.1f}MB")
ONNX Export for Cross-Platform Deployment
import onnx
import onnxruntime as ort
def export_to_onnx(model, sample_input, output_path):
"""Export PyTorch model to ONNX format"""
eval()
model.
# Export to ONNX
torch.onnx.export(
model,
sample_input,
output_path,=['satellite_image'],
input_names=['predictions'],
output_names={
dynamic_axes'satellite_image': {0: 'batch_size', 2: 'height', 3: 'width'},
'predictions': {0: 'batch_size'}
},=11
opset_version
)
# Verify ONNX model
= onnx.load(output_path)
onnx_model
onnx.checker.check_model(onnx_model)
# Test with ONNX Runtime
= ort.InferenceSession(output_path)
ort_session = {ort_session.get_inputs()[0].name: sample_input.numpy()}
ort_inputs = ort_session.run(None, ort_inputs)
ort_outputs
print(f"ONNX model exported successfully to {output_path}")
return ort_session
# Export model
= export_to_onnx(model, sample_input, 'geo_model.onnx') onnx_session
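For deployment, requests are usually batched through the ONNX Runtime session rather than run tile by tile; the dynamic batch axis declared in the export above makes this possible. A small sketch (the batch size of 16 is an arbitrary choice):
def batched_onnx_inference(ort_session, tiles, batch_size=16):
    """Run ONNX Runtime inference over a list of equally sized (C, H, W) tiles"""
    input_name = ort_session.get_inputs()[0].name
    outputs = []
    for i in range(0, len(tiles), batch_size):
        batch = np.stack(tiles[i:i + batch_size]).astype(np.float32)
        outputs.append(ort_session.run(None, {input_name: batch})[0])
    return np.concatenate(outputs, axis=0)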
Container Deployment
Docker Configuration
# Create Dockerfile programmatically
dockerfile_content = '''
FROM nvidia/cuda:11.8.0-runtime-ubuntu20.04
# Install system dependencies
RUN apt-get update && apt-get install -y \\
python3 python3-pip \\
gdal-bin libgdal-dev \\
&& rm -rf /var/lib/apt/lists/*
# Set GDAL environment variables
ENV GDAL_DATA=/usr/share/gdal
ENV PROJ_LIB=/usr/share/proj
# Install Python dependencies
COPY requirements.txt .
RUN pip3 install --no-cache-dir -r requirements.txt
# Copy model and inference code
COPY geo_model.onnx /app/
COPY inference_api.py /app/
WORKDIR /app
# Expose port
EXPOSE 8000
# Run inference API
CMD ["python3", "inference_api.py"]
'''
# Requirements file
requirements_content = '''
torch>=1.12.0
torchvision>=0.13.0
rasterio>=1.3.0
numpy>=1.21.0
fastapi>=0.68.0
uvicorn>=0.15.0
onnxruntime-gpu>=1.12.0
pillow>=8.3.0
'''
# Save files
with open('Dockerfile', 'w') as f:
    f.write(dockerfile_content)

with open('requirements.txt', 'w') as f:
    f.write(requirements_content)
print("Docker configuration files created")
Kubernetes Deployment
# Kubernetes deployment YAML
k8s_deployment = '''
apiVersion: apps/v1
kind: Deployment
metadata:
  name: geo-ai-inference
spec:
  replicas: 3
  selector:
    matchLabels:
      app: geo-ai-inference
  template:
    metadata:
      labels:
        app: geo-ai-inference
    spec:
      containers:
      - name: geo-ai-inference
        image: your-registry/geo-ai:latest
        ports:
        - containerPort: 8000
        resources:
          requests:
            memory: "4Gi"
            cpu: "2"
            nvidia.com/gpu: 1
          limits:
            memory: "8Gi"
            cpu: "4"
            nvidia.com/gpu: 1
        env:
        - name: MODEL_PATH
          value: "/app/geo_model.onnx"
---
apiVersion: v1
kind: Service
metadata:
  name: geo-ai-service
spec:
  selector:
    app: geo-ai-inference
  ports:
  - port: 80
    targetPort: 8000
  type: LoadBalancer
'''
with open('k8s-deployment.yaml', 'w') as f:
    f.write(k8s_deployment)
print("Kubernetes deployment configuration created")
Performance Monitoring
Resource Monitoring
import psutil
import pynvml as nvml
import time
import logging
class ResourceMonitor:
"""Monitor system resources during inference"""
def __init__(self):
self.logger = logging.getLogger('resource_monitor')
try:
nvml.nvmlInit()self.gpu_available = True
self.device_count = nvml.nvmlDeviceGetCount()
except:
self.gpu_available = False
self.device_count = 0
    def get_system_stats(self):
        """Get current system resource usage"""
        stats = {
            'cpu_percent': psutil.cpu_percent(interval=1),
            'memory_percent': psutil.virtual_memory().percent,
            'memory_gb': psutil.virtual_memory().used / (1024**3),
            'disk_io': psutil.disk_io_counters(),
            'network_io': psutil.net_io_counters()
        }

        if self.gpu_available:
            gpu_stats = []
            for i in range(self.device_count):
                handle = nvml.nvmlDeviceGetHandleByIndex(i)

                # GPU utilization
                util = nvml.nvmlDeviceGetUtilizationRates(handle)

                # Memory info
                mem_info = nvml.nvmlDeviceGetMemoryInfo(handle)

                gpu_stats.append({
                    'gpu_id': i,
                    'gpu_util_percent': util.gpu,
                    'memory_util_percent': util.memory,
                    'memory_used_gb': mem_info.used / (1024**3),
                    'memory_total_gb': mem_info.total / (1024**3)
                })

            stats['gpu'] = gpu_stats

        return stats
    def log_performance_metrics(self, inference_time, batch_size):
        """Log performance metrics"""
        stats = self.get_system_stats()
        throughput = batch_size / inference_time

        self.logger.info(f"Inference Time: {inference_time:.3f}s")
        self.logger.info(f"Throughput: {throughput:.1f} images/sec")
        self.logger.info(f"CPU Usage: {stats['cpu_percent']:.1f}%")
        self.logger.info(f"Memory Usage: {stats['memory_percent']:.1f}%")

        if 'gpu' in stats:
            for gpu in stats['gpu']:
                self.logger.info(f"GPU {gpu['gpu_id']} Util: {gpu['gpu_util_percent']:.1f}%")
# Usage
monitor = ResourceMonitor()
start_time = time.time()

# Run inference here...
# predictions = model(batch)

inference_time = time.time() - start_time
monitor.log_performance_metrics(inference_time, batch_size=32)
Best Practices Summary
Scalability Checklist
scalability_checklist = {
    'Data Management': [
        '✓ Use chunked/tiled data formats (COG, Zarr)',
        '✓ Implement distributed data loading',
        '✓ Cache frequently accessed data',
        '✓ Use cloud-native data formats'
    ],
    'Model Optimization': [
        '✓ Apply quantization for deployment',
        '✓ Use model pruning to reduce size',
        '✓ Convert to ONNX for cross-platform deployment',
        '✓ Implement batch inference'
    ],
    'Infrastructure': [
        '✓ Use auto-scaling compute resources',
        '✓ Implement load balancing',
        '✓ Monitor resource utilization',
        '✓ Use container orchestration (Kubernetes)'
    ],
    'Cost Optimization': [
        '✓ Use spot/preemptible instances',
        '✓ Implement lifecycle policies for storage',
        '✓ Monitor and alert on costs',
        '✓ Use appropriate instance types for workload'
    ]
}

for category, items in scalability_checklist.items():
    print(f"\n{category}:")
    for item in items:
        print(f"  {item}")
Key Takeaways
- Distributed Training: Use DDP for multi-GPU training across nodes
- Data Parallelism: Distribute large datasets using Dask and cloud storage
- Model Optimization: Apply quantization, pruning, and ONNX export for deployment
- Container Deployment: Use Docker and Kubernetes for scalable inference
- Resource Monitoring: Track CPU, GPU, memory usage for optimization
- Cloud Integration: Leverage Earth Engine, cloud storage, and managed services
- Cost Management: Use spot instances and lifecycle policies for cost control
These techniques enable processing continent-scale geospatial data and deploying models to serve millions of inference requests efficiently.