Source code for geeViz.eeAuth.server

"""FastAPI proxy for Earth Engine that injects per-tenant SA tokens.

The proxy receives requests from EE clients (browser JS, Python SDK
via ``geeViz.eeAuth.client``), looks up the requested tenant in the SA
registry, mints a token (cached), and forwards to the real EE endpoint
with the right ``Authorization`` and ``x-goog-user-project`` headers.

Two ways to use:

1. **Standalone**::

       python -m geeViz.eeAuth --port 8888

   or programmatically::

       from geeViz.eeAuth.server import create_proxy_app
       app = create_proxy_app()
       # serve with uvicorn / etc.

2. **Mounted in an existing FastAPI app**::

       from fastapi import FastAPI
       from geeViz.eeAuth.server import build_proxy_router

       app = FastAPI()
       app.include_router(build_proxy_router(), prefix="/ee-api")

Tenant routing — the proxy picks the SA in this order:

1. ``X-geeViz-Creds`` request header (server-side EE SDK; set by
   ``geeViz.eeAuth.client.TenantAwareHttp``).
2. ``?tenant=`` query string parameter (browser map iframes).
3. Default tenant (the registry's ``default`` entry, loaded from
   ``GEE_SERVICE_ACCOUNT_B64``).

Workload tagging — every POST is stamped with a workload tag
``ee-proxy__<tenant>`` in the query string for billing attribution.
Pass ``workload_tag_builder=...`` to ``build_proxy_router`` if you want
to construct your own tag (e.g. include user / session).
"""
from __future__ import annotations

import datetime
import logging
import os
from typing import Callable, Optional
from urllib.parse import parse_qsl, urlencode

# Module-load timestamp — used by the /health probe so detached-mode
# clients can tell how stale a discovered proxy process is.
_PROCESS_STARTED_AT = datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds")

from fastapi import APIRouter, FastAPI, Request, Response

from .registry import get_registry
from .tags import build_workload_tag

logger = logging.getLogger(__name__)

# Default upstream — EE serves compute + maps from content-earthengine.
# value:compute also works at earthengine.googleapis.com, but
# content-earthengine accepts both, so we route everything there.
DEFAULT_UPSTREAM = "https://content-earthengine.googleapis.com"

# Header the proxy expects for routing. Default is geeViz-branded so it's
# obviously library-owned in browser DevTools / packet captures; override
# per-deployment via ``build_proxy_router(tenant_header=...)``. The agent
# uses ``X-AskTerra-Tenant`` for back-compat with iframe URLs already in
# production. Both sides (client transport + proxy router) must use the
# SAME value — the library's defaults match by convention.
DEFAULT_TENANT_HEADER = "X-geeViz-Creds"


# Headers we never forward to upstream — they're either hop-by-hop, leak
# our infrastructure (IAP, forwarding proxies), or are our own internal
# routing signals that EE would reject.
_STRIPPED_HEADERS = frozenset({
    "host", "content-length", "authorization",
    "x-forwarded-for", "x-forwarded-proto", "x-forwarded-host", "x-real-ip",
    "x-goog-authenticated-user-email", "x-goog-authenticated-user-id",
    "x-goog-iap-jwt-assertion",
    # Stripped because the server sets its own — what the client claims is irrelevant.
    "x-goog-user-project",
})


def _default_tenant_resolver(
    request: Request, tenant_header: str
) -> str:
    """Read tenant from header, then query param. Returns ``""`` if
    neither present — the registry's default tenant will be used."""
    t = request.headers.get(tenant_header, "").strip().lower()
    if t:
        return t
    return (request.query_params.get("tenant", "") or "").strip().lower()


def _default_workload_tag_builder(
    request: Request, tenant: str
) -> str:
    """Build a simple workload tag from the tenant. Override for richer
    attribution (e.g. include user / session from your own headers)."""
    parts = ["ee-proxy"]
    if tenant:
        parts.append(tenant)
    return build_workload_tag(*parts)


