Source code for geeViz.eeAuth.eeCreds

"""``eeCreds`` — friendly API for using multiple Earth Engine credentials
interchangeably from the same Python process.

**You don't need to call this directly if you only use one credential.**
Importing :mod:`geeViz.geeView` and calling ``Map.view()`` already
auto-starts the proxy under the hood via ``ensure_started("auto")`` —
single-credential workflows get the proxy for free. Multi-credential
workflows use this module to register, switch, and stop credentials
explicitly.

Supports both service accounts and OAuth user-account refresh tokens, in
any of these input formats:

- File path to a JSON key
- Base64-encoded JSON string
- JSON string
- Dict (already-parsed JSON)
- Raw bytes (UTF-8 JSON or base64 of either)

Auto-start behaviour (the default path)
---------------------------------------
::

    from geeViz.geeView import Map
    import ee

    # That's it. Map.view() called below will:
    #   1. Discover credentials in the environment
    #   2. Start a local proxy (uvicorn in a daemon thread) if needed
    #   3. Point ee.Initialize at the proxy
    Map.addLayer(ee.Image("USGS/SRTMGL1_003"), {"min": 0, "max": 4000}, "SRTM")
    Map.view()

The proxy survives for the lifetime of the Python process — subsequent
``Map.view()`` calls reuse it without restarting.

Explicit multi-credential workflow
----------------------------------
::

    from geeViz.eeAuth import eeCreds
    import ee

    eeCreds.addCreds("path/to/sa-prod.json", name="prod")
    eeCreds.addCreds(b64_sa_string,          name="acme")
    eeCreds.addCreds("~/.config/earthengine/credentials", name="ian")  # OAuth
    eeCreds.start()                # initializes ee + spawns local proxy

    eeCreds.use("acme")
    ee.Image(1).getInfo()          # routes through the acme SA

    # Or scoped switching — restores previous tenant on block exit
    with eeCreds.use("ian"):
        ee.Image(2).getInfo()      # routes through ian's OAuth refresh token

Why this exists
---------------
``ee.Initialize()`` stores credentials in module-global state — you can
only have one active identity per Python process. ``eeCreds`` works
around that by running a local proxy server that holds N credentials,
re-signs each EE REST request with the right one, and lets the SDK's
single ``ee.Initialize`` point at the proxy. The proxy reads which
credential to use from a thread-aware ``ContextVar`` set by
``eeCreds.use()``.

Same machinery powers the JS / browser path: each browser tab that
``Map.view()`` opens has its tenant baked into the per-session run_js
file (NOT the page URL), so every tab routes through the right
credential for its lifetime — process-wide ``eeCreds.use()`` switches
can't drift an open tab to a different credential.
"""
from __future__ import annotations

import base64
import binascii
import contextlib
import json
import logging
import os
import sys
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Optional, Union

from .client import (
    CURRENT_TENANT,
    initialize_via_proxy,
    set_tenant,
    reset_tenant,
    tenant_context,
)

logger = logging.getLogger(__name__)

# Re-mint access tokens every 50 min (well inside the 1h expiry).
_TOKEN_TTL_SEC = 50 * 60

# Default port for the in-process proxy when ``start()`` runs one.
# Picked away from common dev ports (8888 is the agent UI) to avoid
# collisions when both are in the same process.
_DEFAULT_PROXY_PORT = 8889


def _gcloud_default_project() -> str:
    """Return ``gcloud config get-value project`` or ``""`` on any
    failure. Used as the last-resort fallback for OAuth project detection.

    Best-effort and silent: if ``gcloud`` isn't installed, the command
    times out, or the project is unset, returns an empty string. Never
    raises — auto-discovery should always succeed at returning
    SOMETHING (possibly empty), never crash.
    """
    import shutil
    import subprocess

    # ``shutil.which`` finds ``gcloud.cmd`` on Windows via PATHEXT.
    gcloud_path = shutil.which("gcloud") or shutil.which("gcloud.cmd")
    if not gcloud_path:
        return ""
    try:
        result = subprocess.run(
            [gcloud_path, "config", "get-value", "project"],
            capture_output=True, text=True, timeout=5,
        )
        if result.returncode != 0:
            return ""
        out = (result.stdout or "").strip()
        if out and out.lower() != "(unset)":
            logger.info(
                "eeCreds: project hint from gcloud config: %r", out
            )
            return out
    except Exception:
        return ""
    return ""


# ──────────────────────── Credential entries ────────────────────────
@dataclass
class _CredEntry:
    """One registered credential, identified by ``name``.

    Holds the parsed credential payload, the type ("sa" / "oauth"), and
    lazily-built google.auth ``Credentials`` + cached access token data.
    """
    name: str
    type: str                       # "sa" or "oauth"
    data: dict                      # the parsed JSON
    source: str = ""                # debugging hint: "path", "b64", "dict", ...
    project_id: str = ""            # for SA: from data["project_id"]; for OAuth: env override
    client_email: str = ""          # SA only

    _creds: Any = field(default=None, repr=False)   # google.oauth2.* Credentials
    _token: dict = field(default_factory=dict, repr=False)
    _token_fetched_at: float = field(default=0.0, repr=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, repr=False)


