"""Multi-tenant Earth Engine service-account registry.Loads service-account credentials from env vars at startup and providesper-tenant token minting with caching. Used by the proxy server to pickwhich SA to authenticate as for each incoming request.Env-var convention:- ``GEE_SERVICE_ACCOUNT_B64`` — the default tenant (legacy name kept for backward compatibility).- ``GEE_<NAME>_SERVICE_ACCOUNT`` — additional tenants. The middle capture group becomes the tenant id, lowercased. So ``GEE_TRAINING_SERVICE_ACCOUNT`` registers as the ``training`` tenant.Each value is base64-encoded service-account JSON. To add a tenant:1. Create the SA, register it with Earth Engine.2. Base64-encode the JSON key file.3. Set ``GEE_<NAME>_SERVICE_ACCOUNT=<b64>`` in your env / deploy.Tokens are minted on demand and cached. The registry is thread-safe;concurrent requests for the same tenant share one in-flight refreshvia the lock."""from__future__importannotationsimportbase64importjsonimportloggingimportosimportreimportthreadingimporttimefromtypingimportOptionallogger=logging.getLogger(__name__)# SA tokens are good for ~1h; re-mint every 50 min so handoffs are smooth._TOKEN_TTL_SEC=50*60DEFAULT_TENANT="default"# ``GEE_<NAME>_SERVICE_ACCOUNT`` — NAME may contain letters/digits/underscores.# Excludes the legacy ``GEE_SERVICE_ACCOUNT_B64`` since that's handled# separately (we don't want it loaded as tenant ``""`` or ``b64``)._TENANT_ENV_RE=re.compile(r"^GEE_([A-Z0-9_]+)_SERVICE_ACCOUNT$")def_decode_sa_json(b64:str)->dict:"""Decode a base64-encoded service-account JSON blob. Raises if the content isn't valid base64 OR isn't valid JSON, so misconfigured env vars fail loudly at startup rather than silently."""raw=base64.b64decode(b64)returnjson.loads(raw)
[docs]classSARegistry:"""Per-tenant service-account credentials + cached access tokens."""def__init__(self)->None:self._sa_json:dict[str,dict]={}self._creds:dict[str,object]={}self._tokens:dict[str,dict]={}# tenant -> {data, fetched_at}self._lock=threading.Lock()self._load_from_env()def_load_from_env(self)->None:"""Scan ``os.environ`` for SA entries. Idempotent — safe to call multiple times; later calls re-read env (useful in tests)."""# Legacy default SA — keep working with existing deployments.default_b64=os.environ.get("GEE_SERVICE_ACCOUNT_B64","")ifdefault_b64:try:self._sa_json[DEFAULT_TENANT]=_decode_sa_json(default_b64)logger.info("SA registry: loaded default tenant (sa=%s, project=%s)",self._sa_json[DEFAULT_TENANT].get("client_email"),self._sa_json[DEFAULT_TENANT].get("project_id"),)exceptException:logger.exception("SA registry: failed to decode GEE_SERVICE_ACCOUNT_B64")# Named tenants. Scan in sorted order so logs are stable.forkeyinsorted(os.environ):ifkey=="GEE_SERVICE_ACCOUNT_B64":continuem=_TENANT_ENV_RE.match(key)ifnotm:continuetenant=m.group(1).lower()try:self._sa_json[tenant]=_decode_sa_json(os.environ[key])logger.info("SA registry: loaded tenant %r (sa=%s, project=%s)",tenant,self._sa_json[tenant].get("client_email"),self._sa_json[tenant].get("project_id"),)exceptException:logger.exception("SA registry: failed to decode %s for tenant %r",key,tenant,)
[docs]defresolve(self,tenant:Optional[str])->str:"""Pick the actual tenant to use. Unknown / missing → default. Returns ``""`` if neither the requested tenant nor a default is configured — callers should treat that as "registry not ready"."""iftenantandtenantinself._sa_json:returntenantifDEFAULT_TENANTinself._sa_json:returnDEFAULT_TENANTreturn""
[docs]defget_token(self,tenant:Optional[str],force_refresh:bool=False)->dict:"""Return ``{access_token, project_id, client_email, tenant}`` for the given tenant. Caches across calls; refresh-on-expire happens automatically. Raises ``KeyError`` if no tenant matches and no default is configured. """resolved=self.resolve(tenant)ifnotresolved:raiseKeyError(f"unknown tenant {tenant!r} and no default configured "f"(known tenants: {self.list_tenants()})")withself._lock:cached=self._tokens.get(resolved)if(cachedandnotforce_refreshandtime.time()-cached["fetched_at"]<_TOKEN_TTL_SEC):returncached["data"]# Mint / refreshdata=self._mint_locked(resolved)self._tokens[resolved]={"data":data,"fetched_at":time.time()}returndata
def_mint_locked(self,tenant:str)->dict:"""Refresh credentials and return token data. Caller must hold ``self._lock``. Imports are local so ``ee`` doesn't get pulled in until actually needed (keeps test startup fast)."""importgoogle.oauth2.service_accountimportgoogle.auth.transport.requestsimportee# for ee.oauth.SCOPESsa_json=self._sa_json[tenant]creds=self._creds.get(tenant)ifcredsisNone:creds=google.oauth2.service_account.Credentials.from_service_account_info(sa_json,scopes=ee.oauth.SCOPES,)self._creds[tenant]=credscreds.refresh(google.auth.transport.requests.Request())return{"access_token":creds.token,"project_id":sa_json.get("project_id",""),"client_email":sa_json.get("client_email",""),"tenant":tenant,}
# Module-level singleton — lazy so tests can construct their own_REGISTRY:Optional[SARegistry]=None
[docs]defget_registry()->SARegistry:"""Return the process-wide SA registry, constructing it lazily on first access."""global_REGISTRYif_REGISTRYisNone:_REGISTRY=SARegistry()return_REGISTRY
[docs]defreset_registry()->None:"""Clear the singleton — used by tests to re-load after env changes."""global_REGISTRY_REGISTRY=None