import os
import warnings
from glob import glob
import numpy as np
import pandas as pd
import planetary_computer as pc
import pystac_client
import rioxarray
import xarray as xr
from tqdm import tqdm
from .axioms import bands
from .c_factor import c_factor_from_item, c_factor_from_metadata
from .metadata import get_processing_baseline
from .utils import _extrapolate_c_factor
[docs]
def nbar_SAFE(
path: str, cog: bool = True, to_int: bool = False, quiet: bool = False
) -> None:
"""Computes the Nadir BRDF Adjusted Reflectance (NBAR) using the SAFE path.
If the processing baseline is greater than 04.00, the DN values are automatically
shifted before computing NBAR. All images are saved in the SAFE path inside the folder
NBAR.
Parameters
----------
path : str
SAFE path.
cog : bool, default = True
Whether to save the images as Cloud Optimized GeoTIFF (COG).
to_int : bool, default = False
Whether to convert the NBAR output to integer.
quiet : bool, default = False
Whether to show progress.
Returns
-------
None
"""
# Whether to save as COG
if cog:
driver = "COG"
else:
driver = None
# NBAR folder to store the images
nbar_output_path = os.path.join(path, "NBAR")
# Create folder inside the SAFE path
if not os.path.exists(nbar_output_path):
os.makedirs(nbar_output_path)
# Metadata file
metadata = glob(os.path.join(path, "GRANULE", "*", "MTD_TL.xml"))[0]
# Processing baseline
PROCESSING_BASELINE = get_processing_baseline(os.path.join(path, "MTD_MSIL2A.xml"))
# Whether to shift the DN values
# After 04.00 all DN values are shifted by 1000
harmonize = PROCESSING_BASELINE >= 4.0
# Compute c-factor
c = c_factor_from_metadata(metadata)
# Extrapolate c-factor
c = _extrapolate_c_factor(c)
# Initialize progress bar
pbar = tqdm(bands.items(), disable=quiet, leave=False)
# Compute NBAR per band
for band, resolution in pbar:
pbar.set_description(f"Processing {band}")
# Image file
img_path = glob(
os.path.join(
path, "GRANULE", "*", "IMG_DATA", "*", f"*_{band}_{resolution}m.jp2"
)
)[0]
# Rename filename to tif extension
filename = os.path.split(img_path)[1].replace("jp2", "tif")
# Open image and convert zeros to nan
img = rioxarray.open_rasterio(img_path)
img = img.where(lambda x: x > 0, other=np.nan)
# Harmonize
if harmonize:
img = img - 1000
# Interpolate c-factor of the band to the resolution of the image
interpolated = c.sel(band=band).interp(
y=img.y, x=img.x, method="linear", kwargs={"fill_value": "extrapolate"}
)
# Compute the NBAR
img = img * interpolated
if to_int:
img = img.round().astype("int16")
# Save the image
img.rio.to_raster(os.path.join(nbar_output_path, filename), driver=driver)
pbar.set_description(f"Done")
# Show the path where the images were saved
if not quiet:
print(f"Saved to {nbar_output_path}")
[docs]
def nbar_stac(
da: xr.DataArray, stac: str, collection: str, epsg: str, quiet: bool = False
) -> xr.DataArray:
"""Computes the Nadir BRDF Adjusted Reflectance (NBAR) for a :code:`xarray.DataArray`.
If the processing baseline is greater than 04.00, the DN values are automatically
shifted before computing NBAR.
Parameters
----------
da : xarray.DataArray
Data array to use for the NBAR calculation.
stac : str
STAC Endpoint of the data array.
collection : str
Collection name of the data array.
epsg : str
EPSG code of the data array (e.g. "epsg:3115").
quiet : bool, default = False
Whether to show progress.
Returns
-------
xarray.DataArray
NBAR data array.
"""
# Keep attributes xarray
xr.set_options(keep_attrs=True)
# Open catalogue
CATALOG = pystac_client.Client.open(stac)
# Do a search
SEARCH = CATALOG.search(ids=da.id.values, collections=[collection])
# Get the items
items = SEARCH.item_collection()
# Sign the items if using PC
if stac == "https://planetarycomputer.microsoft.com/api/stac/v1":
items = pc.sign(items)
# Order items using pandas
df_items = pd.DataFrame(dict(id=[item.id for item in items], item=items)).set_index(
"id"
)
df_da_ids = pd.DataFrame(dict(id=da.id.values)).set_index("id")
ordered_df = df_da_ids.join(df_items)
ordered_items = ordered_df.item.values
# Compute the c-factor per item and extract the processing baseline
c_array = []
processing_baseline = []
exclude = [] # Save indices to exclude (this for not having angles for all bands)
for i, item in tqdm(
enumerate(ordered_items), disable=quiet, desc="Processing items", leave=False
):
try:
c = c_factor_from_item(item, epsg)
c = c.interp(
y=da.y.values,
x=da.x.values,
method="linear",
kwargs={"fill_value": "extrapolate"},
)
c_array.append(c)
processing_baseline.append(item.properties["s2:processing_baseline"])
except ValueError:
# Append indices to exclude, then pass
exclude.append(i)
warnings.warn(
f"""Item {i} with datetime {item.properties['datetime']} omitted as it doesn't have angles for all bands."""
)
pass
# Exclude all timesteps were tile angles didn't exist for all bands
if len(exclude) > 0:
include = np.delete(np.arange(da.time.shape[0]), exclude)
da = da.isel(time=include)
# Processing baseline as data array
processing_baseline = xr.DataArray(
[float(x) for x in processing_baseline],
dims="time",
coords=dict(time=da.time.values),
)
# Whether to shift the DN values
# After 04.00 all DN values are shifted by 1000
harmonize = xr.where(processing_baseline >= 4.0, -1000, 0)
# Zeros are NaN
da = da.where(lambda x: x > 0, other=np.nan)
# Harmonize (use processing baseline)
da = da + harmonize
# Concatenate c-factor
c = xr.concat(c_array, dim="time")
c["time"] = da.time.values
# Compute NBAR
da = da * c
# Delete infinite values
da = da.astype("float32")
da = da.where(lambda x: np.isfinite(x), other=np.nan)
return da
[docs]
def nbar_stackstac(
da: xr.DataArray, stac: str, collection: str, quiet: bool = False
) -> xr.DataArray:
"""Computes the Nadir BRDF Adjusted Reflectance (NBAR) for a :code:`xarray.DataArray`
obtained via :code:`stackstac`.
If the processing baseline is greater than 04.00, the DN values are automatically
shifted before computing NBAR.
Parameters
----------
da : xarray.DataArray
Data array obtained via :code:`stackstac` to use for the NBAR calculation.
stac : str
STAC Endpoint of the data array.
collection : str
Collection name of the data array.
quiet : bool, default = False
Whether to show progress.
Returns
-------
xarray.DataArray
NBAR data array.
"""
# Get info from the stackstac data array
epsg = da.attrs["crs"]
# Compute NBAR
da = nbar_stac(da, stac, collection, epsg, quiet)
return da
[docs]
def nbar_cubo(da: xr.DataArray, quiet: bool = False) -> xr.DataArray:
"""Computes the Nadir BRDF Adjusted Reflectance (NBAR) for a :code:`xarray.DataArray`
obtained via :code:`cubo`.
If the processing baseline is greater than 04.00, the DN values are automatically
shifted before computing NBAR.
Parameters
----------
da : xarray.DataArray
Data array obtained via :code:`cubo` to use for the NBAR calculation.
quiet : bool, default = False
Whether to show progress.
Returns
-------
xarray.DataArray
NBAR data array.
"""
# Get info from the cubo data array
stac = da.attrs["stac"]
collection = da.attrs["collection"]
epsg = da.attrs["epsg"]
epsg = f"epsg:{epsg}"
# Compute NBAR
da = nbar_stac(da, stac, collection, epsg, quiet)
return da