"""
Visualize the spectral seasonality of an area to facilitate the development of compositing methods
geeViz.phEEnoViz facilitates the creation of plots to show the seasonality of an area. This is a good tool for deciding what date ranges to use for compositing.
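A typical workflow is to export per-sample time series with getTimeSeriesSample (which also converts the exported JSON tables to per-band CSVs) and then plot their distributions with chartTimeSeriesDistributions.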
"""
"""
Copyright 2025 Ian Housman
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
# Script to help visualize variability of observations across space and time
# Intended to work within the geeViz package
######################################################################
import geeViz.getImagesLib as getImagesLib
import os, json, pdb, glob, math, threading, time, datetime
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patheffects as pe
from datetime import date
ee = getImagesLib.ee
Map = getImagesLib.Map
Map.clearMap()
###############################################################
def check_dir(dir):
"""
Ensures that a directory exists, creating it if necessary.
Args:
dir (str): The directory path.
Example:
>>> check_dir('/tmp/mydir')
"""
# Create the directory (and any missing parents) if it does not exist
if dir and not os.path.exists(dir):
os.makedirs(dir)
###############################################################
def limitThreads(limit):
"""
Limits the number of active threads to a specified limit.
Args:
limit (int): The maximum number of threads allowed.
Example:
>>> limitThreads(2)
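A minimal sketch of the throttling pattern used in this module (the worker function here is purely illustrative):
>>> import threading, time
>>> def work(n):
...     time.sleep(n)
>>> for i in range(10):
...     threading.Thread(target=work, args=(1,)).start()
...     limitThreads(4)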
"""
# Wait until the number of active threads drops below the limit
while threading.active_count() > limit:
time.sleep(1)
print(threading.active_count(), "threads running")
###############################################################
# Function to extract a json table of zonal stats from an image
def getTableWrapper(
image,
fc,
outputName,
reducer=ee.Reducer.first(),
scale=30,
crs="EPSG:4326",
transform=None,
tryNumber=1,
maxTries=15,
):
"""
Extracts a JSON table of zonal statistics from an image and exports it.
Args:
image (ee.Image): The image to reduce.
fc (ee.FeatureCollection): The feature collection for zonal stats.
outputName (str): Output filename for the JSON table.
reducer (ee.Reducer, optional): Reducer to use. Defaults to ee.Reducer.first().
scale (int, optional): Scale in meters. Defaults to 30.
crs (str, optional): Coordinate reference system. Defaults to "EPSG:4326".
transform (optional): Transform parameter. Defaults to None.
tryNumber (int, optional): Current try number. Defaults to 1.
maxTries (int, optional): Maximum number of tries. Defaults to 15.
Example:
>>> getTableWrapper(image, fc, 'output.json')
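A fuller sketch, assuming Earth Engine has been initialized; the asset, geometry, and output name below are only examples:
>>> img = ee.Image("USGS/SRTMGL1_003")
>>> pts = ee.FeatureCollection.randomPoints(ee.Geometry.Rectangle([-120, 38, -119, 39]), 10, 0)
>>> getTableWrapper(img, pts, "elevation_sample.json", reducer=ee.Reducer.mean(), scale=90)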
"""
# Run reduceRegions to get zonal statistics for each feature in the collection
table = image.reduceRegions(fc, reducer, scale, crs, transform, 4)
try:
print("Exporting table:", outputName)
t = table.getInfo()
# Write the result to a JSON file
with open(outputName, "w") as o:
o.write(json.dumps(t))
# Convert the JSON to CSV(s) for easier analysis
convert_to_csv(outputName)
except Exception as e:
print("Error encountered:", e)
# Retry if not at maxTries and the error is not fatal
tryNumber += 1
if tryNumber < maxTries and str(e).find("Parameter 'image' is required") == -1:
print("Trying to convert table again. Try number:", tryNumber)
getTableWrapper(
image,
fc,
outputName,
reducer,
scale,
crs,
transform,
tryNumber,
maxTries,
)
###############################################################
# Wrapper to get a sample of locations for a given area
def getTimeSeriesSample(
startYear,
endYear,
startJulian,
endJulian,
compositePeriod,
exportBands,
studyArea,
nSamples,
output_table_name,
showGEEViz,
maskSnow=False,
programs=["Landsat", "Sentinel2"],
):
"""
Samples locations for a given area and exports time series data.
Args:
startYear (int): Start year.
endYear (int): End year.
startJulian (int): Start Julian day.
endJulian (int): End Julian day.
compositePeriod (int): Composite period in days.
exportBands (list): List of bands to export.
studyArea (ee.Geometry or ee.FeatureCollection): Study area.
nSamples (int): Number of samples.
output_table_name (str): Output table name.
showGEEViz (bool): Whether to show GEEViz visualization.
maskSnow (bool, optional): Whether to mask snow. Defaults to False.
programs (list, optional): List of programs. Defaults to ["Landsat", "Sentinel2"].
Example:
>>> getTimeSeriesSample(2019, 2020, 1, 365, 16, ['NDVI'], studyArea, 100, 'output.json', True)
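A fuller sketch, assuming Earth Engine has been initialized; the study area and output path below are only examples:
>>> studyArea = ee.FeatureCollection("TIGER/2018/States").filter(ee.Filter.eq("NAME", "Hawaii"))
>>> getTimeSeriesSample(2019, 2020, 1, 365, 16, ["NDVI"], studyArea, 500, "out/hawaii_sample.json", False, programs=["Sentinel2"])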
"""
check_dir(os.path.dirname(output_table_name))
# Load precomputed cloud and shadow statistics for masking
preComputedCloudScoreOffset = getImagesLib.getPrecomputedCloudScoreOffsets(10)
preComputedLandsatCloudScoreOffset = preComputedCloudScoreOffset["landsat"]
preComputedSentinel2CloudScoreOffset = preComputedCloudScoreOffset["sentinel2"]
preComputedTDOMStats = getImagesLib.getPrecomputedTDOMStats()
preComputedLandsatTDOMIRMean = preComputedTDOMStats["landsat"]["mean"]
preComputedLandsatTDOMIRStdDev = preComputedTDOMStats["landsat"]["stdDev"]
preComputedSentinel2TDOMIRMean = preComputedTDOMStats["sentinel2"]["mean"]
preComputedSentinel2TDOMIRStdDev = preComputedTDOMStats["sentinel2"]["stdDev"]
#####################################################################################
# Get the bounding box of the study area
try:
saBounds = studyArea.geometry().bounds()
except Exception:
saBounds = studyArea.bounds()
# Generate random sample points within the study area
randomSample = ee.FeatureCollection.randomPoints(studyArea, nSamples, 0, 50)
Map.addLayer(randomSample, {"layerType": "geeVector"}, "Samples", True)
dummyImage = None
# Loop over each year and export time series for each sample
for yr in range(startYear, endYear + 1):
output_table_nameT = "{}_{}_{}_{}-{}_{}_{}{}".format(
os.path.splitext(output_table_name)[0],
"-".join(programs),
yr,
startJulian,
endJulian,
compositePeriod,
nSamples,
os.path.splitext(output_table_name)[1],
)
if not os.path.exists(output_table_nameT):
# Select imagery source(s) and get processed images
if "Landsat" in programs and "Sentinel2" in programs:
if dummyImage is None:
dummyImage = ee.Image(getImagesLib.getProcessedLandsatAndSentinel2Scenes(saBounds, 2019, 2020, 1, 365).first())
images = getImagesLib.getProcessedLandsatAndSentinel2Scenes(
saBounds,
yr,
yr,
startJulian,
endJulian,
toaOrSR="TOA",
includeSLCOffL7=True,
)
elif "Sentinel2" in programs:
if dummyImage is None:
dummyImage = ee.Image(getImagesLib.getProcessedSentinel2Scenes(saBounds, 2019, 2020, 1, 365).first())
images = getImagesLib.getProcessedSentinel2Scenes(saBounds, yr, yr, startJulian, endJulian)
elif "Landsat" in programs:
if dummyImage is None:
dummyImage = ee.Image(getImagesLib.getProcessedLandsatScenes(saBounds, 2019, 2020, 1, 365).first())
images = getImagesLib.getProcessedLandsatScenes(
saBounds,
yr,
yr,
startJulian,
endJulian,
toaOrSR="TOA",
includeSLCOffL7=True,
)
# Fill empty collections with a dummy image to avoid errors
images = getImagesLib.fillEmptyCollections(images, dummyImage)
# Optionally mask snow pixels
if maskSnow:
print("Masking snow")
images = images.map(getImagesLib.sentinel2SnowMask)
# Add vegetation indices or other band calculations
images = images.map(getImagesLib.HoCalcAlgorithm2)
# Convert to n-day composites for temporal smoothing
composites = getImagesLib.nDayComposites(images, yr, yr, 1, 365, compositePeriod)
# Stack composites into a single multi-band image
stack = composites.select(exportBands).toBands()
# Rename bands to follow yyyy-mm-dd_bandName
bns = stack.bandNames()
bns = bns.map(lambda bn: ee.String(bn).split("_").slice(1, None).join("_"))
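# e.g. a toBands() name such as "0_2019-06-10_NDVI" becomes "2019-06-10_NDVI" (names here are illustrative)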
stack = stack.rename(bns)
# Start export in a new thread to allow parallel processing
tt = threading.Thread(target=getTableWrapper, args=(stack, randomSample, output_table_nameT))
tt.start()
time.sleep(0.1)
# Set thread limit depending on whether visualization is shown
threadLimit = 1
if showGEEViz:
# Visualize the study area and samples in geeViz
Map.addLayer(studyArea, {"strokeColor": "00F"}, "Study Area")
Map.centerObject(studyArea)
Map.view()
threadLimit = 2
limitThreads(threadLimit)
###############################################################
# Function to convert a json gee table into csvs
# Assumes property name format is yyyy-mm-dd_bandName
def convert_to_csv(output_table_name):
"""
Converts a JSON GEE table into CSV files, one per band.
Args:
output_table_name (str): Path to the JSON table.
Example:
>>> convert_to_csv('output.json')
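Assuming output.json holds features whose property names follow yyyy-mm-dd_bandName (e.g. "2019-06-10_NDVI"), this writes one CSV per band alongside the JSON (e.g. output_NDVI.csv).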
"""
with open(output_table_name) as jf:
table = json.load(jf)
# Parse the first feature to find the bands and dates present in the table
bands = []
dates = []
print("Finding dates and bands in json:", output_table_name)
for feature in table["features"][:1]:
props = feature["properties"]
for prop in list(props.keys()):
value = props[prop]
band = prop.split("_")[-1]
if band not in bands:
bands.append(band)
date = prop.split("_")[0]
if date not in dates:
dates.append(date)
# For each band, create a CSV with dates as columns and samples as rows
for band in bands:
output_csv = os.path.splitext(output_table_name)[0] + "_{}.csv".format(band)
out_table = "{}\n".format(",".join(dates))
if not os.path.exists(output_csv):
print("Parsing:", band)
# For each feature (sample), extract values for this band across all dates
for feature in table["features"]:
id = feature["id"]
values = []
props = feature["properties"]
prop_keys = list(props.keys())
prop_keys = [i for i in prop_keys if i.split("_")[-1] == band]
for prop in prop_keys:
value = str(props[prop])
if value == "None":
value = ""
values.append(value)
out_line = "{}\n".format(",".join(values))
out_table += out_line
# Write the CSV file
with open(output_csv, "w") as o:
o.write(out_table)
###############################################################
# Function to take a set of csv tables with yyyy-mm-dd dates on the header row and, on each row, values of a band/index
# from an area (some sort of zonal stat or point-location value). Null values are expected to be blank entries in the csv.
# It produces a time series chart of the value distribution for each date in the given tables
def chartTimeSeriesDistributions(
tables,
output_dir,
output_base_name,
n_bins=40,
min_pctl=0.05,
max_pctl=99.95,
background_color="#D6D1CA",
font_color="#1B1716",
overwrite=False,
howManyHarmonics=3,
showChart=False,
annotate_harmonic_peaks=True,
):
"""
Plots time series histograms for each date in the given tables.
Args:
tables (list): List of CSV file paths.
output_dir (str): Output directory for charts.
output_base_name (str): Base name for output charts.
n_bins (int, optional): Number of histogram bins. Defaults to 40.
min_pctl (float, optional): Minimum percentile for value clipping. Defaults to 0.05.
max_pctl (float, optional): Maximum percentile for value clipping. Defaults to 99.95.
background_color (str, optional): Background color for plots. Defaults to "#D6D1CA".
font_color (str, optional): Font color for plots. Defaults to "#1B1716".
overwrite (bool, optional): Overwrite existing charts. Defaults to False.
howManyHarmonics (int, optional): Number of harmonics for regression. Defaults to 3.
showChart (bool, optional): Show chart interactively. Defaults to False.
annotate_harmonic_peaks (bool, optional): Annotate harmonic peaks. Defaults to True.
Example:
>>> chartTimeSeriesDistributions(['table_NDVI.csv'], './charts', 'study_NDVI')
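A fuller sketch, assuming the CSVs were produced by convert_to_csv and sit in a tables folder (paths are only examples):
>>> import glob
>>> tables = glob.glob("./tables/*.csv")
>>> chartTimeSeriesDistributions(tables, "./charts", "My_Study_Area", howManyHarmonics=2, showChart=True)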
"""
# Ensure output directory exists
check_dir(output_dir)
# Identify all bands present in the input tables
bands = []
for table in tables:
band = os.path.splitext(os.path.basename(table))[0].split("_")[-1]
if band not in bands:
bands.append(band)
for band in bands:
output_chart_name = os.path.join(output_dir, output_base_name + "_" + band + ".png")
if not os.path.exists(output_chart_name) or overwrite:
title = " ".join(output_base_name.split("_")) + " " + band + " Distrubution Time Series"
tablesT = [table for table in tables if os.path.basename(table).find(band) > -1]
# Find the name of the band/index
index_name = band
print("Creating time series distribution chart for:", band)
values = []
# Concatenate all CSVs for this band
data = pd.concat([pd.read_csv(t) for t in tablesT], axis=1)
columns = data.columns
values = data.to_numpy()
# Flatten all values to compute percentiles for clipping
flat = values.flatten()
flat = flat[~(np.isnan(flat))]
min = np.percentile(flat, min_pctl) # Lower bound for histogram
max = np.percentile(flat, max_pctl) # Upper bound for histogram
min_2 = np.percentile(flat, 10) # For annotation
max_2 = np.percentile(flat, 90)
values = values.clip(min, max)
# Extract dates from column names (assumed format: yyyy-mm-dd_band)
dates = [i.split("_")[0] for i in columns]
years = np.unique([i.split("-")[0] for i in dates])
print("years", years)
# Set up bins for histogram
bin_step = (max - min) / n_bins
bins = np.arange(min, max + bin_step, bin_step)
# Compute histograms for each date (column)
hist = np.array([np.histogram(data[column], bins=bins, density=True)[0] for column in columns]).transpose()
hist = np.nan_to_num(hist, nan=0)
hist = hist.clip(np.percentile(hist, 10), np.percentile(hist, 99))
# Harmonic regression fitting to model seasonality
table_xs = np.array([])
table_ys = np.array([])
table_all_xs = np.array([])
d0 = date(1969, 12, 31)
percentiles = []
# For each date, compute decimal year and percentiles
for i, column in enumerate(columns):
d = dates[i]
d1 = date(int(d.split("-")[0]), int(d.split("-")[1]), int(d.split("-")[2]))
delta = d1 - d0
delta_fraction = math.modf(delta.days / 365.25)[0]
decimal_date = int(d.split("-")[0]) + delta_fraction
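# e.g. 2019-07-02 falls roughly halfway through the year, so its decimal date is ~2019.5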
ys = values[:, i]
ys = ys[~(np.isnan(ys))]
if len(ys) > 3:
percentiles.append(np.percentile(ys, [0, 5, 25, 50, 75, 95, 100]))
else:
percentiles.append([np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan])
xs = np.repeat(decimal_date, len(ys))
table_ys = np.append(table_ys, ys)
table_xs = np.append(table_xs, xs)
table_all_xs = np.append(table_all_xs, decimal_date)
percentiles = np.array(percentiles)
table_all_xs = np.array(table_all_xs).flatten()
table_xs = np.array(table_xs).flatten()
table_ys = np.array(table_ys).flatten()
# Fit harmonic regression models (1, 2, or 3 harmonics)
peak_year_i = 2
if peak_year_i > len(years) - 1:
peak_year_i = 0
xs = np.array([table_xs]).T
sin1Term = np.sin(xs * 2 * math.pi)
cos1Term = np.cos(xs * 2 * math.pi)
sin2Term = np.sin(xs * 4 * math.pi)
cos2Term = np.cos(xs * 4 * math.pi)
sin3Term = np.sin(xs * 6 * math.pi)
cos3Term = np.cos(xs * 6 * math.pi)
intTerm = np.ones(xs.shape[0])
harm_1 = np.c_[sin1Term, cos1Term, xs, intTerm]
harm_1_2 = np.c_[sin1Term, cos1Term, sin2Term, cos2Term, xs, intTerm]
harm_1_2_3 = np.c_[sin1Term, cos1Term, sin2Term, cos2Term, sin3Term, cos3Term, xs, intTerm]
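# Each model is an ordinary least squares fit (np.linalg.lstsq) of
# y = sum_k [a_k*sin(2*pi*k*t) + b_k*cos(2*pi*k*t)] + c*t + d, with k up to 1, 2, or 3 harmonics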
harm_1_model = np.linalg.lstsq(harm_1, table_ys, rcond=None)
harm_1_2_model = np.linalg.lstsq(harm_1_2, table_ys, rcond=None)
harm_1_2_3_model = np.linalg.lstsq(harm_1_2_3, table_ys, rcond=None)
# Find the phase (peak) of the first harmonic
peak_1_fraction = math.atan(harm_1_model[0][0] / harm_1_model[0][1]) / (2 * math.pi)
peak_2_fraction = peak_1_fraction + 0.5
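# atan(a/b)/(2*pi) locates one extreme of the first harmonic as a fraction of the year;
# the opposite extreme falls half a cycle later. Which is the peak and which the trough
# is sorted out below by comparing the two predicted values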
if peak_1_fraction < 0:
peak_1_fraction = 1 + peak_1_fraction
if peak_2_fraction > 1:
peak_2_fraction = peak_2_fraction - 1
peak_1_yr = int(years[peak_year_i]) + peak_1_fraction
peak_2_yr = int(years[peak_year_i]) + peak_2_fraction
peak_1_pred = np.dot(
[
np.sin(peak_1_yr * 2 * math.pi),
np.cos(peak_1_yr * 2 * math.pi),
peak_1_yr,
1,
],
harm_1_model[0],
)
peak_2_pred = np.dot(
[
np.sin(peak_2_yr * 2 * math.pi),
np.cos(peak_2_yr * 2 * math.pi),
peak_2_yr,
1,
],
harm_1_model[0],
)
peak_1_y = min_2
peak_2_y = max_2
if peak_1_pred > peak_2_pred:
peak_1_y = max_2
peak_2_y = min_2
# Convert peak fractions to day-of-year and then to month-day
peak_date = int(peak_1_fraction * 365) + 1
peak_date2 = int(peak_2_fraction * 365) + 1
print(
peak_date,
peak_date2,
peak_1_pred,
peak_2_pred,
years[peak_year_i][2:] + f"{peak_date:03}",
years[peak_year_i][2:] + f"{peak_date2:03}",
)
peak_date = datetime.datetime.strptime(years[peak_year_i][2:] + f"{peak_date:03}", "%y%j").strftime("%m-%d")
peak_date2 = datetime.datetime.strptime(years[peak_year_i][2:] + f"{peak_date2:03}", "%y%j").strftime("%m-%d")
# Apply harmonic model to all dates for plotting
xs = np.array([table_all_xs]).T
sin1Term = np.sin(xs * 2 * math.pi)
cos1Term = np.cos(xs * 2 * math.pi)
sin2Term = np.sin(xs * 4 * math.pi)
cos2Term = np.cos(xs * 4 * math.pi)
sin3Term = np.sin(xs * 6 * math.pi)
cos3Term = np.cos(xs * 6 * math.pi)
intTerm = np.ones(xs.shape[0])
harm_1 = np.c_[sin1Term, cos1Term, xs, intTerm]
harm_1_2 = np.c_[sin1Term, cos1Term, sin2Term, cos2Term, xs, intTerm]
harm_1_2_3 = np.c_[sin1Term, cos1Term, sin2Term, cos2Term, sin3Term, cos3Term, xs, intTerm]
pred_1 = np.dot(harm_1, harm_1_model[0])
pred_1_2 = np.dot(harm_1_2, harm_1_2_model[0])
pred_1_2_3 = np.dot(harm_1_2_3, harm_1_2_3_model[0])
pred_dict = {"1": pred_1, "2": pred_1_2, "3": pred_1_2_3}
pred = pred_dict[str(howManyHarmonics)]
# Plotting
xTickFreq = 8  # Spacing of major x-axis ticks
yTickFreq = (max - min) / 10  # Spacing of major y-axis ticks
width = (len(columns) / 25) + 2
if width < 8:
width = 12
xTickFreq = 3
fig, ax = plt.subplots(figsize=(width, 7), frameon=True, facecolor="w")
fig.patch.set_facecolor(background_color)
params = {
"ytick.color": font_color,
"xtick.color": font_color,
"axes.labelcolor": font_color,
"axes.edgecolor": font_color,
"legend.fontsize": 7,
"legend.handlelength": 1.2,
}
plt.rcParams.update(params)
ax.set_title(title)
cmap = plt.get_cmap("viridis")
hist = hist[:, :-1]
# Plot the histogram as a color mesh
cf = plt.pcolormesh(dates, bins, hist, cmap=cmap)
degrees = 45
plt.xticks(rotation=degrees, fontsize=7, ha="right")
# Overlay the harmonic regression fit
harm_line = plt.plot(
dates,
pred,
linestyle="-",
color=background_color,
linewidth=2,
label="Harmonic Fit ({})".format(howManyHarmonics),
)
ax.set_ylim([min, max])
ax.set_ylabel("{} Value".format(index_name), fontsize=10)
ax.set_xlabel("Date", fontsize=10)
# Set the spacing of the x and y axis ticks
ax.xaxis.set_major_locator(plt.MultipleLocator(xTickFreq))
ax.xaxis.set_minor_locator(plt.MultipleLocator(1))
ax.yaxis.set_major_locator(plt.MultipleLocator(yTickFreq))
ax.yaxis.set_minor_locator(plt.MultipleLocator(1))
ax.grid(True, which="major", axis="y", linestyle="--", color=font_color)
ax.grid(True, which="major", axis="x", linestyle="--", color=font_color)
cbax = fig.add_axes([0.93, 0.11, 0.01, 0.71])
legend = plt.legend(handles=harm_line, bbox_to_anchor=(-2.5, 1.08), loc="upper left")
cb = plt.colorbar(cf, cax=cbax, orientation="vertical")
cb.ax.tick_params(labelsize=10)
cb.set_label(
"Percent of Samples (%)",
rotation=270,
labelpad=15,
fontsize=10,
color=font_color,
)
# Clip predicted peaks to plotting range
if peak_1_pred > max:
peak_1_pred = max
elif peak_1_pred < min:
peak_1_pred = min
if peak_2_pred > max:
peak_2_pred = max
elif peak_2_pred < min:
peak_2_pred = min
if annotate_harmonic_peaks:
print("Annotating peak dates of harmonics")
try:
# Annotate the first and second harmonic peaks on the plot
yr_dates = [i for i in dates if i.split("-")[0] == years[peak_year_i]]
m_dates = [i for i in yr_dates if i.split("-")[1] == peak_date.split("-")[0]]
if len(m_dates) == 0:
m_dates = [i for i in yr_dates if int(i.split("-")[1]) == int(peak_date.split("-")[0]) - 1]
print(m_dates)
m_dates = m_dates[0]
ax.annotate(
"{} ({})".format(peak_date, round(peak_1_pred, 3)),
xy=(m_dates, peak_1_pred),
xycoords="data",
color=background_color,
fontsize="10",
path_effects=[pe.withStroke(linewidth=2.5, foreground=font_color)],
)
yr_dates = [i for i in dates if i.split("-")[0] == years[peak_year_i]]
m_dates = [i for i in yr_dates if i.split("-")[1] == peak_date2.split("-")[0]][0]
ax.annotate(
"{} ({})".format(peak_date2, round(peak_2_pred, 3)),
xy=(m_dates, peak_2_pred),
xycoords="data",
color=background_color,
fontsize="10",
path_effects=[pe.withStroke(linewidth=2.5, foreground=font_color)],
)
except Exception as e:
print(e)
fig.savefig(output_chart_name)
if showChart:
plt.show()
plt.close()
else:
print("Already produced:", output_chart_name)