
Initial Partitioning (2433 patches)

We split the original PASTIS dataset into 2433 per-patch subsets, package each patch's files into a tar archive, and upload the archives individually to an S3 bucket.

# pip install boto3==1.35.95 botocore==1.35.95 python-dotenv
from dotenv import load_dotenv

# Load BASE_DIR, bucket settings, and AWS credentials from pastis.env
load_dotenv("pastis.env")
True
import os
import json
from pathlib import Path
from typing import Dict, List, Optional
def load_geojson_ids(path: Path, max_items: Optional[int] = None) -> List[str]:
    """Return the feature IDs from the PASTIS metadata GeoJSON, optionally truncated to max_items."""
    with open(path, "r", encoding="utf-8") as f:
        geojson = json.load(f)
    
    features = geojson.get("features", [])
    ids = [feature["id"] for feature in features]
    
    if max_items is not None:
        ids = ids[:max_items]
    
    return ids
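
As a quick sanity check (not part of the original pipeline), load_geojson_ids only assumes that each feature carries a top-level "id" field. The sketch below exercises it on a throwaway GeoJSON; the two IDs are made up, while the real metadata_pastis.geojson holds 2433 features.

import json
import tempfile
from pathlib import Path

# Hypothetical minimal FeatureCollection mirroring the structure load_geojson_ids expects
toy = {
    "type": "FeatureCollection",
    "features": [
        {"type": "Feature", "id": "10000", "properties": {}, "geometry": None},
        {"type": "Feature", "id": "10001", "properties": {}, "geometry": None},
    ],
}

with tempfile.NamedTemporaryFile("w", suffix=".geojson", delete=False) as f:
    json.dump(toy, f)
    toy_path = Path(f.name)

print(load_geojson_ids(toy_path))               # ['10000', '10001']
print(load_geojson_ids(toy_path, max_items=1))  # ['10000']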

def find_files_with_ids(base_path: Path, ids: List[str]) -> Dict[str, List[Path]]:
    """Walk base_path and group matching files by the patch ID contained in their names."""
    results = {id_: [] for id_ in ids}
    
    for root, _, files in os.walk(base_path):
        for file in files:
            # Exclude auxiliary ('aux') files and anything starting with 'zones_' from the per-patch bundles
            if 'aux' in file.lower() or file.lower().startswith('zones_'):
                continue
            # A file belongs to a patch if its ID appears anywhere in the file name
            for id_ in ids:
                if id_ in file:
                    results[id_].append(Path(root) / file)
    
    return results
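
For illustration, here is a hedged usage sketch of find_files_with_ids on a throwaway directory; the directory and file names are invented and only meant to show that matching is by substring and that auxiliary files are filtered out. The 8-files-per-patch count further below also guards against accidental over-matching.

import tempfile
from pathlib import Path

# Hypothetical mini layout: two files whose names contain a patch ID,
# plus an auxiliary sidecar that the filter should drop.
tmp = Path(tempfile.mkdtemp())
(tmp / "DATA_S2").mkdir()
(tmp / "DATA_S2" / "S2_10000.npy").touch()
(tmp / "DATA_S2" / "S2_10000.npy.aux.xml").touch()   # skipped: 'aux' in the name
(tmp / "ANNOTATIONS").mkdir()
(tmp / "ANNOTATIONS" / "TARGET_10000.npy").touch()

print(find_files_with_ids(tmp, ["10000"]))
# {'10000': [.../DATA_S2/S2_10000.npy, .../ANNOTATIONS/TARGET_10000.npy]}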

BASE_DIR = Path(os.getenv("BASE_DIR"))
geojson_ids = load_geojson_ids(BASE_DIR / "metadata_pastis.geojson")  # pass max_items (e.g. 100) to limit; 2433 IDs in total
matched_files = find_files_with_ids(BASE_DIR, geojson_ids)
import numpy as np
import xarray as xr
from pathlib import Path

count = 0
for id_, paths in matched_files.items():
    if len(paths) == 8:  # Expect exactly 8 files per patch
        count += 1
    else:
        # Diagnostic: report the mismatch and inspect each file's array shape
        print(f"{id_} MISMATCH - found {len(paths)} files")
        for path in paths:
            print(f"  {path}")
            path = Path(path)

            if path.suffix == ".npy":
                da = xr.DataArray(np.load(path), name=path.stem)
                print(da.shape)
            elif path.suffix == ".tif":
                try:
                    import rioxarray
                    da = rioxarray.open_rasterio(path)
                    print(da.shape)
                except ImportError:
                    pass  # silently skip if rioxarray is not available

print(f"Number of patches with exactly 8 files: {count}")
Number of patches with exactly 8 files: 2433
import os
import boto3
import botocore
import tarfile
import tempfile
from pathlib import Path
from botocore.config import Config
from botocore.exceptions import ClientError

#print("boto3 version:", boto3.__version__)
#print("botocore version:", botocore.__version__)

BUCKET_NAME = os.getenv("BUCKET_NAME")
BUCKET_PREFIX = os.getenv("BUCKET_PREFIX", "")
AWS_REGION = os.getenv("AWS_REGION", "")
AWS_ENDPOINT_URL = os.getenv("AWS_ENDPOINT_URL")
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")

#print(BUCKET_NAME)

missing = [v for v in ["BUCKET_NAME", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY"] if not os.getenv(v)]
if missing:
    raise EnvironmentError(f"Missing required env vars: {', '.join(missing)}")

boto_config = Config(
    s3={'addressing_style': 'path'},   # path-style addressing for S3-compatible endpoints
    retries={'max_attempts': 3},
    signature_version='s3v4'
)

s3 = boto3.client(
    "s3",
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    endpoint_url=AWS_ENDPOINT_URL,
    region_name=AWS_REGION,
    config=boto_config
)

try:
    s3.head_bucket(Bucket=BUCKET_NAME)

    test_key = f"{BUCKET_PREFIX.rstrip('/')}/test.txt" if BUCKET_PREFIX else "test.txt"
    s3.put_object(Bucket=BUCKET_NAME, Key=test_key, Body=b"test")
    s3.delete_object(Bucket=BUCKET_NAME, Key=test_key)

    print(f"✅ Bucket '{BUCKET_NAME}' is accessible and writable.")
except Exception as e:
    raise RuntimeError(f"❌ Bucket check failed: {e}") from e

skipped = 0
uploaded = 0
for id_, paths in matched_files.items():
    if not paths:
        continue
        
    s3_key = f"{BUCKET_PREFIX.rstrip('/')}/{id_}.tar" if BUCKET_PREFIX else f"{id_}.tar"

    try:
        # Skip patches whose archive already exists in the bucket (makes re-runs idempotent)
        s3.head_object(Bucket=BUCKET_NAME, Key=s3_key)
        #print(f"⏭️ Skipping {id_}: already exists in S3.")
        skipped += 1
        continue
    except ClientError as e:
        if e.response['Error']['Code'] != "404":
            raise RuntimeError(f"❌ Error checking existence of {s3_key}: {e}")

    # Build the per-patch tar in a temporary file, preserving paths relative to BASE_DIR
    with tempfile.NamedTemporaryFile(suffix=".tar", delete=True) as tmp_tar:
        with tarfile.open(tmp_tar.name, "w") as tar:
            for path in paths:
                path = Path(path)
                try:
                    arcname = path.relative_to(BASE_DIR)
                except ValueError:
                    arcname = path.name
                tar.add(path, arcname=arcname)

        s3.upload_file(tmp_tar.name, BUCKET_NAME, s3_key)
        url_display = f"{AWS_ENDPOINT_URL}/{BUCKET_NAME}/{s3_key}"
        uploaded += 1
        print(f"✅ Uploaded: {url_display} ({len(paths)} files)")
        
print(f"Number of patches uploaded: {uploaded} (skipped: {skipped})")
✅ Bucket 'versioneer-papers' is accessible and writable.
Number of patches uploaded: 0 (skipped: 2433)
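
As a final, hedged verification sketch (not part of the run above), the uploaded archives can be re-counted straight from the bucket with a list_objects_v2 paginator:

# Count the per-patch tar archives currently stored under BUCKET_PREFIX
paginator = s3.get_paginator("list_objects_v2")
prefix = f"{BUCKET_PREFIX.rstrip('/')}/" if BUCKET_PREFIX else ""

tar_keys = []
for page in paginator.paginate(Bucket=BUCKET_NAME, Prefix=prefix):
    for obj in page.get("Contents", []):
        if obj["Key"].endswith(".tar"):
            tar_keys.append(obj["Key"])

print(f"{len(tar_keys)} tar archives found under prefix '{prefix}'")  # expected: 2433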