def _rewrite_query_with_workload_tag(
    query: str,
    tenant: str,
    workload_tag_builder: Callable[[Request, str], str],
    request: Request,
    tenant_query_param: str,
) -> str:
    """Strip any client-set workloadTag and tenant query param; add our
    own workload tag if a tag builder produced one."""
    try:
        tag = workload_tag_builder(request, tenant)
    except Exception:
        logger.exception("ee-proxy: workload_tag_builder failed")
        tag = ""
    pairs = [
        (k, v) for k, v in parse_qsl(query or "", keep_blank_values=True)
        if k != "workloadTag" and k != tenant_query_param
    ]
    if tag:
        pairs.append(("workloadTag", tag))
    return urlencode(pairs)


[docs] def build_proxy_router( creds=None, upstream: str = DEFAULT_UPSTREAM, tenant_header: str = DEFAULT_TENANT_HEADER, tenant_query_param: str = "tenant", tenant_resolver: Optional[Callable[[Request, str], str]] = None, workload_tag_builder: Optional[Callable[[Request, str], str]] = None, ) -> APIRouter: """Build a FastAPI ``APIRouter`` that handles ``{path:path}`` and proxies every request to ``upstream`` with the right SA token. Args: creds: Object exposing ``get_token(tenant, force_refresh=False) -> {access_token, project_id, tenant, ...}``. Accepts an :class:`EECreds` instance, an :class:`SARegistry`, or any other object with the same interface. ``None`` (default) uses the process-wide env-var registry (legacy). upstream: Base URL of the real EE API. ``content-earthengine.googleapis.com`` works for both maps and compute. ``earthengine.googleapis.com`` is also accepted for most endpoints. tenant_header: Header name to read for routing. Default ``X-geeViz-Creds``. Must match the client side. tenant_query_param: Query string key to read for tenant routing (browser iframe pattern). Default ``"tenant"``. Stripped from the outbound URL so EE never sees it. tenant_resolver: Custom function ``(request) -> str`` to pick the tenant. Override for richer auth schemes (e.g. resolve via IAP email lookup). Default reads ``tenant_header`` then ``tenant_query_param``. workload_tag_builder: Custom function ``(request, tenant) -> str`` that returns the workload tag for billing attribution. Returning ``""`` disables tagging on this request. Default builds ``ee-proxy__<tenant>``. Mount the returned router on whatever prefix you like — typically ``/ee-api``. """ upstream = upstream.rstrip("/") resolver = tenant_resolver or ( lambda r: _default_tenant_resolver(r, tenant_header) ) tag_builder = workload_tag_builder or _default_workload_tag_builder # Shared async HTTP client. Opening a new ``httpx.AsyncClient`` per # request — which the original code did — costs a fresh TLS handshake # to ``content-earthengine.googleapis.com`` on every EE call (50-150ms # round-trips that pile up fast when the map viewer fires N parallel # ``value:compute`` queries per layer). One shared client per router # keeps connections in a pool and reuses them. ``http2=True`` because # EE supports it and HTTP/2 multiplexing further reduces head-of-line # blocking for parallel requests on a single connection. import httpx as _httpx upstream_client = _httpx.AsyncClient( timeout=_httpx.Timeout(120.0, connect=10.0), follow_redirects=False, limits=_httpx.Limits( max_keepalive_connections=64, max_connections=128, keepalive_expiry=60.0, ), ) def _resolve_creds(): """Resolve the credential source for each request. Honours the ``creds`` argument when provided, else falls back to the env-var registry singleton — both expose ``get_token`` so the proxy code below doesn't care which is in use.""" if creds is not None: return creds return get_registry() router = APIRouter() @router.get("/health") async def health() -> dict: """Liveness + identity probe for detached-mode discovery. Returned fields: - ``ok`` — always true (request reached us) - ``version`` — geeViz package version (for version-skew detection in ``eeCreds._ensure_detached_proxy``) - ``tenant_fingerprint`` — sha256 of sorted tenant names, so clients can detect when the detached process is using a stale tenant set vs. the current environment - ``tenants`` — list of tenant names currently registered (mainly for human debugging via curl) - ``pid`` — process id of the proxy - ``started_at`` — ISO timestamp of process start """ import hashlib import os try: from geeViz import __version__ as _ver except Exception: _ver = "" src = _resolve_creds() names = [] try: if hasattr(src, "list"): names = list(src.list()) elif hasattr(src, "list_tenants"): names = list(src.list_tenants()) except Exception: names = [] names.sort() fp = hashlib.sha256(",".join(names).encode("utf-8")).hexdigest()[:16] return { "ok": True, "version": _ver, "tenant_fingerprint": fp, "tenants": names, "pid": os.getpid(), "started_at": _PROCESS_STARTED_AT, } @router.api_route( "/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"], ) async def ee_proxy(path: str, request: Request) -> Response: import httpx # 1. Resolve tenant + mint a token from the credential source. # # Path-prefix syntax ``/ee-api/t/<tenant>/<rest>`` wins over # header and query. ``Map.view()`` bakes the tenant into the # JS-side ``authProxyAPIURL`` exactly this way to pin each # browser tab to its load-time tenant, immune to process-wide # eeCreds switches in the host script. Strip the prefix so # only the genuine EE path is forwarded upstream. path_tenant = "" if path.startswith("t/"): rest = path[len("t/"):] slash = rest.find("/") if slash > 0: path_tenant = rest[:slash] path = rest[slash + 1:] else: # ``/ee-api/t/<tenant>`` with no trailing segment — # tenant-ack ping, no upstream call needed. return Response(content=b"", status_code=204) tenant = path_tenant or resolver(request) registry = _resolve_creds() try: tok = registry.get_token(tenant or None) except KeyError as e: return Response( content=f"tenant routing failed: {e}", status_code=400, ) except Exception as e: logger.exception("ee-proxy: token mint failed (tenant=%r)", tenant) return Response(content=f"auth mint failed: {e}", status_code=500) actual_tenant = tok.get("tenant", tenant or "default") access_token = tok["access_token"] quota_project = ( tok.get("project_id") or os.environ.get("GEE_PROJECT", "") ) # 2. Rewrite the query string: strip client-set workloadTag and # the internal tenant param; add our own workload tag on POSTs. # GET requests can't carry unknown query params on most EE # endpoints, so we just strip there without adding. if request.method == "POST": rewritten_query = _rewrite_query_with_workload_tag( request.url.query or "", actual_tenant, tag_builder, request, tenant_query_param, ) else: rewritten_query = urlencode([ (k, v) for k, v in parse_qsl( request.url.query or "", keep_blank_values=True ) if k != "workloadTag" and k != tenant_query_param ]) upstream_url = f"{upstream}/{path}" if rewritten_query: upstream_url = f"{upstream_url}?{rewritten_query}" # 3. Forward headers — strip hop-by-hop, auth, IAP, and our own # tenant routing header (must never leak to EE). stripped = set(_STRIPPED_HEADERS) stripped.add(tenant_header.lower()) fwd_headers = {} for k, v in request.headers.items(): if k.lower() in stripped: continue fwd_headers[k] = v fwd_headers["authorization"] = f"Bearer {access_token}" # ``$discovery/rest`` is the googleapiclient discovery doc. EE # itself strips quota-project on credentials before fetching it # (see ee._cloud_api_utils.build_cloud_resource) because the # serviceUsage API rejects discovery requests that carry a # consumer project. Mirror that here — without this, SAs that # otherwise work fine 403 on init. is_discovery = "$discovery/rest" in path if quota_project and not is_discovery: fwd_headers["x-goog-user-project"] = quota_project body = await request.body() # 4. Forward + retry once on 401 (token rotation). Uses the # shared ``upstream_client`` (keep-alive connection pool) — see # the construction above for why we don't create per-request. try: upstream_resp = await upstream_client.request( request.method, upstream_url, content=body if body else None, headers=fwd_headers, ) except httpx.HTTPError as e: logger.exception("ee-proxy: upstream error for %s %s", request.method, path) return Response(content=f"upstream error: {e}", status_code=502) if upstream_resp.status_code == 401: try: tok = registry.get_token(actual_tenant, force_refresh=True) fwd_headers["authorization"] = f"Bearer {tok['access_token']}" qp = (tok.get("project_id") or os.environ.get("GEE_PROJECT", "")) if qp and not is_discovery: fwd_headers["x-goog-user-project"] = qp upstream_resp = await upstream_client.request( request.method, upstream_url, content=body if body else None, headers=fwd_headers, ) except Exception: logger.exception("ee-proxy: retry after 401 failed") # 5. Pass through, stripping hop-by-hop and auth-related response # headers that are context-specific to the upstream. resp_headers = {} for k, v in upstream_resp.headers.items(): if k.lower() in ("content-encoding", "content-length", "transfer-encoding", "connection", "server"): continue resp_headers[k] = v return Response( content=upstream_resp.content, status_code=upstream_resp.status_code, headers=resp_headers, media_type=upstream_resp.headers.get("content-type"), ) return router
[docs] def create_proxy_app( creds=None, upstream: str = DEFAULT_UPSTREAM, tenant_header: str = DEFAULT_TENANT_HEADER, tenant_query_param: str = "tenant", tenant_resolver: Optional[Callable[[Request, str], str]] = None, workload_tag_builder: Optional[Callable[[Request, str], str]] = None, prefix: str = "/ee-api", serve_geeview: bool = True, ) -> FastAPI: """Build a standalone FastAPI app with the proxy mounted at ``prefix``. Suitable for direct serving via ``uvicorn`` or for testing. ``creds`` accepts an :class:`EECreds` / :class:`SARegistry`-like object; ``None`` falls back to the env-var registry. See :func:`build_proxy_router` for the other parameters. Use ``build_proxy_router`` directly if you want to mount in an existing FastAPI app and share its middleware / lifecycle. Args: serve_geeview: When True (default for standalone runs), also mount the geeView frontend bundle at ``/geeView/*``. This makes the detached proxy the single long-lived server for both EE auth (``/ee-api/*``) and ``Map.view()`` HTML (``/geeView/...``). Same origin, same port — browser tabs survive script exits without a daemon-thread server inside each script. Set False to keep the proxy auth-only. """ app = FastAPI(title="geeViz EE proxy") app.include_router( build_proxy_router( creds=creds, upstream=upstream, tenant_header=tenant_header, tenant_query_param=tenant_query_param, tenant_resolver=tenant_resolver, workload_tag_builder=workload_tag_builder, ), prefix=prefix, ) if serve_geeview: # Mount the geeViz package directory at /geeView. Map.view() # writes exports into ``<package>/geeView/src/gee/gee-run/`` — # the browser fetches them at ``/geeView/src/gee/gee-run/<file>`` # and all relative asset references (``src/lib/...``, # ``src/css/...``, ``src/gee/...``) resolve under the same # ``/geeView/`` root, matching what the legacy in-script # ``_GeeVizRequestHandler`` served. from fastapi.staticfiles import StaticFiles import os as _os _PKG_DIR = _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__))) _GEEVIEW_DIR = _os.path.join(_PKG_DIR, "geeView") if _os.path.isdir(_GEEVIEW_DIR): app.mount( "/geeView", StaticFiles(directory=_GEEVIEW_DIR, html=True), name="geeview-static", ) def _list_tenants() -> list: if creds is None: return get_registry().list_tenants() # EECreds uses list() (insertion order); SARegistry uses list_tenants() if hasattr(creds, "list_tenants"): return creds.list_tenants() return creds.list() @app.get("/") def _root(): """Lightweight health check + tenant listing.""" return { "service": "geeViz.eeAuth proxy", "tenants_loaded": _list_tenants(), "mount_prefix": prefix, "upstream": upstream, "tenant_header": tenant_header, } return app