# ──────────────────────── Core class ────────────────────────
[docs] class EECreds: """Multi-tenant credential registry + EE init lifecycle. Usually used via the module-level singleton ``eeCreds`` — but instantiable directly when you need multiple independent registries (e.g. tests, multi-tenant servers with isolated credential sets):: from geeViz.eeAuth.eeCreds import EECreds creds = EECreds() creds.addCreds(...) creds.start() """ def __init__(self) -> None: self._entries: dict[str, _CredEntry] = {} self._started = False self._proxy_url: Optional[str] = None self._proxy_thread: Optional[threading.Thread] = None self._proxy_server = None # uvicorn.Server self._lock = threading.Lock() # ─────────────── adding credentials ───────────────
[docs] def addCreds( self, creds: Union[str, dict, bytes], name: str, project: Optional[str] = None, ) -> "EECreds": """Register a set of credentials under ``name``. Accepts: - File path to a JSON key file (SA, OAuth, WIF, or impersonation config) - Base64-encoded JSON string (typical for env vars) - JSON string (literal contents) - Already-parsed dict - **Bare service-account email string** (``[email protected]``) → routed to :meth:`addImpersonation` so the proxy mints tokens via the IAM credentials API at request time (no key held) Auto-detects which credential shape it is. ``project`` overrides the default project — required for OAuth + impersonation, optional for SAs since their JSON has ``project_id``. Returns self so chaining works:: eeCreds.addCreds(sa1, "a").addCreds(sa2, "b").start() """ # Bare SA-email string → impersonation. Recognized by the # ``*@*.iam.gserviceaccount.com`` shape; anything else falls # through to the JSON parser. if isinstance(creds, str) and self._looks_like_sa_email(creds.strip()): return self.addImpersonation(creds.strip(), name=name, project=project) data, source = self._parse_input(creds) entry_type, defaults = self._classify(data) entry = _CredEntry( name=name, type=entry_type, data=data, source=source, project_id=project or defaults.get("project_id", "") or "", client_email=defaults.get("client_email", "") or "", ) with self._lock: self._entries[name] = entry logger.info( "eeCreds: added %r (type=%s, source=%s, project=%s)", name, entry_type, source, entry.project_id, ) return self
@staticmethod def _looks_like_sa_email(s: str) -> bool: """True iff ``s`` is shaped like a GCP service-account email. Used by :meth:`addCreds` to route bare emails to impersonation instead of trying to parse them as JSON / base64 / paths.""" if not s or len(s) > 254 or " " in s: return False if "@" not in s: return False local, _, domain = s.partition("@") # SA emails always live under iam.gserviceaccount.com. return bool(local) and domain.endswith(".iam.gserviceaccount.com")
[docs] def addImpersonation( self, target_email: str, name: str, project: Optional[str] = None, ) -> "EECreds": """Register a tenant that mints tokens by **impersonating** ``target_email`` at request time. No key material is held. The runtime's ADC source must hold ``roles/iam.serviceAccountTokenCreator`` on the target SA. On each token mint, ``google.auth.impersonated_credentials`` calls the IAM credentials API to fetch a short-lived (1h) access token; google-auth refreshes it automatically when it expires. This is the multi-tenant **keyless** path: the proxy holds N SA emails, not N JSON keys. Works identically across GCP Cloud Run (source = attached SA), AWS via WIF (source = federated identity from STS), and local dev (source = gcloud user creds). Args: target_email: The SA to impersonate (``[email protected]``). name: Tenant name for ``eeCreds.use(name)``. project: Quota project for EE calls. If omitted, falls back to the runtime's default ADC project. """ if not self._looks_like_sa_email(target_email): raise ValueError( f"eeCreds.addImpersonation: {target_email!r} is not a " "valid SA email (expected ``*@*.iam.gserviceaccount.com``)." ) entry = _CredEntry( name=name, type="impersonated", # Self-describing dict so ``info()`` round-trips and # ``_classify`` can re-route this entry without losing # information. data={ "type": "impersonated_service_account", "client_email": target_email, "impersonate": True, }, source="impersonation", project_id=project or "", client_email=target_email, ) with self._lock: self._entries[name] = entry logger.info( "eeCreds: added %r (type=impersonated, target=%s, project=%s)", name, target_email, entry.project_id, ) return self
[docs] def addADC( self, name: str = "adc", project: Optional[str] = None, ) -> "EECreds": """Register the runtime's Application Default Credentials as a tenant entry. Use this when you want the **proxy** to route through ADC (attached SA on Cloud Run, WIF federation on AWS, gcloud user creds locally) — without holding any key material. Without this, ADC-only deployments still get EE working via :meth:`robust_init` but the proxy starts empty and the multi-tenant features are inaccessible. Tokens are minted by calling ``google.auth.default()`` at mint time, then refreshing. Same code path as keyed SAs from the proxy's perspective. """ entry = _CredEntry( name=name, type="adc", data={"type": "application_default_credentials"}, source="adc", project_id=project or "", client_email="", ) with self._lock: self._entries[name] = entry logger.info( "eeCreds: added %r (type=adc, project=%s)", name, entry.project_id or "<runtime default>", ) return self
def _parse_input(self, x: Union[str, dict, bytes]) -> tuple[dict, str]: """Return ``(parsed_dict, source_description)`` from any of the supported input forms.""" # Bytes → decode as utf-8 first if isinstance(x, bytes): try: x = x.decode("utf-8") except UnicodeDecodeError: x = base64.b64decode(x).decode("utf-8", errors="replace") if isinstance(x, dict): return dict(x), "dict" if not isinstance(x, str): raise TypeError( f"eeCreds.addCreds: expected str / dict / bytes, got {type(x).__name__}" ) # Expand ~ in paths expanded = os.path.expanduser(x) if os.path.isfile(expanded): try: with open(expanded, "r", encoding="utf-8") as f: return json.load(f), f"path:{expanded}" except (OSError, json.JSONDecodeError) as e: raise ValueError( f"eeCreds.addCreds: could not read JSON from {expanded!r}: {e}" ) from e # JSON string? x_stripped = x.strip() if x_stripped.startswith(("{", "[")): try: return json.loads(x_stripped), "json_string" except json.JSONDecodeError as e: raise ValueError( f"eeCreds.addCreds: starts like JSON but failed to parse: {e}" ) from e # If the string LOOKS like a filesystem path (separator chars, # ``.json`` suffix, or a Windows drive letter) but didn't match # ``os.path.isfile`` above, the user almost certainly meant to # pass a file path that doesn't exist — surface that directly # rather than falling through to base64 and producing a # confusing "Only base64 data is allowed" error. looks_like_path = ( "/" in x_stripped or "\\" in x_stripped or x_stripped.lower().endswith(".json") or (len(x_stripped) >= 3 and x_stripped[1:3] == ":\\") ) if looks_like_path: raise FileNotFoundError( f"eeCreds.addCreds: file not found: {expanded!r}. " "Pass an existing path to a JSON key, a JSON literal, " "a base64-encoded JSON string, or a dict." ) # Base64? try: decoded = base64.b64decode(x_stripped, validate=True) data = json.loads(decoded) return data, "base64" except (binascii.Error, ValueError, json.JSONDecodeError) as e: raise ValueError( f"eeCreds.addCreds: could not interpret string as path, JSON, " f"or base64 JSON (last error: {e})" ) from e def _classify(self, data: dict) -> tuple[str, dict]: """Identify the credential shape in ``data``. Returns ``(entry_type, defaults_dict)``. Recognized types: - ``sa``: a service-account key file (``{"type": "service_account", "private_key": ...}``). Holds a long-lived private key; the riskiest shape — security teams typically flag these. - ``oauth``: a refresh-token file (``earthengine authenticate`` or ``gcloud auth application-default login`` output for user accounts). - ``adc``: an Application Default Credentials source — either a workload-identity-federation config (``type: external_account``) OR a marker that the entry should mint via ``google.auth.default()``. No key held; tokens come from the runtime's identity (Cloud Run metadata server, AWS STS via WIF, gcloud user creds, etc.). Both JS and Python proxy paths end up with whatever ``google.auth.default()`` resolves to on this host. - ``impersonated``: a stored target SA email. Tokens minted via ``google.auth.impersonated_credentials`` at request time using ADC as the source. No key held; the runtime identity must have ``roles/iam.serviceAccountTokenCreator`` on the target. This is the multi-tenant keyless path — the proxy holds N emails, not N keys. """ t = data.get("type", "") if t == "service_account": return "sa", { "project_id": data.get("project_id", ""), "client_email": data.get("client_email", ""), } if t == "authorized_user" or "refresh_token" in data: # google-auth-style "authorized_user" file OR a manually-built # refresh-token dict with at least the refresh_token field. return "oauth", { "project_id": data.get("quota_project_id", "") or data.get("project", "") or "", } if t == "external_account": # Workload Identity Federation config. The config itself is # not a secret — it routes ``google.auth`` through GCP STS # to exchange an external (AWS, Azure, OIDC) identity for a # GCP token, optionally chained through impersonation of a # target SA. The ``service_account_impersonation_url`` field # names that target if set; surface it for log clarity. url = data.get("service_account_impersonation_url", "") or "" target = "" if "/serviceAccounts/" in url: target = url.split("/serviceAccounts/", 1)[1].split(":", 1)[0] return "adc", { "project_id": data.get("quota_project_id", "") or "", "client_email": target, } if t == "impersonated_service_account": # Explicit impersonation config (e.g. produced by gcloud # auth impersonate-service-account-flow). Extract the # target SA email so the runtime knows what to impersonate. url = data.get("service_account_impersonation_url", "") or "" target = (data.get("target_principal", "") or (url.split("/serviceAccounts/", 1)[1].split(":", 1)[0] if "/serviceAccounts/" in url else "")) return "impersonated", { "project_id": data.get("quota_project_id", "") or "", "client_email": target, } # ``client_email`` as the only meaningful key + an explicit # ``impersonate`` flag is the in-memory shape ``addImpersonation`` # constructs. Keep it routable through the same classifier so # round-tripping (``addCreds(eeCreds.info(name))``) works. if data.get("impersonate") and data.get("client_email"): return "impersonated", { "project_id": data.get("project_id", "") or "", "client_email": data["client_email"], } raise ValueError( f"eeCreds: unrecognized credential JSON shape " f"(keys: {sorted(data.keys())}). Expected one of:\n" f" - service-account key (type='service_account')\n" f" - OAuth credentials (type='authorized_user' or with 'refresh_token')\n" f" - WIF config (type='external_account')\n" f" - impersonation config (type='impersonated_service_account')" ) # ─────────────── auto-discovery ─────────────── @staticmethod def _detect_oauth_project() -> str: """Best-effort: which GCP project should OAuth-credential EE calls be billed against? OAuth refresh tokens (``~/.config/earthengine/credentials``) don't carry a project — so without an explicit answer the proxy forwards EE requests without ``x-goog-user-project``, EE defaults to ``earthengine-legacy`` as the consumer, and the personal Google account can't use that → 403. Checked sources (first hit wins): 1. ``ee.data._get_state().cloud_api_user_project`` — most reliable; populated by a successful ``ee.Initialize(project=...)``. 2. ``ee.oauth.get_appdefault_project()`` — reads ``application_default_credentials.json``'s ``quota_project_id``; this is the value EE's own ``get_persistent_credentials`` uses, so honouring it keeps our proxy in lockstep with the SDK. 3. ``$GOOGLE_CLOUD_PROJECT`` env var — standard GCP convention. 4. ``$GEE_PROJECT`` env var — geeViz convention. 5. ``gcloud config get-value project`` — last resort, subprocess call. Often diverges from ADC's quota project; lower priority than ADC for that reason. Returns ``""`` when nothing's discoverable; caller can leave the entry's project blank or surface a clear error to the user. """ try: import ee.oauth as _ee_oauth except Exception: _ee_oauth = None # ``earthengine-legacy`` is the SDK's internal placeholder for # "no real project set yet"; SDK project numbers (764086051850 # etc) are EE's shared OAuth-client projects that no end user # can quota against. Neither is a usable project hint — filter # both out at every detection step. def _ok(proj: str) -> bool: if not proj: return False p = proj.strip() if p.lower() == "earthengine-legacy": return False if _ee_oauth is not None and _ee_oauth.is_sdk_project(p): return False return True # 1. EE's current state — only honour if ``ee.Initialize(project=...)`` # set a real project (not the legacy placeholder). try: import ee.data proj = getattr(ee.data._get_state(), "cloud_api_user_project", None) or "" if _ok(proj): return proj except Exception: pass # 2. ADC's quota_project_id — same source EE uses internally. if _ee_oauth is not None: try: proj = _ee_oauth.get_appdefault_project() or "" if _ok(proj): return proj except Exception: pass # 3. GCP standard env var proj = os.environ.get("GOOGLE_CLOUD_PROJECT", "").strip() if _ok(proj): return proj # 4. geeViz convention proj = os.environ.get("GEE_PROJECT", "").strip() if _ok(proj): return proj # 5. gcloud config get-value project — usually set by anyone # who's run ``gcloud auth login``. Subprocess so this only # runs once per discover() call. proj = _gcloud_default_project() if _ok(proj): return proj return ""
[docs] def discover(self, *, overwrite: bool = False) -> list[str]: """Scan the environment for credentials and register any found. Lookups, in order — each one that produces a credential gets added under a stable name: ===================================================== ================= Source Registered as ===================================================== ================= ``$GOOGLE_APPLICATION_CREDENTIALS`` (JSON path) ``"adc"`` ``~/.config/earthengine/credentials`` (EE persistent) ``"ee-persistent"`` gcloud ADC well-known file ``"adc-default"`` ``$GEE_SERVICE_ACCOUNT_B64`` (legacy default SA) ``"env-default"`` ``$GEE_<NAME>_SERVICE_ACCOUNT`` (per-tenant SA keys) ``<name>`` ``$GEE_<NAME>_SA_EMAIL`` (per-tenant impersonation) ``<name>`` ``google.auth.default()`` fallback (Cloud Run / WIF) ``"adc"`` ===================================================== ================= The fallback only fires when nothing else registered, so keyed deployments aren't disturbed. On Cloud Run with an attached SA, on GKE with workload identity, or on AWS via Workload Identity Federation, this is the path that boots the proxy without any JSON key on disk. Returns the list of names actually registered by this call. Existing names are not overwritten unless ``overwrite=True``. Safe to call multiple times; failing sources are logged but don't raise — discovery is best-effort. """ added: list[str] = [] # OAuth credentials don't carry a project; SAs do. Detect a # project hint once and apply it only to OAuth entries (after # classification) so SAs keep their JSON-supplied project_id. oauth_project = self._detect_oauth_project() def _try_add(value, name: str) -> None: if not value: return if not overwrite and self.has(name): return try: self.addCreds(value, name=name) added.append(name) # Backfill project for OAuth entries only — SA entries # already have the correct project from their JSON. entry = self._entries.get(name) if (entry is not None and entry.type == "oauth" and not entry.project_id and oauth_project): entry.project_id = oauth_project logger.info( "eeCreds.discover: backfilled project %r for " "OAuth entry %r", oauth_project, name, ) except Exception: logger.exception( "eeCreds.discover: failed to register %s as %r", type(value).__name__, name, ) # 1. GOOGLE_APPLICATION_CREDENTIALS (path to JSON — could be SA or # OAuth depending on the file). _try_add does the right thing # per-type once addCreds has classified the file. adc_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", "").strip() if adc_path and os.path.isfile(os.path.expanduser(adc_path)): _try_add(adc_path, "adc") # 2. Persistent EE credentials file (``earthengine authenticate``). # Always OAuth — the project backfill in _try_add prevents the # classic 403 from EE routing personal-account calls to # earthengine-legacy as the consumer. Files that exist but # don't contain a refresh_token (e.g. ``earthengine # set_project`` written before the user re-authenticated) are # silently skipped by ``addCreds`` via the new # no-refresh-token check below. try: import ee.oauth as _ee_oauth ee_path = _ee_oauth.get_credentials_path() if ee_path and os.path.isfile(ee_path): _try_add(ee_path, "ee-persistent") except Exception: logger.debug( "eeCreds.discover: ee.oauth.get_credentials_path unavailable", exc_info=True, ) # 2b. gcloud Application Default Credentials. When a user has # run ``gcloud auth application-default login``, an OAuth # refresh token lives at a well-known path independent of # the EE one. Picking this up means ``Map.view()`` can spin # up the proxy in ADC-only environments (no # $GOOGLE_APPLICATION_CREDENTIALS, no EE-persistent file) # instead of falling back to the legacy direct-token URL. try: from google.auth import _cloud_sdk as _gauth_cloud_sdk adc_default_path = _gauth_cloud_sdk.get_application_default_credentials_path() if adc_default_path and os.path.isfile(adc_default_path): _try_add(adc_default_path, "adc-default") except Exception: logger.debug( "eeCreds.discover: gcloud ADC well-known file lookup failed", exc_info=True, ) # 3. Legacy default SA env var (used by the env-var registry). sa_b64 = os.environ.get("GEE_SERVICE_ACCOUNT_B64", "").strip() if sa_b64: _try_add(sa_b64, "env-default") # 4. Per-tenant SAs from GEE_<NAME>_SERVICE_ACCOUNT pattern. import re as _re_disc named_re = _re_disc.compile(r"^GEE_([A-Z0-9_]+)_SERVICE_ACCOUNT$") for key in sorted(os.environ): if key == "GEE_SERVICE_ACCOUNT_B64": continue m = named_re.match(key) if not m: continue tenant_name = m.group(1).lower() _try_add(os.environ[key], tenant_name) # 4b. Keyless per-tenant impersonation: GEE_<NAME>_SA_EMAIL=<email>. # Same shape as GEE_<NAME>_SERVICE_ACCOUNT but holds an SA # email instead of a base64 key. The runtime's ADC source # impersonates the target at token-mint time — no key # material in env or on disk. Optional companion # GEE_<NAME>_PROJECT=<id> overrides the quota project. email_re = _re_disc.compile(r"^GEE_([A-Z0-9_]+)_SA_EMAIL$") for key in sorted(os.environ): m = email_re.match(key) if not m: continue tenant_name = m.group(1).lower() target_email = os.environ[key].strip() if not target_email: continue if not overwrite and self.has(tenant_name): continue project_override = os.environ.get( f"GEE_{m.group(1)}_PROJECT", "", ).strip() or None try: self.addImpersonation( target_email, name=tenant_name, project=project_override, ) added.append(tenant_name) except Exception: logger.exception( "eeCreds.discover: failed to register %s as %r " "(impersonation)", key, tenant_name, ) # 5. Keyless ADC fallback. If discovery found nothing AND no # ``adc`` entry already exists, try ``google.auth.default()``. # This is the path for Cloud Run with attached SA, GKE # workload identity, and AWS via Workload Identity Federation # — environments where credentials come from the runtime, not # a JSON file. Registers as ``"adc"`` so the proxy has at # least one tenant to route through. if not added and not self.has("adc"): try: import google.auth _creds, default_project = google.auth.default() self.addADC(name="adc", project=default_project or None) added.append("adc") logger.info( "eeCreds.discover: registered ADC fallback " "(project=%s)", default_project or "<runtime>", ) except Exception: logger.debug( "eeCreds.discover: ADC fallback unavailable", exc_info=True, ) if added: logger.info( "eeCreds.discover: registered %d credential(s): %s", len(added), ", ".join(added), ) return added
# ─────────────── introspection ───────────────
[docs] def list(self) -> list[str]: """Return registered credential names in insertion order.""" return list(self._entries.keys())
[docs] def names(self) -> list[str]: """Alias for ``list()`` — also returns registered names.""" return self.list()
[docs] def current(self) -> str: """Return the currently active tenant name. Falls back to the first registered name if no explicit ``use()`` has been made yet and at least one credential is registered. Returns ``""`` if nothing's registered.""" active = CURRENT_TENANT.get() if active and active in self._entries: return active if self._entries: return next(iter(self._entries)) return ""
[docs] def has(self, name: str) -> bool: return name in self._entries
[docs] def info(self, name: Optional[str] = None) -> dict: """Inspect a registered credential without exposing the secret key material. Returns ``{name, type, project_id, client_email, source}``. ``name=None`` returns the currently active one.""" nm = name or self.current() if nm not in self._entries: raise KeyError(f"eeCreds: unknown credential {nm!r}") e = self._entries[nm] return { "name": e.name, "type": e.type, "source": e.source, "project_id": e.project_id, "client_email": e.client_email, }
# ─────────────── token minting ───────────────
[docs] def get_token( self, name: Optional[str] = None, force_refresh: bool = False ) -> dict: """Mint (or return cached) access token for ``name``. If no name is given, uses the currently active tenant. Returns ``{access_token, project_id, client_email, tenant}`` — same shape as ``geeViz.eeAuth.registry.SARegistry.get_token`` so ``build_proxy_router`` can accept either. """ nm = name or self.current() if not nm: raise KeyError("eeCreds: no credentials registered") entry = self._entries.get(nm) if entry is None: # Fall back to the first registered tenant — keeps things # forgiving when an unknown name is passed (callers can # check has() themselves if they want strict mode). fallback = next(iter(self._entries), "") if not fallback: raise KeyError(f"eeCreds: unknown credential {nm!r}") entry = self._entries[fallback] with entry._lock: if (entry._token and not force_refresh and time.time() - entry._token_fetched_at < _TOKEN_TTL_SEC): return entry._token self._mint(entry) return entry._token
def _mint(self, entry: _CredEntry) -> None: """Refresh ``entry``'s credentials and cache the token. Caller must hold ``entry._lock``.""" import google.auth.transport.requests import ee # for ee.oauth.SCOPES if entry._creds is None: entry._creds = self._build_credentials(entry) entry._creds.refresh(google.auth.transport.requests.Request()) entry._token = { "access_token": entry._creds.token, "project_id": entry.project_id, "client_email": entry.client_email, "tenant": entry.name, } entry._token_fetched_at = time.time() def _build_credentials(self, entry: _CredEntry): """Build the appropriate google.auth Credentials object for this entry. Lazily, so ``ee`` / ``google.auth`` are only imported on first use.""" import ee # for ee.oauth.SCOPES scopes = ee.oauth.SCOPES if entry.type == "sa": import google.oauth2.service_account as gsa return gsa.Credentials.from_service_account_info( entry.data, scopes=scopes, ) if entry.type == "oauth": import google.oauth2.credentials as gco d = entry.data # Older ``earthengine authenticate`` credentials files contain # only a ``refresh_token`` (sometimes with scopes). The EE # Python library knows to inject its own well-known OAuth # client_id/client_secret when refreshing these — but # ``google.oauth2.credentials.Credentials.refresh`` will # fail with "The credentials do not contain the necessary # fields" unless we provide them up front. Fall back to # EE's well-known client when the JSON doesn't carry one. client_id = d.get("client_id") or ee.oauth.CLIENT_ID client_secret = d.get("client_secret") or ee.oauth.CLIENT_SECRET token_uri = d.get("token_uri") or ee.oauth.TOKEN_URI return gco.Credentials( token=None, refresh_token=d.get("refresh_token"), client_id=client_id, client_secret=client_secret, token_uri=token_uri, scopes=scopes, ) if entry.type == "adc": # Application Default Credentials. No key material in the # entry; tokens come from whatever ``google.auth.default()`` # resolves to on this host: # - Cloud Run / GCE / GKE: attached SA via metadata server # - AWS EC2/ECS/EKS with WIF: federated via STS, optionally # auto-impersonating a target SA (when the WIF config # names ``service_account_impersonation_url``) # - Local dev: ``gcloud auth application-default login`` # The proxy treats this identically to any other entry — # the resulting Credentials object refreshes the same way. import google.auth creds, default_project = google.auth.default(scopes=scopes) # Quota project — honor entry override; otherwise let ADC's # own resolution stand. ``with_quota_project`` is supported # on every Credentials subclass google-auth ships. proj = entry.project_id or default_project if proj and hasattr(creds, "with_quota_project"): try: creds = creds.with_quota_project(proj) except Exception: # ``AnonymousCredentials`` and a few exotic types # don't support quota project. Fall through. pass return creds if entry.type == "impersonated": # Service-account impersonation. Source identity comes from # ADC (the runtime's own identity — attached SA on GCP, WIF # federation on AWS, gcloud user creds locally). The source # principal must hold ``roles/iam.serviceAccountTokenCreator`` # on the target SA. The IAM credentials API mints a # short-lived (default 1h) access token for the target; # google-auth handles refresh automatically. import google.auth from google.auth import impersonated_credentials target_email = (entry.client_email or entry.data.get("client_email", "")) if not target_email: raise ValueError( f"eeCreds: impersonated entry {entry.name!r} has no " "target SA email (set client_email or use " "addImpersonation())." ) source_creds, _ = google.auth.default() return impersonated_credentials.Credentials( source_credentials=source_creds, target_principal=target_email, target_scopes=list(scopes), lifetime=3600, ) raise ValueError(f"eeCreds: unknown credential type {entry.type!r}") # ─────────────── switching ───────────────
[docs] def use(self, name: str): """Switch the active credential and return a context manager that restores the previous one on exit. Works as a statement OR a ``with`` block:: eeCreds.use("acme") # switch and forget ee.Image(1).getInfo() # uses acme with eeCreds.use("ian"): # scoped ee.Image(2).getInfo() # uses ian # back to acme here """ if name not in self._entries: raise KeyError( f"eeCreds: unknown credential {name!r} " f"(known: {self.list()})" ) # Allow both: # eeCreds.use("acme") — plain call, no with # with eeCreds.use("ian"): ... — context manager return _UseContext(self, name)
# ─────────────── lifecycle ───────────────
[docs] def start( self, *, proxy_port: int = _DEFAULT_PROXY_PORT, proxy_host: str = "127.0.0.1", ee_init: bool = True, launch_proxy: bool = True, ) -> dict: """Initialize Earth Engine for multi-credential use. Steps (each can be disabled via kwargs): 1. ``launch_proxy=True``: start a background HTTP proxy that injects per-tenant SA / OAuth tokens. Required for switching credentials at runtime without re-initializing ``ee``. 2. ``ee_init=True``: call ``ee.Initialize(url=proxy_url, ...)`` so the EE Python SDK routes all REST calls through the proxy. Returns a status dict with ``{started, proxy_url, tenants, ee_initialized}`` for inspection. Idempotent — calling ``start()`` twice is safe; the second call returns the current state. """ with self._lock: if self._started: return self._status() if not self._entries: raise RuntimeError( "eeCreds.start(): no credentials registered — " "call addCreds(...) first" ) if launch_proxy: self._launch_proxy(proxy_host, proxy_port) if ee_init: proxy_url = self._proxy_url or "" if proxy_url: # Pass the first registered credential's project so EE # builds API URLs like ``projects/<real>/value:compute`` # instead of the placeholder. The proxy then doesn't have # to rewrite paths — all tenants in a single eeCreds # instance share a process-wide ee.Initialize, and EE # rejects the placeholder string at the path level. first = next(iter(self._entries.values())) initialize_via_proxy( proxy_url, project=first.project_id or None, ) else: # No proxy → direct ee.Initialize with the FIRST creds # (single-tenant mode; .use() will then raise unless # the user re-initializes). first = next(iter(self._entries.values())) self._direct_init(first) self._started = True return self._status()
[docs] def stop(self) -> None: """Shut down the in-process proxy if one's running. Safe to call when not started.""" with self._lock: srv = self._proxy_server if srv is not None: try: srv.should_exit = True # uvicorn graceful shutdown except Exception: pass if self._proxy_thread is not None: self._proxy_thread.join(timeout=5.0) self._proxy_server = None self._proxy_thread = None self._proxy_url = None self._started = False
def _find_free_port(self, host: str, preferred: int) -> int: """Return ``preferred`` if it's bindable, otherwise the first free port walking up from ``preferred+1`` (up to ``+49``). Falls back to an OS-assigned ephemeral port if all 50 candidates in the explicit range are taken — never raises, always returns SOMETHING the caller can bind. Why this matters: the default proxy port (8889) is often in use from a previous Python process whose proxy thread didn't get cleaned up (notebook kernel restart, crashed script, two scripts running side by side). Without this, ``start()`` would silently hand back a ``_proxy_url`` pointing at a dead socket. There's a small TOCTOU race here — between our probe-and-close and uvicorn's actual bind, something else could grab the port. In practice this almost never happens; if it does, the user sees the original WinError 10048 and can rerun. """ import socket candidates = [preferred] + list(range(preferred + 1, preferred + 50)) for candidate in candidates: try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind((host, candidate)) return candidate except OSError: continue # Everything in the explicit range was taken — let the OS pick # any free ephemeral port. with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind((host, 0)) return s.getsockname()[1] def _launch_proxy(self, host: str, port: int) -> None: """Start a uvicorn server in a background thread serving the proxy router. Captures the URL so ``ee.Initialize`` can point at it. If the requested ``port`` is already in use (common when a prior notebook kernel's proxy thread is still alive), this method transparently picks a different free port — the resulting ``proxy_url`` reflects whatever port was actually bound. """ try: import uvicorn except ImportError as e: raise RuntimeError( "eeCreds.start(launch_proxy=True) requires uvicorn. " "Install it (pip install uvicorn) or call " "start(launch_proxy=False) to skip." ) from e # Late import to dodge circular: server.py imports from registry, # we import from server here. from .server import create_proxy_app # Find a port we can actually bind. Most of the time this is # the preferred port; on conflicts we walk up. actual_port = self._find_free_port(host, port) if actual_port != port: logger.info( "eeCreds: port %d busy, using %d instead", port, actual_port ) app = create_proxy_app(creds=self, prefix="/ee-api") config = uvicorn.Config( app, host=host, port=actual_port, log_level="warning", access_log=False, ) server = uvicorn.Server(config) # Start the server loop in a daemon thread. Uvicorn drives its # own asyncio loop internally — fine for daemonized use. def _run(): try: server.run() except Exception: logger.exception("eeCreds: proxy thread died") t = threading.Thread(target=_run, name="eeCreds-proxy", daemon=True) t.start() # Brief wait for the socket to bind so first EE call doesn't race for _ in range(50): # up to ~5s if server.started: break time.sleep(0.1) else: logger.warning( "eeCreds: proxy thread didn't report 'started' within 5s; " "first EE call may need a retry" ) self._proxy_server = server self._proxy_thread = t self._proxy_url = f"http://{host}:{actual_port}/ee-api" logger.info("eeCreds: proxy listening at %s", self._proxy_url) def _direct_init(self, entry: _CredEntry) -> None: """Fallback ``launch_proxy=False`` path: call ``ee.Initialize`` directly with the first registered credential. Single-tenant only — ``.use()`` won't switch credentials without re-init.""" import ee if entry._creds is None: entry._creds = self._build_credentials(entry) ee.Initialize( credentials=entry._creds, project=entry.project_id or "ee-direct-init", ) logger.info( "eeCreds: ee.Initialize direct (no proxy) with %r", entry.name )
[docs] def router(self, **kwargs): """Return a FastAPI ``APIRouter`` that proxies EE requests using these credentials. Mount it in your own FastAPI app:: app.include_router(eeCreds.router(), prefix="/ee-api") ``kwargs`` pass through to ``build_proxy_router`` — customize the tenant header, resolver, workload-tag builder, etc. """ from .server import build_proxy_router return build_proxy_router(creds=self, **kwargs)
# ─────────────── detached-subprocess proxy ─────────────── # Lets a script start a proxy once, exit cleanly, and have any # subsequent script attach to the same process — survives the # caller's lifecycle so multi-``Map.view()`` workflows don't need # the blocking ``input()`` at the end of each script. See # ``ensure_started(mode="detached")``. @staticmethod def _detached_state_path() -> str: """Path to the JSON state file describing the running detached proxy. Single shared file per machine (one detached proxy at a time).""" import tempfile return os.path.join(tempfile.gettempdir(), ".geeViz_eeauth_proxy.json") @classmethod def _read_detached_state(cls) -> Optional[dict]: """Return the parsed state file contents, or ``None`` when the file is missing or unreadable.""" path = cls._detached_state_path() if not os.path.isfile(path): return None try: with open(path, "r", encoding="utf-8") as f: return json.load(f) except Exception: return None @classmethod def _write_detached_state(cls, state: dict) -> None: path = cls._detached_state_path() try: with open(path, "w", encoding="utf-8") as f: json.dump(state, f, indent=2) except Exception: logger.exception("eeCreds: failed writing detached state %s", path) @classmethod def _clear_detached_state(cls) -> None: path = cls._detached_state_path() try: if os.path.isfile(path): os.remove(path) except OSError: pass @staticmethod def _pid_alive(pid: int) -> bool: """Cross-platform: is process ``pid`` currently alive? Critical Windows note: ``os.kill(pid, 0)`` on Windows does NOT check liveness — it calls ``TerminateProcess(pid, 0)``, which actually KILLS the process. A previous version of this code used ``os.kill(pid, 0)`` and was silently murdering the detached proxy on every liveness check. Always use the Win32 API ``OpenProcess`` + ``GetExitCodeProcess`` on Windows. """ if pid <= 0: return False if os.name == "nt": import ctypes kernel32 = ctypes.windll.kernel32 PROCESS_QUERY_LIMITED_INFORMATION = 0x1000 STILL_ACTIVE = 259 handle = kernel32.OpenProcess( PROCESS_QUERY_LIMITED_INFORMATION, False, pid, ) if not handle: return False try: exit_code = ctypes.c_ulong() ok = kernel32.GetExitCodeProcess( handle, ctypes.byref(exit_code), ) if not ok: return False return exit_code.value == STILL_ACTIVE finally: kernel32.CloseHandle(handle) try: os.kill(pid, 0) return True except (ProcessLookupError, PermissionError): return False except OSError: return False def _tenant_fingerprint(self) -> str: """sha256(16) over the sorted tenant names — same hash the proxy's ``/health`` endpoint computes. When this drifts from what the running proxy reports, we know the env has new SA env vars that aren't in the proxy yet, and we restart it.""" import hashlib names = sorted(self._entries.keys()) return hashlib.sha256(",".join(names).encode("utf-8")).hexdigest()[:16] @classmethod def _probe_detached_health(cls, url: str, timeout: float = 2.0) -> Optional[dict]: """GET ``<url>/health`` and return the parsed JSON, or ``None`` on any failure (down, timeout, non-200, JSON parse error).""" try: import urllib.request as _urlreq except Exception: return None try: with _urlreq.urlopen(url.rstrip("/") + "/health", timeout=timeout) as resp: if getattr(resp, "status", 200) != 200: return None return json.loads(resp.read().decode("utf-8")) except Exception: return None @classmethod def _kill_detached(cls, state: dict, *, timeout: float = 5.0) -> None: """Terminate the detached proxy referenced by ``state``. SIGTERM first, wait up to ``timeout`` seconds for graceful exit, then SIGKILL (or TerminateProcess on Windows).""" pid = int(state.get("pid", 0) or 0) if pid <= 0 or pid == os.getpid(): return if not cls._pid_alive(pid): return try: if os.name == "nt": # Windows: send Ctrl+Break to the process group, then # TerminateProcess as the kill fallback. import signal as _sig try: os.kill(pid, _sig.CTRL_BREAK_EVENT) except (AttributeError, OSError): pass else: import signal as _sig os.kill(pid, _sig.SIGTERM) except Exception: pass deadline = time.time() + timeout while time.time() < deadline: if not cls._pid_alive(pid): return time.sleep(0.2) # Still alive — hard kill. try: if os.name == "nt": import ctypes kernel32 = ctypes.windll.kernel32 PROCESS_TERMINATE = 1 handle = kernel32.OpenProcess(PROCESS_TERMINATE, False, pid) if handle: kernel32.TerminateProcess(handle, 1) kernel32.CloseHandle(handle) else: import signal as _sig os.kill(pid, _sig.SIGKILL) except Exception: pass def _spawn_detached(self, port: int) -> dict: """Spawn ``python -m geeViz.eeAuth`` as a detached background subprocess listening on ``port``. Inherits this process's env (so the same ``GEE_*`` discovery rules apply) but detaches stdin/stdout/stderr and runs in its own process group so the spawning script can exit without taking the proxy down. Waits up to ~15s for ``/health`` to respond before returning. Writes the state file on success. Raises on failure.""" import subprocess as _sp url = f"http://127.0.0.1:{port}/ee-api" # The standalone runner uses --prefix /ee-api by default # (same as ``router(...)`` consumers) so the /health probe is # at ``<url>/health``. The runner reads SA tenants from env vars # via ``discover()`` — which is exactly what this parent # process did, so the child sees the same tenants. # Core command — same on all platforms. target_args = [ sys.executable, "-m", "geeViz.eeAuth", "--host", "127.0.0.1", "--port", str(port), "--prefix", "/ee-api", "--log-level", "WARNING", ] kwargs: dict = { "stdin": _sp.DEVNULL, "stdout": _sp.DEVNULL, "stderr": _sp.DEVNULL, "close_fds": True, } if os.name == "nt": # Windows Job Object problem: when the parent process is in # a Job with KILL_ON_JOB_CLOSE (VS Code's terminal, many # CI runners, GitHub Actions, etc.), the child inherits the # Job and gets killed ~seconds after the parent exits. The # ``CREATE_BREAKAWAY_FROM_JOB`` Popen flag only works when # the Job permits breakaway — VS Code's doesn't. # # Workaround: launch through ``cmd /c start /B`` so the # process is created by ``cmd``, then ``cmd`` exits, leaving # the launched process as an orphan owned by the system. # ``/B`` = no new window. The empty ``""`` first arg to # ``start`` is the (ignored) window title — required when # the executable path contains spaces. quoted = " ".join(f'"{a}"' if " " in a else a for a in target_args) args = ["cmd", "/c", "start", "", "/B", *target_args] # ``CREATE_NO_WINDOW`` (0x08000000) suppresses the brief cmd # window flash. Mutually exclusive with ``DETACHED_PROCESS`` # (which can ALLOCATE a fresh console for the child if the # parent has none) — pick CREATE_NO_WINDOW since we just # don't want any window, ever. kwargs["creationflags"] = ( 0x08000000 # CREATE_NO_WINDOW | getattr(_sp, "CREATE_NEW_PROCESS_GROUP", 0x00000200) | getattr(_sp, "CREATE_BREAKAWAY_FROM_JOB", 0x01000000) ) else: args = target_args # New session = own process group + detached from parent's # controlling terminal. Survives parent exit on POSIX. kwargs["start_new_session"] = True try: proc = _sp.Popen(args, **kwargs) except Exception as e: raise RuntimeError( f"eeCreds: failed to spawn detached proxy on port {port}: {e}" ) from e # On Windows the cmd-wrapped Popen returns the PID of cmd.exe, # which exits immediately after dispatching. We need to track # the actual proxy python.exe PID. Discover it by health-probing # /health (which returns ``pid``) once it's up. # Wait for /health to come up. The child takes ~1-3s to import # everything and bind the socket; if it's not responsive after # ~15s something is wrong (port conflict, env error, etc.). deadline = time.time() + 15.0 health = None # On Windows the launcher (cmd.exe) exits within ~50ms — that's # not a failure, it's the intended detach. Only treat # ``proc.poll() not None`` as fatal on POSIX where the Popen # PID IS the proxy. On Windows we rely on /health responding # to confirm the proxy actually started; if it doesn't, we # raise after the timeout. is_launcher = (os.name == "nt") while time.time() < deadline: if not is_launcher and proc.poll() is not None: raise RuntimeError( f"eeCreds: detached proxy exited prematurely with code " f"{proc.returncode}. Run " f"`python -m geeViz.eeAuth --port {port}` directly to see " f"the error." ) health = self._probe_detached_health(url, timeout=1.0) if health is not None: break time.sleep(0.3) if health is None: raise RuntimeError( f"eeCreds: detached proxy on port {port} didn't respond to " f"/health within 15s. Run " f"`python -m geeViz.eeAuth --port {port}` directly to see " f"the error." ) # Use the PID from /health — on Windows the Popen PID was the # cmd-launcher (already exited), not the actual proxy process. actual_pid = int(health.get("pid", 0) or proc.pid) state = { "pid": actual_pid, "port": port, "url": url, "version": health.get("version", ""), "tenant_fingerprint": health.get("tenant_fingerprint", ""), "started_at": health.get("started_at", ""), "python": sys.executable, } self._write_detached_state(state) logger.info( "eeCreds: spawned detached proxy pid=%s url=%s tenants=%s", proc.pid, url, health.get("tenants", []), ) return state def _ensure_detached(self, proxy_port: int) -> dict: """``ensure_started(mode='detached')`` core. Discover tenants, compute expected tenant fingerprint, look at the state file: - No state file → spawn fresh - Stale state (PID dead, port unresponsive, version mismatch, tenant-fingerprint mismatch) → kill it, spawn fresh - Healthy state with matching fingerprint → attach """ # Run discovery in THIS process first so we know what tenants # SHOULD be loaded. The child process will run its own # discover() at startup and (assuming the env is identical) # arrive at the same fingerprint. Discrepancy means the # detached process is stale — restart it. if not self._entries: try: self.discover() except Exception: logger.exception("eeCreds.ensure_detached: discovery failed") expected_fp = self._tenant_fingerprint() try: from geeViz import __version__ as _our_version except Exception: _our_version = "" # Inspect any existing detached proxy. state = self._read_detached_state() if state is not None: stale_reason = None pid = int(state.get("pid", 0) or 0) url = state.get("url") or "" if not self._pid_alive(pid): stale_reason = "pid not alive" elif not url: stale_reason = "no url in state" else: health = self._probe_detached_health(url) if health is None: stale_reason = "health probe failed" elif (state.get("version") or "") and _our_version \ and health.get("version") != _our_version: stale_reason = ( f"version mismatch (proxy={health.get('version')!r} " f"client={_our_version!r})" ) elif health.get("tenant_fingerprint") != expected_fp: stale_reason = ( f"tenant fingerprint mismatch " f"(proxy={health.get('tenant_fingerprint')!r} " f"expected={expected_fp!r})" ) if stale_reason is None: # Attach — set the inline-mode fields so callers that # read .proxy_url see the detached URL transparently. self._proxy_url = url logger.info( "eeCreds: attached to detached proxy %s pid=%s", url, pid, ) return { "proxy_url": url, "tenants": self.list(), "current": self.current(), "mode": "detached", "discovered": [], "attached": True, "pid": pid, } logger.info( "eeCreds: detached proxy stale (%s); replacing", stale_reason, ) self._kill_detached(state) self._clear_detached_state() # No usable existing proxy — spawn a new one. new_state = self._spawn_detached(proxy_port) self._proxy_url = new_state["url"] return { "proxy_url": new_state["url"], "tenants": self.list(), "current": self.current(), "mode": "detached", "discovered": [], "attached": False, "pid": new_state["pid"], }
[docs] @classmethod def stop_detached(cls) -> bool: """Public helper: kill the detached proxy (if any) and clear the state file. Returns ``True`` if a process was actually terminated, ``False`` if there was nothing to kill.""" state = cls._read_detached_state() if not state: return False cls._kill_detached(state) cls._clear_detached_state() return True
@property def proxy_url(self) -> Optional[str]: """URL of the in-process proxy (if one's running). Useful for embedding into iframe URLs / Map exports so the JS side uses the same proxy.""" return self._proxy_url
[docs] def sync_oauth_project(self, project: str) -> int: """Update every OAuth entry's ``project_id`` to ``project`` and invalidate the cached access token so the next mint includes the new project on the ``x-goog-user-project`` header. Used by ``robustInitializer`` after the legacy ``ee.Initialize`` fallback succeeds: discovery may have guessed a project the OAuth user can't access (e.g. ``gcloud config`` pointing at a service-account-owned project), but legacy init knows what ACTUALLY works. Syncing that back to the OAuth entries means subsequent ``Map.view()`` calls route through the proxy with the correct project instead of repeating the 403. Service-account entries are NOT touched — their project_id came from the SA JSON and is authoritative. Args: project: The known-good project ID. Returns: Number of entries actually updated. """ if not project or project.lower() == "earthengine-legacy": return 0 updated = 0 with self._lock: for entry in self._entries.values(): if entry.type != "oauth": continue if entry.project_id == project: continue logger.info( "eeCreds: syncing OAuth entry %r project from %r to %r", entry.name, entry.project_id or "<empty>", project, ) entry.project_id = project # Invalidate cached token so the next get_token() includes # the new project on the response. entry._token = {} entry._token_fetched_at = 0.0 updated += 1 return updated
# ─────────────── robust_init: the full bootstrap flow ───────────────
[docs] def robust_init(self, *, verbose: bool = False, interactive: bool = True) -> dict: """Best-effort EE initialization with the simplest possible UX. Decision tree (first hit wins, no prompts): 1. EE already initialized AND a test call works → return as-is. 2. eeAuth multi-tenant proxy via ``ensure_started`` → use it. 3. ``ee.Initialize()`` with NO project arg → let EE's own resolution chain (credentials' ``quota_project_id`` → ADC → env vars) pick the project. This is the path that mirrors what ``ee.Initialize()`` would do if the user typed it themselves. 4. Fallback: ``ee.Authenticate(force=True, auth_mode='localhost')`` (interactive only) and re-run step 3. No project-id prompts. If a user wants a specific project they can call ``ee.Initialize(project='X')`` themselves before importing geeViz, or run ``earthengine set_project X`` / ``gcloud auth application-default set-quota-project X``. Returns a status dict:: {"ok": bool, "source": "already-initialized" | "eeauth-proxy" | "ee-auto-init" | "interactive-auth", "project": "..."} Raises ``RuntimeError`` when no path completes — e.g. non-interactive environment with no creds, or no quota project discoverable after a fresh authenticate. Args: verbose: Print progress to stdout. interactive: If False, skip the ``ee.Authenticate()`` fallback and raise instead. Useful for daemons / CI where blocking on a browser would hang. """ import ee def _verify_and_return_project() -> str: ee.Number(1).getInfo() return ee.data._get_state().cloud_api_user_project or "" # 1. Already initialized + working? try: proj = _verify_and_return_project() if verbose: print( f"geeViz: EE already initialized (project={proj!r})" ) return { "ok": True, "source": "already-initialized", "project": proj, } except Exception: pass # 2. eeAuth proxy path. Default ``detached`` so multi-``Map.view()`` # workflows and successive script runs reuse one long-lived proxy # process — no blocking ``input()`` at the end of each script, no # daemon-thread death dragging the browser tab down. Set # ``GEEVIZ_EEAUTH_MODE=auto`` to force the legacy in-process # daemon-thread proxy (one-shot, dies with the script). ``legacy`` # skips proxy startup entirely. mode = os.environ.get("GEEVIZ_EEAUTH_MODE", "detached").lower() if mode != "legacy": try: status = self.ensure_started(mode=mode) if status.get("proxy_url"): try: proj = _verify_and_return_project() if verbose: print( f"geeViz: initialized via eeAuth proxy " f"({status['proxy_url']}, " f"project={proj!r})" ) return { "ok": True, "source": "eeauth-proxy", "project": proj, "proxy_url": status["proxy_url"], } except Exception as e: if verbose: print( "geeViz: proxy started but EE call " f"failed; falling back: {e}" ) except RuntimeError: # mode='proxy' demanded the proxy and it couldn't start. raise except Exception as e: if verbose: print( f"geeViz: eeAuth proxy unavailable, " f"falling back: {e}" ) # 3a. Cached project from a previous prompt. When ``robust_init`` # prompted the user once (step 4 below), it cached the # project id alongside the EE credentials file. Reuse it # before falling through to a re-auth — otherwise every # subsequent call would burn another ``ee.Authenticate(force=True)`` # OAuth roundtrip and re-prompt the user (Sphinx imports # geeViz dozens of times during ``make html``). cached_project = self._read_cached_project() if cached_project: try: ee.Initialize(project=cached_project) proj = _verify_and_return_project() or cached_project if verbose: print( f"geeViz: EE initialized from cached project " f"(project={proj!r})" ) if self._entries: self.sync_oauth_project(proj) return {"ok": True, "source": "ee-init-cached-project", "project": proj} except Exception as e: if verbose: print( f"geeViz: cached project {cached_project!r} " f"failed: {e}" ) # 3b. ``ee.Initialize()`` with no project. EE walks its own # resolution chain (credentials.quota_project_id → ADC → # env vars). On this user's machine this is what makes # everything Just Work — they reported that bare # ``ee.Initialize()`` succeeds and picks ``rcr-gee-ops`` from # ADC, which is what we want to mirror. try: ee.Initialize() proj = _verify_and_return_project() if verbose: print( f"geeViz: EE initialized (project={proj!r})" ) if self._entries: self.sync_oauth_project(proj) return {"ok": True, "source": "ee-auto-init", "project": proj} except Exception as e: if verbose: print(f"geeViz: ee.Initialize() failed: {e}") init_err = e # 4. Last resort: force a fresh auth and try once more. if not interactive: raise RuntimeError( "geeViz: ee.Initialize() failed and running " "non-interactively. Run " "`ee.Authenticate()` + `ee.Initialize(project=...)` " "before importing geeViz, or set " "$GOOGLE_APPLICATION_CREDENTIALS." ) from init_err print( "geeViz: running ee.Authenticate(force=True, " "auth_mode='localhost') — a browser window should open." ) ee.Authenticate(force=True, auth_mode="localhost") try: ee.Initialize() proj = _verify_and_return_project() if verbose: print( f"geeViz: EE initialized after auth (project={proj!r})" ) if self._entries: self.sync_oauth_project(proj) return { "ok": True, "source": "interactive-auth", "project": proj, } except Exception as e: # Auth succeeded but no quota project came with the creds. # Restore the legacy ``simpleSetProject`` UX — prompt once, # cache the answer next to the EE credentials file so the # prompt only ever happens once per machine, and reuse on # subsequent runs. proj = self._prompt_for_project(verbose=verbose) if not proj: raise RuntimeError( f"geeViz: ee.Initialize() failed even after fresh " f"authentication: {e}\nNo quota project could be " "auto-resolved. Set one explicitly with one of:\n" " earthengine set_project YOUR_PROJECT\n" " gcloud auth application-default set-quota-project " "YOUR_PROJECT\n" " ee.Initialize(project='YOUR_PROJECT') " "# before importing geeViz" ) from e try: ee.Initialize(project=proj) except Exception as e2: raise RuntimeError( f"geeViz: ee.Initialize(project={proj!r}) failed: {e2}" ) from e2 if verbose: print( f"geeViz: EE initialized after auth + project " f"prompt (project={proj!r})" ) if self._entries: self.sync_oauth_project(proj) return { "ok": True, "source": "interactive-auth-prompted-project", "project": proj, }
@staticmethod def _project_cache_path() -> str: """Path to the ``<creds_path>.proj_id`` cache file used by ``_prompt_for_project`` / ``_read_cached_project``. Empty string when EE's oauth module can't tell us where to put it.""" try: import ee.oauth as _ee_oauth creds_path = _ee_oauth.get_credentials_path() except Exception: return "" return os.path.normpath(f"{creds_path}.proj_id") if creds_path else "" @classmethod def _read_cached_project(cls) -> str: """Read the cached project id if one exists. Used by step 3a of ``robust_init`` to short-circuit before any auth path — once the user has answered the prompt once, subsequent calls in the same env should never re-auth or re-prompt.""" cache = cls._project_cache_path() if not cache or not os.path.isfile(cache): return "" try: return open(cache, "r", encoding="utf-8").read().strip() except Exception: return "" @classmethod def _prompt_for_project(cls, *, verbose: bool = False) -> str: """Ask the user once for a GEE project ID and cache it next to the EE credentials file. Mirrors the legacy ``geeViz.geeView.simpleSetProject`` UX so workflows that used to depend on that one-time prompt (Sphinx docs build, etc.) keep working under the new ``robust_init`` orchestration. Returns the project id (or ``""`` if no stdin and no cache). """ # Reuse cached value when present (same path step 3a reads). cached = cls._read_cached_project() if cached: if verbose: print(f"geeViz: using cached project from {cls._project_cache_path()}") return cached cache = cls._project_cache_path() cache_dir = os.path.dirname(cache) if cache else "" if cache_dir and not os.path.exists(cache_dir): try: os.makedirs(cache_dir, exist_ok=True) except Exception: pass # Try to prompt. EOFError (no stdin / closed pipe / non-interactive # daemons) returns "" so the caller can decide whether to fail or # raise — never block waiting for input that can't arrive. try: entered = input("Please enter GEE project ID: ").strip() except (EOFError, OSError): return "" if not entered: return "" print(f"You entered: {entered}") if cache: try: with open(cache, "w", encoding="utf-8") as f: f.write(entered) except Exception: pass return entered # ─────────────── ensure_started: discover + start in one call ───────────────
[docs] def ensure_started( self, *, mode: str = "auto", proxy_port: int = _DEFAULT_PROXY_PORT, ) -> dict: """Idempotent "I want the proxy running, please" helper used by ``Map.view()`` and any other code that wants to ride the eeCreds proxy without forcing the user to call ``addCreds`` + ``start``. Modes: - ``"auto"``: try discovery + start (inline daemon thread). If anything fails, return a status dict with ``proxy_url=""`` — caller can fall back. - ``"proxy"``: try discovery + start (inline daemon thread). RAISE if nothing can be discovered or the proxy fails to bind. - ``"detached"``: attach to or spawn a long-lived background subprocess running ``python -m geeViz.eeAuth``. Survives the calling script's exit, so multi-``Map.view()`` workflows and successive script invocations all share one proxy without needing the blocking ``input()`` at the end of each. The subprocess is identified by a state file at ``<tmp>/.geeViz_eeauth_proxy.json``; clients verify version + tenant fingerprint via ``/health`` and respawn if anything drifted. - ``"legacy"``: do nothing. Returns immediately with ``""``. Returns ``{proxy_url, tenants, current, mode, discovered}``. ``proxy_url == ""`` means caller should fall back. """ m = (mode or "auto").lower() if m not in ("auto", "proxy", "detached", "legacy"): raise ValueError( f"eeCreds.ensure_started: mode must be auto/proxy/detached/" f"legacy, got {mode!r}" ) if m == "legacy": return { "proxy_url": "", "tenants": self.list(), "current": "", "mode": m, "discovered": [], } if m == "detached": return self._ensure_detached(proxy_port) if self._started and self._proxy_url: return self._status_for_ensure(m, discovered=[]) discovered: list[str] = [] if not self._entries: try: discovered = self.discover() except Exception: logger.exception("eeCreds.ensure_started: discovery failed") if not self._entries: if m == "proxy": raise RuntimeError( "eeCreds.ensure_started(mode='proxy'): no credentials " "could be auto-discovered. Either set " "GOOGLE_APPLICATION_CREDENTIALS, run " "`earthengine authenticate`, or call " "eeCreds.addCreds(...) manually before this." ) # mode='auto' → silent fallback return { "proxy_url": "", "tenants": [], "current": "", "mode": m, "discovered": discovered, } try: self.start(proxy_port=proxy_port) except Exception: logger.exception( "eeCreds.ensure_started: start() failed; falling back" ) if m == "proxy": raise return { "proxy_url": "", "tenants": self.list(), "current": self.current(), "mode": m, "discovered": discovered, } return self._status_for_ensure(m, discovered=discovered)
def _status_for_ensure(self, mode: str, discovered: list) -> dict: return { "proxy_url": self._proxy_url or "", "tenants": self.list(), "current": self.current(), "mode": mode, "discovered": discovered, } def _status(self) -> dict: return { "started": self._started, "proxy_url": self._proxy_url, "tenants": self.list(), "current": self.current(), }
class _UseContext: """Return value of ``eeCreds.use(name)``. Acts as both a plain statement (the side effect of switching is immediate) AND a context manager (restores the previous tenant on exit).""" def __init__(self, parent: EECreds, name: str): self._parent = parent self._name = name # Switch immediately — supports the no-with style self._token = set_tenant(name) self._used_as_context = False def __enter__(self): self._used_as_context = True # set_tenant was already called in __init__; the token captured # there is what we restore on exit. return self._parent def __exit__(self, exc_type, exc_val, exc_tb): reset_tenant(self._token) return False # don't swallow exceptions # ──────────────────────── robust_init helpers ──────────────────────── def _diagnose_ee_credentials() -> dict: """Best-effort: identify what creds EE will end up using, BEFORE ``ee.Initialize`` makes that decision silently. Returns ``{has_ee_refresh_token, ee_project, has_adc, adc_project}``. Why it matters: ``ee.data.get_persistent_credentials()`` falls through to ADC when the EE credentials file has no ``refresh_token``, and ``ee.Authenticate()`` short-circuits when those ADC creds are "valid enough" — meaning a user who deleted their EE token and expects a fresh auth flow gets gcloud creds + gcloud's quota_project used silently. This helper surfaces what's happening. """ out = { "has_ee_refresh_token": False, "ee_project": "", "has_adc": False, "adc_project": "", } try: import ee.oauth as _ee_oauth args = _ee_oauth.get_credentials_arguments() out["has_ee_refresh_token"] = bool(args.get("refresh_token")) # get_credentials_arguments() maps the JSON 'project' field # to 'quota_project_id' in its return value. out["ee_project"] = args.get("quota_project_id") or "" except (OSError, FileNotFoundError): pass except Exception: logger.debug( "eeCreds._diagnose: EE credentials file unreadable", exc_info=True, ) try: import google.auth import google.auth.exceptions try: creds, _proj = google.auth.default() out["has_adc"] = creds is not None except google.auth.exceptions.DefaultCredentialsError: pass except Exception: logger.debug( "eeCreds._diagnose: google.auth.default() unavailable", exc_info=True, ) try: import ee.oauth as _ee_oauth out["adc_project"] = _ee_oauth.get_appdefault_project() or "" except Exception: pass return out def _save_ee_project_to_credentials_file(project: str) -> None: """Persist ``project`` to the EE credentials JSON's ``project`` field so the next session picks it up without prompting. Matches ``earthengine set_project`` semantics — writes to the same file via ``ee.oauth.write_private_json`` (atomic, 0600 perms). Best-effort: failures are logged but don't raise. """ if not project: return try: import ee.oauth as _ee_oauth path = _ee_oauth.get_credentials_path() config: dict = {} if os.path.isfile(path): try: with open(path, "r", encoding="utf-8") as f: config = json.load(f) except (OSError, json.JSONDecodeError): config = {} config["project"] = project _ee_oauth.write_private_json(path, config) logger.info( "eeCreds: saved project %r to EE credentials file", project, ) except Exception as e: logger.warning( "eeCreds: could not persist project to EE credentials file: %s", e, ) # ──────────────────────── Module-level singleton ──────────────────────── # Users who want zero ceremony do ``from geeViz.eeAuth import eeCreds``. # Users who want their own isolated registry do # ``from geeViz.eeAuth.eeCreds import EECreds; my = EECreds()``. eeCreds = EECreds()