# Source code for geeViz.mcp.server

"""
geeViz MCP Server -- execution and introspection tools for Earth Engine via geeViz.

Unlike static doc snippets, this server executes code, inspects live GEE assets,
and dynamically queries API signatures. 33 tools replace the previous 49.
"""
from __future__ import annotations

import os
import sys

# Help before any heavy imports so "python -m geeViz.mcp.server --help" works without the mcp package
if len(sys.argv) > 1 and sys.argv[1] in ("-h", "--help"):
    _help = """usage: python -m geeViz.mcp.server [--help]

geeViz MCP Server -- execution and introspection for Earth Engine via geeViz.

Options:
  -h, --help    Show this help and exit.

Environment (optional):
  MCP_TRANSPORT   Transport: "stdio" (default) or "streamable-http"
  MCP_HOST        Host for HTTP (default: 127.0.0.1)
  MCP_PORT        Port for HTTP (default: 8000)
  MCP_PATH        Path for HTTP (default: /mcp)

Tools (33):
  run_code                 Execute Python/GEE code in a persistent REPL namespace
  inspect_asset            Get metadata for any GEE asset
  get_api_reference        Look up function signatures and docstrings
  list_functions           List public functions in a geeViz module
  search_functions         Search across ALL geeViz modules for a function
  get_example              Read source code of a geeViz example script
  list_examples            List available example scripts
  list_assets              List assets in a GEE folder
  track_tasks              Get status of recent EE tasks
  view_map                 Open the geeView map and return the URL
  get_map_layers           See what layers are currently on the map
  clear_map                Clear all map layers and commands
  save_script              Save accumulated run_code history to a .py file
  get_version_info         Return geeViz, EE, and Python version info
  get_namespace            Inspect user-defined variables in the REPL
  get_project_info         Return current EE project ID and root assets
  save_notebook            Save run_code history as a Jupyter notebook
  export_to_asset          Export an ee.Image to a GEE asset (via geeViz wrapper)
  geocode                  Geocode a place name to coordinates / GEE boundaries
  search_datasets          Search the GEE dataset catalog by keyword
  get_dataset_info         Get detailed STAC metadata for a GEE dataset
  get_thumbnail            Get a PNG/GIF thumbnail of an ee.Image or ImageCollection
  export_to_drive          Export an ee.Image to Google Drive
  export_to_cloud_storage  Export an ee.Image to Google Cloud Storage
  cancel_tasks             Cancel running/ready EE tasks (all or by name)
  sample_values            Sample pixel values from an ee.Image at a point/region
  get_time_series          Extract band values over time from an ImageCollection
  delete_asset             Delete a single GEE asset
  copy_asset               Copy a GEE asset to a new location
  move_asset               Move a GEE asset (copy + delete source)
  create_folder            Create a GEE folder or ImageCollection
  update_acl               Update permissions (ACL) on a GEE asset
  get_collection_info      Get summary info for an ImageCollection

Examples:
  python -m geeViz.mcp.server
  python -m geeViz.mcp --help

See also: geeViz/mcp/README.md
"""
    # Help goes to stderr, leaving stdout untouched; then exit before any heavy imports run.
    print(_help, file=sys.stderr)
    sys.exit(0)

import importlib.util

# Path setup: ensure geeViz and package root are on path
_THIS_DIR = os.path.dirname(os.path.abspath(__file__))  # .../geeViz/mcp
_GEEVIZ_DIR = os.path.dirname(_THIS_DIR)               # .../geeViz
_PACKAGE_ROOT = os.path.dirname(_GEEVIZ_DIR)            # .../geeVizBuilder
_EXAMPLES_DIR = os.path.join(_GEEVIZ_DIR, "examples")
# Drop any sys.path entry that ends in "mcp" and mentions the geeViz tree, so a
# bare "import mcp" does not resolve to this package directory itself.
sys.path = [p for p in sys.path if not (p.rstrip(os.sep).endswith("mcp") and _GEEVIZ_DIR in (p or ""))]
if _PACKAGE_ROOT not in sys.path:
    sys.path.insert(0, _PACKAGE_ROOT)
if _GEEVIZ_DIR not in sys.path:
    sys.path.append(_GEEVIZ_DIR)


# Load FastMCP from the MCP SDK. When run as python -m geeViz.mcp.server, the name "mcp"
# resolves to this package (geeViz.mcp), so we load the SDK's fastmcp by file from site-packages.
# If mcp is not installed (e.g. during Sphinx doc build), a lightweight stub is used so the
# module can still be imported and @app.tool() decorators pass functions through unchanged.
def _load_fastmcp():
    import site as _site
    for _sp in _site.getsitepackages():
        _origin = os.path.join(_sp, "mcp", "server", "fastmcp.py")
        if os.path.isfile(_origin):
            spec = importlib.util.spec_from_file_location("_geeviz_mcp_sdk_fastmcp", _origin)
            if spec and spec.loader:
                mod = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(mod)
                return mod.FastMCP
    try:
        from mcp.server.fastmcp import FastMCP
        return FastMCP
    except ModuleNotFoundError:
        return None


class _StubFastMCP:
    """Lightweight stand-in when the mcp SDK is not installed.

    Makes @app.tool() a no-op passthrough so functions keep their real
    type and docstrings (important for Sphinx autodoc).
    """

    def __init__(self, *args, **kwargs):
        pass

    def tool(self):
        """Return identity decorator -- the function is unchanged."""
        def _identity(fn):
            return fn
        return _identity

    def resource(self, *args, **kwargs):
        def _identity(fn):
            return fn
        return _identity

    def run(self, **kwargs):
        raise RuntimeError("mcp SDK not installed; install with: pip install mcp")


# Prefer the real SDK class when found; otherwise fall back to the stub so the
# module stays importable (e.g. for Sphinx doc builds).
_FastMCP = _load_fastmcp()
FastMCP = _FastMCP if _FastMCP is not None else _StubFastMCP


def _load_mcp_image():
    """Load the Image class from the mcp SDK for returning images from tools.

    IMPORTANT: Try the direct import first so we get the exact same Image class
    that FastMCP uses internally. If we load from file (as a standalone module),
    the class identity differs and FastMCP's isinstance() check fails, causing
    images to not display in clients like Cursor.
    """
    # Preferred: direct import matches FastMCP's own Image class
    try:
        from mcp.server.fastmcp.utilities.types import Image
        return Image
    except (ImportError, ModuleNotFoundError, AttributeError):
        pass
    # Fallback: load from file in site-packages (older SDK layouts)
    import site as _site
    for _sp in _site.getsitepackages():
        _types_path = os.path.join(_sp, "mcp", "server", "fastmcp", "utilities", "types.py")
        if os.path.isfile(_types_path):
            try:
                spec = importlib.util.spec_from_file_location("_geeviz_mcp_types", _types_path)
                if spec and spec.loader:
                    mod = importlib.util.module_from_spec(spec)
                    spec.loader.exec_module(mod)
                    cls = getattr(mod, "Image", None)
                    if cls is not None:
                        return cls
            except Exception:
                pass
    return None


# SDK Image class (or None); resolved once at import time.
_MCPImage = _load_mcp_image()

# Load agent instructions from the bundled markdown file.
# These are injected as the MCP server instructions (like a system prompt)
# so every connected client automatically knows how to use the tools.
_INSTRUCTIONS_FILE = os.path.join(_THIS_DIR, "agent-instructions.md")
try:
    with open(_INSTRUCTIONS_FILE, "r", encoding="utf-8") as _f:
        _SERVER_INSTRUCTIONS = _f.read()
except Exception:
    # Missing/unreadable instructions file is non-fatal; server runs without them.
    _SERVER_INSTRUCTIONS = None

# Only pass constructor kwargs when the real SDK class is available; the stub
# ignores its arguments anyway.
app = FastMCP(
    "geeViz",
    instructions=_SERVER_INSTRUCTIONS,
    json_response=True,
) if _FastMCP is not None else _StubFastMCP()

# ---------------------------------------------------------------------------
# Lazy initialization -- defer all geeViz imports until first tool call
# that needs them.  Every geeViz module import triggers robustInitializer()
# at module level, so we must not import at top level.
# ---------------------------------------------------------------------------
import threading
import json

# Guards one-time EE initialization across concurrent tool calls.
_init_lock = threading.Lock()
_initialized = False

# Module short-name -> fully qualified import path
_MODULE_MAP = {
    "geeView": "geeViz.geeView",
    "getImagesLib": "geeViz.getImagesLib",
    "changeDetectionLib": "geeViz.changeDetectionLib",
    "gee2Pandas": "geeViz.gee2Pandas",
    "assetManagerLib": "geeViz.assetManagerLib",
    "taskManagerLib": "geeViz.taskManagerLib",
    "foliumView": "geeViz.foliumView",
    "phEEnoViz": "geeViz.phEEnoViz",
    "cloudStorageManagerLib": "geeViz.cloudStorageManagerLib",
}

# Persistent REPL namespace for run_code
_namespace: dict = {}

# Code history for save_script
_code_history: list[str] = []
# Generated session scripts are written alongside this module.
_script_dir = os.path.join(_THIS_DIR, "generated_scripts")
# Path of the current session's script file; None until first save.
_current_script_path: str | None = None


def _ensure_initialized():
    """Lazy-initialize EE and populate the REPL namespace. Thread-safe."""
    global _initialized
    if _initialized:
        return
    with _init_lock:
        # Double-checked: another thread may have finished while we waited.
        if _initialized:
            return
        import geeViz.geeView as gv
        import geeViz.getImagesLib as gil
        import ee

        # Seed the persistent run_code namespace with the core handles.
        _namespace["ee"] = ee
        _namespace["Map"] = gv.Map
        _namespace["gv"] = gv
        _namespace["gil"] = gil
        _initialized = True


def _reset_namespace():
    """Clear and re-populate the REPL namespace. Also resets code history."""
    global _initialized, _current_script_path
    # Drop all session state: variables, history, current script file binding.
    for container in (_namespace, _code_history):
        container.clear()
    _current_script_path = None
    _initialized = False
    # Immediately rebuild the baseline namespace (ee, Map, gv, gil).
    _ensure_initialized()


def _save_history_to_file() -> str:
    """Write accumulated code history to a timestamped .py file. Returns the path."""
    global _current_script_path
    import datetime

    os.makedirs(_script_dir, exist_ok=True)
    # First save of the session picks a timestamped filename; later saves reuse it.
    if _current_script_path is None:
        stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        _current_script_path = os.path.join(_script_dir, f"session_{stamp}.py")
    header = (
        "# Auto-generated by geeViz MCP server\n"
        "# Each section below is one run_code call, in order.\n\n"
        "import geeViz.geeView as gv\n"
        "import geeViz.getImagesLib as gil\n"
        "ee = gv.ee\n"
        "Map = gv.Map\n\n"
    )
    sections = []
    for i, block in enumerate(_code_history):
        sections.append(f"# --- run_code call {i+1} ---\n{block}")
    body = "\n\n".join(sections)
    # Rewrite the whole file each time so it always reflects the full history.
    with open(_current_script_path, "w", encoding="utf-8") as out:
        out.write(header + body + "\n")
    return _current_script_path


# ---------------------------------------------------------------------------
# Tool 1: run_code
# ---------------------------------------------------------------------------
import ast
import io
import contextlib
import traceback


@app.tool()
def run_code(code: str, timeout: int = 120, reset: bool = False) -> str:
    """Execute Python/GEE code in a persistent REPL namespace (like Jupyter).

    The namespace persists across calls -- variables set in one call are
    available in the next. Pre-populated with: ee, Map (gv.Map),
    gv (geeViz.geeView), gil (geeViz.getImagesLib).

    Args:
        code: Python code to execute.
        timeout: Max seconds to wait (default 120). On Windows a hung getInfo()
            cannot be force-killed; the thread continues in background.
        reset: If True, clear the namespace and re-initialize before executing.

    Returns:
        JSON with keys: success (bool), stdout, stderr, result, error.
    """
    if reset:
        _reset_namespace()
    else:
        _ensure_initialized()
    stdout_buf = io.StringIO()
    stderr_buf = io.StringIO()
    # Single-element lists let the worker thread hand results back by mutation.
    result_holder: list = [None]
    error_holder: list = [None]

    def _exec():
        try:
            with contextlib.redirect_stdout(stdout_buf), contextlib.redirect_stderr(stderr_buf):
                # Try to detect if the last statement is an expression
                tree = ast.parse(code)
                if tree.body and isinstance(tree.body[-1], ast.Expr):
                    # Execute everything except the last statement
                    if len(tree.body) > 1:
                        mod = ast.Module(body=tree.body[:-1], type_ignores=[])
                        exec(compile(mod, "<mcp>", "exec"), _namespace)
                    # Eval the last expression to capture its value
                    expr = ast.Expression(body=tree.body[-1].value)
                    result_holder[0] = eval(compile(expr, "<mcp>", "eval"), _namespace)
                else:
                    exec(compile(code, "<mcp>", "exec"), _namespace)
        except Exception:
            error_holder[0] = traceback.format_exc()

    # Run in a daemon thread so a hung EE call cannot block the server forever.
    thread = threading.Thread(target=_exec, daemon=True)
    thread.start()
    thread.join(timeout=timeout)
    if thread.is_alive():
        return json.dumps({
            "success": False,
            "stdout": stdout_buf.getvalue(),
            "stderr": stderr_buf.getvalue(),
            "result": None,
            "error": f"Execution timed out after {timeout}s. "
                     "Note: on Windows, the thread continues in background and cannot be force-killed.",
        })
    if error_holder[0]:
        return json.dumps({
            "success": False,
            "stdout": stdout_buf.getvalue(),
            "stderr": stderr_buf.getvalue(),
            "result": None,
            "error": error_holder[0],
            "script_path": None,
        })
    # Success -- record in history and save to file
    _code_history.append(code)
    script_path = _save_history_to_file()
    result_val = result_holder[0]
    # Make result JSON-serializable
    result_str = None
    if result_val is not None:
        try:
            json.dumps(result_val)
            result_str = result_val
        except (TypeError, ValueError):
            # Non-serializable values are returned as their repr().
            result_str = repr(result_val)
    return json.dumps({
        "success": True,
        "stdout": stdout_buf.getvalue(),
        "stderr": stderr_buf.getvalue(),
        "result": result_str,
        "error": None,
        "script_path": script_path,
    })
# ---------------------------------------------------------------------------
# Tool 2: inspect_asset
# ---------------------------------------------------------------------------
@app.tool()
def inspect_asset(asset_id: str) -> str:
    """Get detailed metadata for any GEE asset (Image, ImageCollection, FeatureCollection, etc.).

    Returns band names/types, CRS, scale, dimensions, date range, size, column
    names, geometry type, and properties as appropriate.

    Args:
        asset_id: Full Earth Engine asset ID (e.g. "COPERNICUS/S2_SR_HARMONIZED").

    Returns:
        JSON with asset metadata.
    """
    _ensure_initialized()
    ee = _namespace["ee"]
    try:
        info = ee.data.getInfo(asset_id)
    except Exception as exc:
        return json.dumps({"error": str(exc), "asset_id": asset_id})
    if info is None:
        return json.dumps({"error": f"Asset not found: {asset_id}", "asset_id": asset_id})
    asset_type = info.get("type", "UNKNOWN")
    result: dict = {"asset_id": asset_id, "type": asset_type}
    try:
        # Collections are limited to 5 elements to keep getInfo() cheap.
        if asset_type in ("IMAGE", "Image"):
            asset = ee.Image(asset_id)
        elif asset_type in ("IMAGE_COLLECTION", "ImageCollection"):
            asset = ee.ImageCollection(asset_id).limit(5)
        elif asset_type in ("TABLE", "FeatureCollection"):
            asset = ee.FeatureCollection(asset_id).limit(5)
        else:
            # Folder or other -- just return raw info
            asset = None
    except Exception as exc:
        asset = None
        result["detail_error"] = str(exc)
    if asset is not None:
        result["asset"] = asset.getInfo()
    return json.dumps(result)
# --------------------------------------------------------------------------- # Tool 3: get_api_reference # --------------------------------------------------------------------------- import inspect as _inspect
@app.tool()
def get_api_reference(module: str, function_name: str = "") -> str:
    """Look up the signature and docstring of a geeViz function or module.

    Uses Python's inspect module -- always reflects the installed code.

    Args:
        module: Short module name. One of: geeView, getImagesLib,
            changeDetectionLib, gee2Pandas, assetManagerLib, taskManagerLib,
            foliumView, phEEnoViz, cloudStorageManagerLib.
        function_name: Optional function or class name within the module.
            If omitted, returns the module-level docstring.

    Returns:
        Signature and docstring text, or error message.
    """
    _ensure_initialized()
    fq = _MODULE_MAP.get(module)
    if not fq:
        return json.dumps({
            "error": f"Unknown module: {module!r}. Valid modules: {', '.join(sorted(_MODULE_MAP))}",
        })
    try:
        mod = importlib.import_module(fq)
    except Exception as exc:
        return json.dumps({"error": f"Failed to import {fq}: {exc}"})
    if not function_name:
        return json.dumps({
            "module": module,
            "docstring": _inspect.getdoc(mod) or "(no module docstring)",
        })
    obj = getattr(mod, function_name, None)
    if obj is None:
        return json.dumps({"error": f"{function_name!r} not found in {module}"})
    # Handle classes: show class docstring + public method list
    if _inspect.isclass(obj):
        methods = [
            m for m in dir(obj)
            if not m.startswith("_") and callable(getattr(obj, m, None))
        ]
        return json.dumps({
            "module": module,
            "name": function_name,
            "type": "class",
            "docstring": _inspect.getdoc(obj) or "(no docstring)",
            "public_methods": methods,
        })
    # Function or callable
    try:
        sig = str(_inspect.signature(obj))
    except (ValueError, TypeError):
        # Some builtins/extension callables expose no introspectable signature.
        sig = "(signature unavailable)"
    return json.dumps({
        "module": module,
        "name": function_name,
        "signature": f"{function_name}{sig}",
        "docstring": _inspect.getdoc(obj) or "(no docstring)",
    })
# ---------------------------------------------------------------------------
# Tool 4: list_functions
# ---------------------------------------------------------------------------
@app.tool()
def list_functions(module: str, filter: str = "") -> str:
    """List all public functions and classes in a geeViz module with one-line descriptions.

    Args:
        module: Short module name (see get_api_reference for valid names).
        filter: Optional substring filter (case-insensitive) to narrow results.
            Useful for getImagesLib which has 100+ functions.

    Returns:
        JSON list of {name, type, description} objects.
    """
    _ensure_initialized()
    fq = _MODULE_MAP.get(module)
    if not fq:
        return json.dumps({
            "error": f"Unknown module: {module!r}. Valid modules: {', '.join(sorted(_MODULE_MAP))}",
        })
    try:
        mod = importlib.import_module(fq)
    except Exception as exc:
        return json.dumps({"error": f"Failed to import {fq}: {exc}"})
    entries = []
    for name in sorted(dir(mod)):
        if name.startswith("_"):
            continue
        obj = getattr(mod, name, None)
        if obj is None:
            continue
        if not (callable(obj) or _inspect.isclass(obj)):
            continue
        if filter and filter.lower() not in name.lower():
            continue
        doc = _inspect.getdoc(obj) or ""
        # Use only the first docstring line as the summary.
        first_line = doc.split("\n")[0].strip() if doc else "(no description)"
        kind = "class" if _inspect.isclass(obj) else "function"
        entries.append({"name": name, "type": kind, "description": first_line})
    # For geeView, also list mapper class methods if present
    if module == "geeView":
        mapper_cls = getattr(mod, "mapper", None)
        if mapper_cls and _inspect.isclass(mapper_cls):
            for mname in sorted(dir(mapper_cls)):
                if mname.startswith("_"):
                    continue
                mobj = getattr(mapper_cls, mname, None)
                if not callable(mobj):
                    continue
                if filter and filter.lower() not in mname.lower():
                    continue
                doc = _inspect.getdoc(mobj) or ""
                first_line = doc.split("\n")[0].strip() if doc else "(no description)"
                entries.append({
                    "name": f"mapper.{mname}",
                    "type": "method",
                    "description": first_line,
                })
    return json.dumps({"module": module, "count": len(entries), "functions": entries})
# ---------------------------------------------------------------------------
# Tool 5: get_example
# ---------------------------------------------------------------------------
@app.tool()
def get_example(example_name: str) -> str:
    """Read the source code of a geeViz example script.

    Args:
        example_name: Name of the example, with or without extension.
            Supports .py (returns source) and .ipynb (extracts code and
            markdown cells).

    Returns:
        The example source code, or an error listing available examples.
    """
    # Normalize: strip extension if given
    base = example_name
    for ext in (".py", ".ipynb"):
        if base.endswith(ext):
            base = base[:-len(ext)]
            break
    # Try .py first, then .ipynb
    py_path = os.path.join(_EXAMPLES_DIR, base + ".py")
    nb_path = os.path.join(_EXAMPLES_DIR, base + ".ipynb")
    if os.path.isfile(py_path):
        with open(py_path, "r", encoding="utf-8") as f:
            source = f.read()
        return json.dumps({
            "example": base + ".py",
            "type": "python",
            "source": source,
        })
    if os.path.isfile(nb_path):
        try:
            with open(nb_path, "r", encoding="utf-8") as f:
                nb = json.load(f)
            cells = []
            for cell in nb.get("cells", []):
                cell_type = cell.get("cell_type", "")
                source = "".join(cell.get("source", []))
                # Keep only non-empty code/markdown cells (skip raw cells etc.).
                if cell_type in ("code", "markdown") and source.strip():
                    cells.append({"cell_type": cell_type, "source": source})
            return json.dumps({
                "example": base + ".ipynb",
                "type": "notebook",
                "cells": cells,
            })
        except Exception as exc:
            return json.dumps({"error": f"Failed to read notebook: {exc}"})
    # Not found -- list available
    available = _list_example_files()
    return json.dumps({
        "error": f"Example not found: {example_name!r}",
        "available_examples": available,
    })
def _list_example_files() -> list[str]:
    """Return sorted list of example filenames (.py and .ipynb)."""
    if not os.path.isdir(_EXAMPLES_DIR):
        return []
    names: list[str] = []
    for entry in os.listdir(_EXAMPLES_DIR):
        if entry == "__init__.py":
            continue
        if entry.endswith((".py", ".ipynb")):
            names.append(entry)
    names.sort()
    return names


# ---------------------------------------------------------------------------
# Tool 6: list_examples
# ---------------------------------------------------------------------------
@app.tool()
def list_examples(filter: str = "") -> str:
    """List available geeViz example scripts with descriptions.

    Args:
        filter: Optional substring filter (case-insensitive).

    Returns:
        JSON list of {name, description} objects.
    """

    def _py_summary(path: str) -> str:
        """First docstring line of a .py file, falling back to its first comment."""
        summary = ""
        try:
            head: list[str] = []
            with open(path, "r", encoding="utf-8") as fh:
                # Only the first few lines are needed for a description.
                for _ in range(20):
                    row = fh.readline()
                    if not row:
                        break
                    head.append(row)
            text = "".join(head)
            try:
                tree = ast.parse(text)
                if (
                    tree.body
                    and isinstance(tree.body[0], ast.Expr)
                    and isinstance(tree.body[0].value, ast.Constant)
                ):
                    summary = str(tree.body[0].value.value).split("\n")[0].strip()
            except SyntaxError:
                pass  # truncated header may not parse -- fall back to comments
            if not summary:
                for row in head:
                    stripped = row.strip()
                    if stripped.startswith("#") and len(stripped) > 2:
                        summary = stripped.lstrip("#").strip()
                        break
        except Exception:
            pass  # unreadable file -> no description
        return summary

    def _nb_summary(path: str) -> str:
        """First line of the first non-empty markdown cell of a notebook."""
        try:
            with open(path, "r", encoding="utf-8") as fh:
                nb = json.load(fh)
            for cell in nb.get("cells", []):
                if cell.get("cell_type") == "markdown":
                    text = "".join(cell.get("source", [])).strip()
                    if text:
                        # Strip markdown header markers from the first line.
                        return text.split("\n")[0].lstrip("#").strip()
        except Exception:
            pass
        return ""

    needle = filter.lower()
    results = []
    for fname in _list_example_files():
        if needle and needle not in fname.lower():
            continue
        fpath = os.path.join(_EXAMPLES_DIR, fname)
        if fname.endswith(".py"):
            desc = _py_summary(fpath)
        elif fname.endswith(".ipynb"):
            desc = _nb_summary(fpath)
        else:
            desc = ""
        results.append({"name": fname, "description": desc or "(no description)"})
    return json.dumps({"count": len(results), "examples": results})
# --------------------------------------------------------------------------- # Tool 7: list_assets # ---------------------------------------------------------------------------
@app.tool()
def list_assets(folder: str) -> str:
    """List assets in a GEE folder or collection.

    Args:
        folder: Full asset path (e.g. "projects/my-project/assets/my-folder").

    Returns:
        JSON list of {id, type, sizeBytes} for each asset (max 200).
    """
    _ensure_initialized()
    ee = _namespace["ee"]
    try:
        listing = ee.data.listAssets({"parent": folder})
    except Exception as exc:
        return json.dumps({"error": str(exc), "folder": folder})

    assets = listing.get("assets", [])
    entries = [
        {
            "id": item.get("id") or item.get("name", ""),
            "type": item.get("type", "UNKNOWN"),
            "sizeBytes": item.get("sizeBytes"),
        }
        for item in assets[:200]
    ]
    payload: dict = {"folder": folder, "count": len(entries), "assets": entries}
    if len(assets) > 200:
        payload["note"] = f"Showing 200 of {len(assets)} assets. Narrow your query for the rest."
    return json.dumps(payload)
# --------------------------------------------------------------------------- # Tool 8: track_tasks # ---------------------------------------------------------------------------
@app.tool()
def track_tasks(name_filter: str = "") -> str:
    """Get status of recent Earth Engine tasks.

    Args:
        name_filter: Optional case-insensitive filter on task description.

    Returns:
        JSON list of recent tasks with description, state, type, start time,
        runtime, and error message (max 50).
    """
    _ensure_initialized()
    ee = _namespace["ee"]
    try:
        tasks = ee.data.getTaskList()
    except Exception as exc:
        return json.dumps({"error": str(exc)})

    entries = []
    for t in tasks:
        desc = t.get("description", "")
        # Apply the filter BEFORE capping at 50 -- the previous behavior sliced
        # first, so matches beyond the 50 most recent tasks were silently lost.
        if name_filter and name_filter.lower() not in desc.lower():
            continue
        entries.append({
            "description": desc,
            "state": t.get("state", "UNKNOWN"),
            "task_type": t.get("task_type", ""),
            "start_timestamp_ms": t.get("start_timestamp_ms"),
            "update_timestamp_ms": t.get("update_timestamp_ms"),
            "error_message": t.get("error_message", ""),
        })
        if len(entries) >= 50:
            break
    return json.dumps({"count": len(entries), "tasks": entries})
# --------------------------------------------------------------------------- # Tool 9: view_map # ---------------------------------------------------------------------------
@app.tool()
def view_map(open_browser: bool = True) -> str:
    """Open the geeView interactive map and return the URL.

    Call this after adding layers with run_code. The Map object is the same
    singleton used by run_code, so all layers added there will appear.

    Args:
        open_browser: Whether to open the map in the default browser (default True).

    Returns:
        JSON with the map URL and layer count.
    """
    _ensure_initialized()
    Map = _namespace["Map"]

    # Map.view() prints its URL to stdout -- capture it so we can return it.
    captured = io.StringIO()
    try:
        with contextlib.redirect_stdout(captured):
            Map.view(open_browser=open_browser, open_iframe=False)
    except Exception as exc:
        return json.dumps({"error": str(exc)})

    printed = captured.getvalue()
    url = next(
        (ln.strip() for ln in printed.splitlines() if ln.strip().startswith("http")),
        None,
    )
    layer_count = len(Map.idDictList) if hasattr(Map, "idDictList") else 0
    return json.dumps({
        "url": url,
        "layer_count": layer_count,
        "message": f"Map opened with {layer_count} layer(s)." if url else "Map.view() ran but no URL was captured.",
        "raw_output": printed.strip(),
    })
# --------------------------------------------------------------------------- # Tool 10: get_map_layers # ---------------------------------------------------------------------------
@app.tool()
def get_map_layers() -> str:
    """See what layers are currently on the map.

    Returns layer names, types, visibility, and visualization parameters.
    Useful for debugging why a map looks wrong or checking state.

    Returns:
        JSON with layers list and active map commands.
    """
    _ensure_initialized()
    Map = _namespace["Map"]

    layers = []
    for entry in getattr(Map, "idDictList", []):
        # viz may be stored as a JSON string or as a dict -- normalize to a
        # dict when possible, otherwise pass it through untouched.
        raw_viz = entry.get("viz", "{}")
        if isinstance(raw_viz, str):
            try:
                parsed_viz = json.loads(raw_viz)
            except (json.JSONDecodeError, TypeError):
                parsed_viz = raw_viz
        else:
            parsed_viz = raw_viz
        layers.append({
            "name": entry.get("name", "(unnamed)"),
            "visible": entry.get("visible", "true"),
            "function": entry.get("function", ""),
            "viz": parsed_viz,
        })

    commands = list(getattr(Map, "mapCommandList", []))
    return json.dumps({
        "layer_count": len(layers),
        "layers": layers,
        "commands": commands,
    })
# --------------------------------------------------------------------------- # Tool 11: clear_map # ---------------------------------------------------------------------------
@app.tool()
def clear_map() -> str:
    """Clear all layers and commands from the map.

    Resets the Map to a blank state so you can start fresh.

    Returns:
        JSON confirmation.
    """
    _ensure_initialized()
    Map = _namespace["Map"]
    try:
        Map.clearMap()
    except Exception as exc:
        return json.dumps({"error": str(exc)})
    return json.dumps({
        "success": True,
        "message": "Map cleared. All layers and commands removed.",
    })
# --------------------------------------------------------------------------- # Tool 12: search_functions # ---------------------------------------------------------------------------
@app.tool()
def search_functions(query: str) -> str:
    """Search for a function across ALL geeViz modules by name or docstring.

    Unlike list_functions (which searches one module), this searches everything
    at once. Useful when you don't know which module a function is in.

    Args:
        query: Search term (case-insensitive). Matched against function names
            and the first line of their docstrings.

    Returns:
        JSON list of {module, name, type, description} matches.
    """
    _ensure_initialized()
    needle = query.lower()
    results = []
    for short_name, fq_name in _MODULE_MAP.items():
        try:
            mod = importlib.import_module(fq_name)
        except Exception:
            continue  # skip modules that fail to import
        for attr in dir(mod):
            if attr.startswith("_"):
                continue
            obj = getattr(mod, attr, None)
            if obj is None or not (callable(obj) or _inspect.isclass(obj)):
                continue
            doc = _inspect.getdoc(obj) or ""
            summary = doc.split("\n")[0].strip() if doc else ""
            if needle not in attr.lower() and needle not in summary.lower():
                continue
            results.append({
                "module": short_name,
                "name": attr,
                "type": "class" if _inspect.isclass(obj) else "function",
                "description": summary or "(no description)",
            })
    return json.dumps({"query": query, "count": len(results), "results": results})
# --------------------------------------------------------------------------- # Tool 13: save_script # ---------------------------------------------------------------------------
@app.tool()
def save_script(filename: str = "") -> str:
    """Save the accumulated run_code history to a standalone .py file.

    The script includes all necessary imports and every successful run_code call
    in order, so it can be run independently.

    Args:
        filename: Optional custom filename (saved in geeViz/mcp/generated_scripts/).
            If omitted, uses the auto-generated session filename.

    Returns:
        JSON with the file path and number of code blocks saved.
    """
    global _current_script_path

    if not _code_history:
        return json.dumps({
            "error": "No code has been executed yet. Use run_code first.",
        })

    if filename:
        # Redirect the session script path to the caller-supplied name.
        if not filename.endswith(".py"):
            filename += ".py"
        os.makedirs(_script_dir, exist_ok=True)
        _current_script_path = os.path.join(_script_dir, filename)

    path = _save_history_to_file()
    n_blocks = len(_code_history)
    return json.dumps({
        "success": True,
        "script_path": path,
        "code_blocks": n_blocks,
        "message": f"Saved {n_blocks} code block(s) to {path}",
    })
# --------------------------------------------------------------------------- # Tool 14: get_version_info # ---------------------------------------------------------------------------
@app.tool()
def get_version_info() -> str:
    """Return version information for geeViz, Earth Engine, and Python.

    Useful for debugging environment issues or confirming which versions are
    active in the MCP server session.

    Returns:
        JSON with geeViz_version, ee_version, python_version, platform.
    """
    import geeViz

    info = {
        "geeViz_version": geeViz.__version__,
        "python_version": sys.version,
        "platform": sys.platform,
    }
    # earthengine-api may be missing or broken -- report that rather than fail.
    try:
        import ee
        info["ee_version"] = ee.__version__
    except Exception:
        info["ee_version"] = "(not available)"
    return json.dumps(info)
# --------------------------------------------------------------------------- # Tool 15: get_namespace # --------------------------------------------------------------------------- # Builtins pre-populated by _ensure_initialized -- excluded from get_namespace _NAMESPACE_BUILTINS = {"ee", "Map", "gv", "gil"}
@app.tool()
def get_namespace() -> str:
    """Inspect user-defined variables in the persistent REPL namespace.

    Shows what variables exist after run_code calls. Excludes the built-in
    entries (ee, Map, gv, gil). For each variable, reports name, type, and a
    truncated repr. No getInfo() calls are made -- this is pure Python-side
    introspection.

    Returns:
        JSON with a list of {name, type, repr} objects.
    """
    _ensure_initialized()
    ee = _namespace["ee"]

    # Ordered most-specific first; ComputedObject last as the catch-all base.
    ee_type_names = (
        "Image", "ImageCollection", "FeatureCollection", "Feature",
        "Geometry", "Number", "String", "List", "Dictionary",
        "Filter", "Reducer", "ComputedObject",
    )

    entries = []
    for name, obj in sorted(_namespace.items()):
        if name.startswith("_") or name in _NAMESPACE_BUILTINS:
            continue

        type_name = type(obj).__name__
        for ee_name in ee_type_names:
            if isinstance(obj, getattr(ee, ee_name)):
                type_name = f"ee.{ee_name}"
                break

        # Truncated repr -- never triggers a server round trip.
        try:
            rendered = repr(obj)
        except Exception:
            rendered = "(repr failed)"
        else:
            if len(rendered) > 200:
                rendered = rendered[:200] + "..."

        entries.append({"name": name, "type": type_name, "repr": rendered})

    return json.dumps({
        "count": len(entries),
        "variables": entries,
        "note": "Excludes builtins (ee, Map, gv, gil). No getInfo() calls made.",
    })
# --------------------------------------------------------------------------- # Tool 16: get_project_info # ---------------------------------------------------------------------------
@app.tool()
def get_project_info() -> str:
    """Return the current Earth Engine project ID and a sample of root assets.

    Useful for confirming which GEE project the session is using and seeing
    what top-level assets are available.

    Returns:
        JSON with project_id and a list of root assets.
    """
    _ensure_initialized()
    ee = _namespace["ee"]
    result: dict = {}

    # NOTE(review): _get_cloud_api_user_project is a private earthengine-api
    # function -- guarded since it may change between releases.
    try:
        result["project_id"] = ee.data._get_cloud_api_user_project()
    except Exception as exc:
        result["project_id"] = None
        result["project_error"] = str(exc)

    if result.get("project_id"):
        try:
            root = f"projects/{result['project_id']}/assets"
            listing = ee.data.listAssets({"parent": root})
            assets = listing.get("assets", [])
            result["root_assets"] = [
                {
                    "id": item.get("id") or item.get("name", ""),
                    "type": item.get("type", "UNKNOWN"),
                }
                for item in assets[:50]
            ]
            result["root_asset_count"] = len(assets)
        except Exception as exc:
            result["root_assets"] = []
            result["assets_error"] = str(exc)

    return json.dumps(result)
# --------------------------------------------------------------------------- # Tool 17: save_notebook # ---------------------------------------------------------------------------
@app.tool()
def save_notebook(filename: str = "") -> str:
    """Save the accumulated run_code history as a Jupyter notebook (.ipynb).

    Creates one code cell per run_code call, with a markdown header cell and an
    import cell at the top. Uses nbformat 4.5 structure (plain JSON, no nbformat
    library required).

    Args:
        filename: Optional custom filename (saved in geeViz/mcp/generated_scripts/).
            If omitted, uses a timestamped default. Extension .ipynb is added
            automatically if missing.

    Returns:
        JSON with the file path and number of code cells saved.
    """
    if not _code_history:
        return json.dumps({
            "error": "No code has been executed yet. Use run_code first.",
        })

    import datetime

    os.makedirs(_script_dir, exist_ok=True)
    if filename:
        if not filename.endswith(".ipynb"):
            filename += ".ipynb"
        nb_path = os.path.join(_script_dir, filename)
    else:
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        nb_path = os.path.join(_script_dir, f"session_{ts}.ipynb")

    def _code_cell(source_lines: list[str]) -> dict:
        """Build an nbformat code cell with empty outputs."""
        return {
            "cell_type": "code",
            "metadata": {},
            "source": source_lines,
            "execution_count": None,
            "outputs": [],
        }

    # Markdown header + import cell, then one code cell per history entry.
    cells: list[dict] = [
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "# geeViz MCP Session\n",
                "\n",
                f"Auto-generated by geeViz MCP server on {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}.\n",
            ],
        },
        _code_cell([
            "import geeViz.geeView as gv\n",
            "import geeViz.getImagesLib as gil\n",
            "ee = gv.ee\n",
            "Map = gv.Map",
        ]),
    ]
    for block in _code_history:
        src = block.splitlines(True)  # keep line endings
        # Notebook source lines conventionally end with a newline.
        if src and not src[-1].endswith("\n"):
            src[-1] += "\n"
        cells.append(_code_cell(src))

    notebook = {
        "nbformat": 4,
        "nbformat_minor": 5,
        "metadata": {
            "kernelspec": {
                "display_name": "Python 3",
                "language": "python",
                "name": "python3",
            },
            "language_info": {
                "name": "python",
                "version": sys.version.split()[0],
            },
        },
        "cells": cells,
    }
    with open(nb_path, "w", encoding="utf-8") as f:
        json.dump(notebook, f, indent=1, ensure_ascii=False)

    return json.dumps({
        "success": True,
        "notebook_path": nb_path,
        "code_cells": len(_code_history),
        "message": f"Saved {len(_code_history)} code cell(s) to {nb_path}",
    })
# --------------------------------------------------------------------------- # Tool 18: export_to_asset # ---------------------------------------------------------------------------
@app.tool()
def export_to_asset(
    image_var: str,
    asset_id: str,
    region_var: str = "",
    scale: int = 30,
    crs: str = "EPSG:4326",
    overwrite: bool = False,
    pyramiding_policy: str = "mean",
) -> str:
    """Export an ee.Image from the REPL namespace to a GEE asset.

    Uses geeViz's exportToAssetWrapper which handles existing assets, pyramiding
    policy, and region clipping automatically.

    Args:
        image_var: Name of the ee.Image variable in the REPL namespace.
        asset_id: Full destination asset ID (e.g. "projects/my-project/assets/my_export").
        region_var: Optional name of an ee.Geometry or ee.FeatureCollection variable
            to use as the export region. If omitted, the image's footprint is used.
        scale: Output resolution in meters (default 30).
        crs: Coordinate reference system (default "EPSG:4326").
        overwrite: If True, overwrite an existing asset (default False).
        pyramiding_policy: Pyramiding policy for bands -- "mean", "mode", "min",
            "max", "median", or "sample" (default "mean").

    Returns:
        JSON with export status or an error.
    """
    _ensure_initialized()
    ee = _namespace["ee"]
    gil = _namespace["gil"]

    # Resolve the image variable.
    image = _namespace.get(image_var)
    if image is None:
        return json.dumps({"error": f"Variable {image_var!r} not found in namespace."})
    if not isinstance(image, ee.Image):
        return json.dumps({
            "error": f"Variable {image_var!r} is {type(image).__name__}, not ee.Image.",
        })

    # Resolve the optional region variable; FeatureCollections are reduced to
    # their geometry.
    region = None
    if region_var:
        region = _namespace.get(region_var)
        if region is None:
            return json.dumps({"error": f"Variable {region_var!r} not found in namespace."})
        if isinstance(region, ee.FeatureCollection):
            region = region.geometry()
        elif not isinstance(region, ee.Geometry):
            return json.dumps({
                "error": f"Variable {region_var!r} is {type(region).__name__}, "
                "expected ee.Geometry or ee.FeatureCollection.",
            })

    # Asset name is the last path segment; pyramiding applies to all bands.
    asset_name = asset_id.split("/")[-1]
    pyramiding_obj = {"default": pyramiding_policy}

    # exportToAssetWrapper prints status -- capture it for the JSON response.
    stdout_buf = io.StringIO()
    try:
        with contextlib.redirect_stdout(stdout_buf):
            gil.exportToAssetWrapper(
                image,
                asset_name,
                asset_id,
                pyramidingPolicyObject=pyramiding_obj,
                roi=region,
                scale=scale,
                crs=crs,
                overwrite=overwrite,
            )
    except Exception as exc:
        return json.dumps({
            "error": f"Export failed: {exc}",
            "stdout": stdout_buf.getvalue(),
        })

    return json.dumps({
        "success": True,
        "asset_id": asset_id,
        "scale": scale,
        "crs": crs,
        "overwrite": overwrite,
        "pyramiding_policy": pyramiding_policy,
        "stdout": stdout_buf.getvalue().strip(),
        "message": "Export task started. Use track_tasks() to monitor progress.",
    })
# --------------------------------------------------------------------------- # Tool 19: geocode # --------------------------------------------------------------------------- import urllib.request import urllib.parse import urllib.error
@app.tool()
def geocode(place_name: str, use_boundaries: bool = False) -> str:
    """Geocode a place name to coordinates and optionally find GEE boundary polygons.

    Uses OpenStreetMap Nominatim for point/bounding-box geocoding (no API key
    needed). When use_boundaries=True, also searches GEE boundary collections
    (WDPA, FAO/GAUL, TIGER States/Counties) for a matching polygon and returns
    the asset ID and filter expression for use in run_code.

    Args:
        place_name: Place name to geocode (e.g. "Yellowstone National Park",
            "Montana", "Bozeman, MT").
        use_boundaries: If True, also search GEE boundary FeatureCollections for
            a matching polygon. Default False.

    Returns:
        JSON with coordinates, bounding box, ee code snippets, and optionally
        matching GEE boundary info.
    """
    # --- Nominatim geocoding (stdlib only) ---
    query = urllib.parse.urlencode({
        "q": place_name,
        "format": "json",
        "limit": "1",
        "addressdetails": "1",
    })
    req = urllib.request.Request(
        f"https://nominatim.openstreetmap.org/search?{query}",
        headers={"User-Agent": "geeViz-MCP/1.0"},
    )
    try:
        with urllib.request.urlopen(req, timeout=10) as resp:
            data = json.loads(resp.read().decode("utf-8"))
    except Exception as exc:
        return json.dumps({"error": f"Nominatim request failed: {exc}"})
    if not data:
        return json.dumps({"error": f"No results found for {place_name!r}."})

    hit = data[0]
    lat = float(hit["lat"])
    lon = float(hit["lon"])
    result: dict = {
        "place_name": place_name,
        "display_name": hit.get("display_name", place_name),
        "latitude": lat,
        "longitude": lon,
        "osm_type": hit.get("type", ""),
        "ee_point_code": f"ee.Geometry.Point([{lon}, {lat}])",
    }

    # Nominatim bounding box order: [south, north, west, east] as strings.
    bbox = hit.get("boundingbox", [])
    if bbox and len(bbox) == 4:
        south, north, west, east = (float(v) for v in bbox)
        result["bbox"] = {"south": south, "north": north, "west": west, "east": east}
        result["ee_bbox_code"] = (
            f"ee.Geometry.Rectangle([{west}, {south}, {east}, {north}])"
        )

    # --- Optional GEE boundary search ---
    if use_boundaries:
        _ensure_initialized()
        ee = _namespace["ee"]
        # Boundary collections to search: (asset_id, name_property, description)
        boundary_sources = [
            ("WCMC/WDPA/current/polygons", "NAME", "WDPA Protected Areas"),
            ("FAO/GAUL/2015/level0", "ADM0_NAME", "GAUL Countries"),
            ("FAO/GAUL/2015/level1", "ADM1_NAME", "GAUL Admin Level 1"),
            ("FAO/GAUL/2015/level2", "ADM2_NAME", "GAUL Admin Level 2"),
            ("TIGER/2018/States", "NAME", "US States"),
            ("TIGER/2018/Counties", "NAME", "US Counties"),
        ]
        matches = []
        search_term = place_name.strip()
        for asset_id, name_prop, source_desc in boundary_sources:
            try:
                fc = ee.FeatureCollection(asset_id)
                # Exact (case-sensitive) property match via ee.Filter.eq.
                filtered = fc.filter(ee.Filter.eq(name_prop, search_term))
                count = filtered.size().getInfo()
                if count > 0:
                    # Echo back the matched name for confirmation.
                    first_name = filtered.first().get(name_prop).getInfo()
                    matches.append({
                        "source": source_desc,
                        "asset_id": asset_id,
                        "filter_property": name_prop,
                        "filter_value": first_name,
                        "feature_count": count,
                        "ee_code": (
                            f"ee.FeatureCollection('{asset_id}')"
                            f".filter(ee.Filter.eq('{name_prop}', '{first_name}'))"
                        ),
                    })
            except Exception:
                continue  # Skip collections that error (e.g. access issues)
        result["boundary_matches"] = matches
        if not matches:
            result["boundary_note"] = (
                "No exact boundary match found. Try a different spelling, "
                "or use the ee_point_code / ee_bbox_code above with .buffer()."
            )

    return json.dumps(result)
# ---------------------------------------------------------------------------
# Dataset catalog cache helpers
# ---------------------------------------------------------------------------
import time as _time

_CACHE_DIR = os.path.join(_THIS_DIR, "dataset_cache")
_CACHE_TTL = 86400  # 24 hours in seconds
_CACHE_META_FILE = os.path.join(_CACHE_DIR, "cache_meta.json")
_cache_lock = threading.Lock()

_CATALOG_URLS = {
    "official": "https://raw.githubusercontent.com/samapriya/Earth-Engine-Datasets-List/master/gee_catalog.json",
    "community": "https://raw.githubusercontent.com/samapriya/awesome-gee-community-datasets/master/community_datasets.json",
}
_CATALOG_FILES = {
    "official": "gee_catalog.json",
    "community": "community_datasets.json",
}


def _read_cache_meta() -> dict:
    """Read the cache timestamp metadata file (empty dict if absent or corrupt)."""
    try:
        with open(_CACHE_META_FILE, "r", encoding="utf-8") as fh:
            return json.load(fh)
    except Exception:
        return {}


def _write_cache_meta(meta: dict) -> None:
    """Write the cache timestamp metadata file."""
    os.makedirs(_CACHE_DIR, exist_ok=True)
    with open(_CACHE_META_FILE, "w", encoding="utf-8") as fh:
        json.dump(meta, fh)


def _get_cached_catalog(name: str) -> list[dict] | None:
    """Return parsed JSON list for a catalog, fetching/caching as needed.

    Args:
        name: "official" or "community"

    Returns:
        List of dataset dicts, or None if unavailable.
    """
    with _cache_lock:
        cache_file = os.path.join(_CACHE_DIR, _CATALOG_FILES[name])
        meta = _read_cache_meta()
        ts_key = f"{name}_ts"
        now = _time.time()

        cached_exists = os.path.isfile(cache_file)
        # Fresh cache: serve it straight from disk.
        if cached_exists and (now - meta.get(ts_key, 0)) < _CACHE_TTL:
            try:
                with open(cache_file, "r", encoding="utf-8") as fh:
                    return json.load(fh)
            except Exception:
                pass  # corrupt cache -- fall through and re-fetch

        # Fetch from remote and refresh the cache.
        try:
            req = urllib.request.Request(
                _CATALOG_URLS[name], headers={"User-Agent": "geeViz-MCP/1.0"}
            )
            with urllib.request.urlopen(req, timeout=15) as resp:
                raw = resp.read().decode("utf-8")
            data = json.loads(raw)
            os.makedirs(_CACHE_DIR, exist_ok=True)
            with open(cache_file, "w", encoding="utf-8") as fh:
                fh.write(raw)
            meta[ts_key] = now
            _write_cache_meta(meta)
            return data
        except Exception:
            # Fetch failed -- fall back to a stale cache if one exists.
            if cached_exists:
                try:
                    with open(cache_file, "r", encoding="utf-8") as fh:
                        return json.load(fh)
                except Exception:
                    pass
            return None


# ---------------------------------------------------------------------------
# Tool 20: search_datasets
# ---------------------------------------------------------------------------
@app.tool()
def search_datasets(query: str, source: str = "all", max_results: int = 10) -> str:
    """Search the GEE dataset catalog by keyword.

    Searches both the official Earth Engine catalog (~500+ datasets) and the
    community catalog (~200+ datasets). Uses word-level matching against title,
    tags, id, and provider fields with relevance scoring.

    Args:
        query: Search terms (e.g. "landsat surface reflectance", "DEM",
            "sentinel fire"). Case-insensitive.
        source: Which catalog to search: "official", "community", or "all" (default).
        max_results: Maximum number of results to return (default 10).

    Returns:
        JSON list of matching datasets with id, title, type, provider, tags,
        source, and additional metadata.
    """
    if source not in ("official", "community", "all"):
        return json.dumps({
            "error": f"Invalid source: {source!r}. Must be 'official', 'community', or 'all'.",
        })
    sources_to_search = ["official", "community"] if source == "all" else [source]

    # Load catalogs (disk-cached, refreshed every 24h by _get_cached_catalog).
    catalogs: dict[str, list[dict]] = {}
    errors: list[str] = []
    for src in sources_to_search:
        data = _get_cached_catalog(src)
        if data is None:
            errors.append(f"Failed to load {src} catalog (no cache available).")
        else:
            catalogs[src] = data
    if not catalogs:
        return json.dumps({"error": " ".join(errors)})

    query_words = query.lower().split()
    if not query_words:
        return json.dumps({"error": "Empty query."})

    # Relevance = sum over fields of (field weight x query words found there).
    weights = {"title": 3, "tags": 2, "id": 2, "provider": 1}
    scored: list[tuple[int, dict]] = []
    for src_name, dataset_entries in catalogs.items():
        for entry in dataset_entries:
            # NOTE(review): assumes these catalog fields are strings (or None);
            # a list-valued "tags" would break .lower() -- confirm against the
            # catalog schema.
            fields = {
                "title": (entry.get("title") or "").lower(),
                "tags": (entry.get("tags") or "").lower(),
                "id": (entry.get("id") or "").lower(),
                "provider": (entry.get("provider") or "").lower(),
            }
            score = sum(
                weights[field_name]
                for field_name, field_val in fields.items()
                for word in query_words
                if word in field_val
            )
            if score == 0:
                continue

            result_entry: dict = {
                "id": entry.get("id", ""),
                "title": entry.get("title", ""),
                "type": entry.get("type", ""),
                "provider": entry.get("provider", ""),
                "tags": entry.get("tags", ""),
                "source": src_name,
            }
            if src_name == "official":
                result_entry["date_range"] = entry.get("date_range", "")
                eid_raw = entry.get("id", "")
                if eid_raw:
                    # STAC path: first ID segment is the directory, full ID
                    # with "/" replaced by "_" is the filename.
                    stac_dir = eid_raw.split("/")[0]
                    stac_file = eid_raw.replace("/", "_")
                    result_entry["stac_url"] = (
                        f"https://earthengine-stac.storage.googleapis.com/"
                        f"catalog/{stac_dir}/{stac_file}.json"
                    )
            else:
                # Community catalog fields
                result_entry["thematic_group"] = entry.get("thematic_group", "")
                result_entry["docs"] = entry.get("docs", "")
            scored.append((score, result_entry))

    # Highest score first; ties broken alphabetically by title.
    scored.sort(key=lambda item: (-item[0], item[1].get("title", "")))
    results = [entry for _, entry in scored[:max_results]]
    out: dict = {
        "query": query,
        "source": source,
        "count": len(results),
        "total_matches": len(scored),
        "results": results,
    }
    if errors:
        out["warnings"] = errors
    return json.dumps(out)
# --------------------------------------------------------------------------- # Tool 21: get_dataset_info # ---------------------------------------------------------------------------
@app.tool()
def get_dataset_info(dataset_id: str) -> str:
    """Get detailed STAC metadata for a GEE dataset.

    Fetches the full STAC JSON record from earthengine-stac.storage.googleapis.com
    and returns it as-is. The record includes bands (with classes, wavelengths,
    scale/offset), description, temporal/spatial extent, keywords, license,
    visualization parameters, provider info, and links.

    This is the "drill down" companion to search_datasets -- use search_datasets
    to find datasets, then get_dataset_info for full details.

    Only works for official GEE datasets (STAC records don't exist for community
    datasets). For community datasets, use inspect_asset instead.

    Args:
        dataset_id: Full GEE dataset ID (e.g. "LANDSAT/LC09/C02/T1_L2").

    Returns:
        The full STAC JSON record for the dataset, or an error message.
    """
    # STAC path: first ID segment is the directory, full ID with "/" -> "_"
    # is the filename.
    parts = dataset_id.split("/")
    if not parts:
        return json.dumps({"error": "Empty dataset_id."})
    stac_url = (
        f"https://earthengine-stac.storage.googleapis.com/"
        f"catalog/{parts[0]}/{dataset_id.replace('/', '_')}.json"
    )

    try:
        req = urllib.request.Request(stac_url, headers={"User-Agent": "geeViz-MCP/1.0"})
        with urllib.request.urlopen(req, timeout=15) as resp:
            stac = json.loads(resp.read().decode("utf-8"))
    except urllib.error.HTTPError as exc:
        # 404 most likely means a community dataset with no STAC record.
        if exc.code == 404:
            return json.dumps({
                "error": f"No STAC record found for {dataset_id!r}. "
                "This may be a community dataset -- try inspect_asset instead.",
                "dataset_id": dataset_id,
                "stac_url": stac_url,
            })
        return json.dumps({
            "error": f"HTTP {exc.code} fetching STAC record: {exc.reason}",
            "stac_url": stac_url,
        })
    except Exception as exc:
        return json.dumps({"error": f"Failed to fetch STAC record: {exc}", "stac_url": stac_url})

    # Return the full STAC record as-is.
    return json.dumps(stac)
# ---------------------------------------------------------------------------
# Tool 22: get_thumbnail
# ---------------------------------------------------------------------------
import base64 as _base64


@app.tool()
def get_thumbnail(
    variable: str,
    viz_params: str = "{}",
    dimensions: int = 512,
    region_var: str = "",
    frames_per_second: int = 3,
):
    """Get a satellite imagery thumbnail from Earth Engine for visual inspection.

    Returns the image directly so you (the AI) can see and describe what is on
    the ground. These are satellite images captured from space -- here is how
    to interpret common band combinations:

    - **True color (e.g. B4,B3,B2 or B_R,B_G,B_B):** Shows the Earth as the
      human eye would see it. Green = vegetation, brown = bare soil, white =
      clouds/snow, blue/dark = water, grey = urban/rock.
    - **False color (e.g. B5,B4,B3 or NIR,Red,Green):** Vegetation appears
      bright red/magenta, bare ground is tan/brown, water is dark blue/black,
      urban areas are cyan/grey, burned areas are dark brown/black.
    - **Single band with palette (e.g. NDVI, elevation):** Color ramp maps
      values to colors -- check the palette and min/max to interpret.

    When describing what you see, mention: dominant land cover types, any
    visible change patterns (for animations), cloud cover if present, and
    spatial patterns (e.g. river corridors, urban grids, agricultural fields).

    For ee.Image, returns a PNG thumbnail. For ee.ImageCollection, returns an
    animated GIF via getVideoThumbURL (useful for showing change over time).

    IMPORTANT for ImageCollection GIFs: You MUST provide viz_params with
    exactly 3 bands (or 1 band + palette) to produce a valid RGB animation.
    The EE API requires RGB/RGBA visualization for video thumbnails.

    Args:
        variable: Name of an ee.Image or ee.ImageCollection variable in the
            REPL namespace (set via run_code).
        viz_params: JSON string of visualization parameters. Common keys:
            bands (list of 3 for RGB, or 1 + palette), min, max, palette,
            gamma. Example: '{"bands": ["B4","B3","B2"], "min": 0, "max": 3000}'
            ALWAYS provide bands + min/max -- without them, EE uses unhelpful
            defaults and GIFs will fail.
        dimensions: Thumbnail width in pixels (default 512).
        region_var: Optional name of an ee.Geometry or ee.FeatureCollection
            variable to use as the thumbnail region.
        frames_per_second: Animation speed for ImageCollection GIFs
            (default 3). Ignored for single images.

    Returns:
        A list containing the image (PNG/GIF) and a text message with the
        thumbnail URL. IMPORTANT: Always share the thumbnail URL with the user
        so they can open it in a browser -- especially for animated GIFs,
        which may not play inline in chat UIs.
    """
    _ensure_initialized()
    ee = _namespace["ee"]

    # Look up variable in the persistent REPL namespace (populated by run_code).
    obj = _namespace.get(variable)
    if obj is None:
        return json.dumps({
            "error": f"Variable {variable!r} not found in namespace. "
            "Use run_code to create it first.",
        })

    # Parse viz params -- accepts either a JSON string or an already-parsed dict.
    try:
        params = json.loads(viz_params) if isinstance(viz_params, str) else viz_params
    except (json.JSONDecodeError, TypeError) as exc:
        return json.dumps({"error": f"Invalid viz_params JSON: {exc}"})
    if not isinstance(params, dict):
        return json.dumps({"error": "viz_params must be a JSON object (dict)."})
    params["dimensions"] = dimensions

    # Handle region: a FeatureCollection is flattened to its geometry first,
    # then both paths funnel through the ee.Geometry check below.
    if region_var:
        region = _namespace.get(region_var)
        if region is None:
            return json.dumps({"error": f"Region variable {region_var!r} not found in namespace."})
        if isinstance(region, ee.FeatureCollection):
            region = region.geometry()
        if isinstance(region, ee.Geometry):
            params["region"] = region
        else:
            return json.dumps({
                "error": f"Variable {region_var!r} is {type(region).__name__}, "
                "expected ee.Geometry or ee.FeatureCollection.",
            })

    # Generate thumbnail URL (PNG for single images, GIF for collections).
    is_collection = False
    try:
        if isinstance(obj, ee.Image):
            params["format"] = "png"
            thumb_url = obj.getThumbURL(params)
        elif isinstance(obj, ee.ImageCollection):
            is_collection = True
            # Validate: GIF requires RGB visualization (3 bands or 1 band + palette)
            bands = params.get("bands", [])
            palette = params.get("palette")
            if not bands and not palette:
                return json.dumps({
                    "error": "ImageCollection GIFs require viz_params with 'bands' "
                    "(3 bands for RGB, or 1 band + 'palette'). "
                    "Example: '{\"bands\": [\"B4\",\"B3\",\"B2\"], \"min\": 0, \"max\": 3000}'",
                })
            count = obj.size().getInfo()
            if count == 0:
                return json.dumps({"error": "ImageCollection is empty -- no images to animate."})
            # Cap animation length at 40 frames to keep the GIF request sane.
            if count > 40:
                obj = obj.limit(40)
            # getVideoThumbURL always returns GIF; format param is optional
            # but framesPerSecond is needed for animation timing
            params["framesPerSecond"] = frames_per_second
            thumb_url = obj.getVideoThumbURL(params)
        else:
            return json.dumps({
                "error": f"Variable {variable!r} is {type(obj).__name__}, "
                "expected ee.Image or ee.ImageCollection.",
            })
    except Exception as exc:
        return json.dumps({"error": f"Failed to generate thumbnail: {exc}"})

    # Download the image bytes from the EE-signed URL.
    img_format = "gif" if is_collection else "png"
    try:
        req = urllib.request.Request(thumb_url, headers={"User-Agent": "geeViz-MCP/1.0"})
        with urllib.request.urlopen(req, timeout=120) as resp:
            img_bytes = resp.read()
    except Exception as exc:
        return json.dumps({
            "error": f"Failed to download thumbnail: {exc}",
            "thumb_url": thumb_url,
        })

    # Validate the response is the expected format (EE can return an error
    # body with HTTP 200, so magic-byte checks are the reliable signal).
    if is_collection:
        # GIF magic bytes: GIF87a or GIF89a
        if len(img_bytes) < 6 or img_bytes[:3] != b"GIF":
            return json.dumps({
                "error": "Earth Engine did not return a valid GIF. "
                "Ensure viz_params has exactly 3 bands (RGB) or "
                "1 band + palette, and that the collection is not empty.",
                "thumb_url": thumb_url,
                "response_size": len(img_bytes),
                "response_start": img_bytes[:20].hex() if img_bytes else "",
            })
    else:
        # PNG magic bytes: 89 50 4E 47
        if len(img_bytes) < 4 or img_bytes[:4] != b"\x89PNG":
            return json.dumps({
                "error": "Earth Engine did not return a valid PNG.",
                "thumb_url": thumb_url,
                "response_size": len(img_bytes),
            })

    # Build a text message with the URL -- always included so the LLM can
    # share it with the user (GIFs won't animate inline in most chat UIs).
    if is_collection:
        url_text = (
            f"Animated GIF thumbnail URL (share with user -- GIFs may not "
            f"animate inline):\n{thumb_url}"
        )
    else:
        url_text = f"Thumbnail URL:\n{thumb_url}"

    # Return as [Image, text] so the LLM sees the image AND gets the URL.
    # FastMCP's _convert_to_content flattens lists: Image → ImageContent,
    # str → TextContent.
    if _MCPImage is not None:
        try:
            return [_MCPImage(data=img_bytes, format=img_format), url_text]
        except Exception:
            pass

    # Fallback: base64-encoded image in JSON (when MCP Image type unavailable)
    return json.dumps({
        "image_base64": _base64.b64encode(img_bytes).decode("ascii"),
        "mime_type": f"image/{img_format}",
        "image_url": thumb_url,
        "note": f"Satellite image returned as base64 {img_format.upper()}. "
        "MCP Image type was not available. SHARE THE image_url WITH "
        "THE USER so they can view it in a browser.",
    })
# ---------------------------------------------------------------------------
# Tool 23: export_to_drive
# ---------------------------------------------------------------------------


@app.tool()
def export_to_drive(
    image_var: str,
    output_name: str,
    drive_folder: str,
    region_var: str,
    scale: int = 30,
    crs: str = "EPSG:4326",
    output_no_data: int = -32768,
) -> str:
    """Export an ee.Image from the REPL namespace to Google Drive.

    Uses geeViz's exportToDriveWrapper. A region variable is required for
    Drive exports.

    Args:
        image_var: Name of the ee.Image variable in the REPL namespace.
        output_name: Output filename (without extension).
        drive_folder: Google Drive folder name to export into.
        region_var: Name of an ee.Geometry or ee.FeatureCollection variable
            to use as the export region. Required for Drive exports.
        scale: Output resolution in meters (default 30).
        crs: Coordinate reference system (default "EPSG:4326").
        output_no_data: NoData value for the output (default -32768).

    Returns:
        JSON with export status or an error.
    """
    _ensure_initialized()
    ee = _namespace["ee"]
    gil = _namespace["gil"]

    # Look up image in the REPL namespace.
    image = _namespace.get(image_var)
    if image is None:
        return json.dumps({"error": f"Variable {image_var!r} not found in namespace."})
    if not isinstance(image, ee.Image):
        return json.dumps({
            "error": f"Variable {image_var!r} is {type(image).__name__}, not ee.Image.",
        })

    # Look up region (required); FeatureCollections are reduced to geometry.
    region = _namespace.get(region_var)
    if region is None:
        return json.dumps({"error": f"Variable {region_var!r} not found in namespace."})
    if isinstance(region, ee.FeatureCollection):
        region = region.geometry()
    elif not isinstance(region, ee.Geometry):
        return json.dumps({
            "error": f"Variable {region_var!r} is {type(region).__name__}, "
            "expected ee.Geometry or ee.FeatureCollection.",
        })

    # geeViz wrappers print progress; capture it so it can be returned as JSON.
    stdout_buf = io.StringIO()
    try:
        with contextlib.redirect_stdout(stdout_buf):
            # Positional call into geeViz. The None sits between crs and
            # output_no_data -- presumably a crs_transform slot; confirm
            # against gil.exportToDriveWrapper's signature before reordering.
            gil.exportToDriveWrapper(
                image,
                output_name,
                drive_folder,
                region,
                scale,
                crs,
                None,
                output_no_data,
            )
    except Exception as exc:
        return json.dumps({
            "error": f"Export to Drive failed: {exc}",
            "stdout": stdout_buf.getvalue(),
        })

    return json.dumps({
        "success": True,
        "output_name": output_name,
        "drive_folder": drive_folder,
        "scale": scale,
        "crs": crs,
        "stdout": stdout_buf.getvalue().strip(),
        "message": "Export to Drive task started. Use track_tasks() to monitor progress.",
    })
# ---------------------------------------------------------------------------
# Tool 24: export_to_cloud_storage
# ---------------------------------------------------------------------------


@app.tool()
def export_to_cloud_storage(
    image_var: str,
    output_name: str,
    bucket: str,
    region_var: str,
    scale: int = 30,
    crs: str = "EPSG:4326",
    output_no_data: int = -32768,
    file_format: str = "GeoTIFF",
    overwrite: bool = False,
) -> str:
    """Export an ee.Image from the REPL namespace to Google Cloud Storage.

    Uses geeViz's exportToCloudStorageWrapper with Cloud Optimized GeoTIFF
    format options by default.

    Args:
        image_var: Name of the ee.Image variable in the REPL namespace.
        output_name: Output filename (without extension).
        bucket: Google Cloud Storage bucket name.
        region_var: Name of an ee.Geometry or ee.FeatureCollection variable
            to use as the export region. Required for GCS exports.
        scale: Output resolution in meters (default 30).
        crs: Coordinate reference system (default "EPSG:4326").
        output_no_data: NoData value for the output (default -32768).
        file_format: Output format -- "GeoTIFF" (default) or "TFRecord".
        overwrite: If True, overwrite existing files (default False).

    Returns:
        JSON with export status or an error.
    """
    _ensure_initialized()
    ee = _namespace["ee"]
    gil = _namespace["gil"]

    # Look up image in the REPL namespace.
    image = _namespace.get(image_var)
    if image is None:
        return json.dumps({"error": f"Variable {image_var!r} not found in namespace."})
    if not isinstance(image, ee.Image):
        return json.dumps({
            "error": f"Variable {image_var!r} is {type(image).__name__}, not ee.Image.",
        })

    # Look up region (required); FeatureCollections are reduced to geometry.
    region = _namespace.get(region_var)
    if region is None:
        return json.dumps({"error": f"Variable {region_var!r} not found in namespace."})
    if isinstance(region, ee.FeatureCollection):
        region = region.geometry()
    elif not isinstance(region, ee.Geometry):
        return json.dumps({
            "error": f"Variable {region_var!r} is {type(region).__name__}, "
            "expected ee.Geometry or ee.FeatureCollection.",
        })

    # geeViz wrappers print progress; capture it so it can be returned as JSON.
    stdout_buf = io.StringIO()
    try:
        with contextlib.redirect_stdout(stdout_buf):
            # Positional call into geeViz. The None is presumably a
            # crs_transform slot; the dict enables Cloud Optimized GeoTIFF.
            # Confirm against gil.exportToCloudStorageWrapper's signature
            # before reordering.
            gil.exportToCloudStorageWrapper(
                image,
                output_name,
                bucket,
                region,
                scale,
                crs,
                None,
                output_no_data,
                file_format,
                {"cloudOptimized": True},
                overwrite,
            )
    except Exception as exc:
        return json.dumps({
            "error": f"Export to Cloud Storage failed: {exc}",
            "stdout": stdout_buf.getvalue(),
        })

    return json.dumps({
        "success": True,
        "output_name": output_name,
        "bucket": bucket,
        "scale": scale,
        "crs": crs,
        "file_format": file_format,
        "overwrite": overwrite,
        "stdout": stdout_buf.getvalue().strip(),
        "message": "Export to Cloud Storage task started. Use track_tasks() to monitor progress.",
    })
# ---------------------------------------------------------------------------
# Tool 25: cancel_tasks
# ---------------------------------------------------------------------------


@app.tool()
def cancel_tasks(name_filter: str = "") -> str:
    """Cancel running and ready Earth Engine tasks.

    If name_filter is provided, cancels only tasks whose description contains
    the filter string. Otherwise cancels ALL ready/running tasks. Uses
    geeViz's taskManagerLib for the actual cancellation.

    Args:
        name_filter: Optional substring filter. Only tasks whose description
            contains this string will be cancelled. If empty, all
            ready/running tasks are cancelled.

    Returns:
        JSON with task counts and cancellation status.
    """
    _ensure_initialized()
    import geeViz.taskManagerLib as tml

    # Snapshot queue depth before cancelling so the response can report
    # how many tasks were pending at the time.
    tasks_before = tml.getTasks()
    n_ready = len(tasks_before.get("ready", []))
    n_running = len(tasks_before.get("running", []))

    # taskManagerLib prints its progress; capture it for the JSON response.
    captured = io.StringIO()
    try:
        with contextlib.redirect_stdout(captured):
            if not name_filter:
                tml.batchCancel()
            else:
                tml.cancelByName(name_filter)
    except Exception as exc:
        return json.dumps({
            "error": f"Cancel failed: {exc}",
            "stdout": captured.getvalue(),
        })

    return json.dumps({
        "success": True,
        "name_filter": name_filter or "(all)",
        "ready_before": n_ready,
        "running_before": n_running,
        "stdout": captured.getvalue().strip(),
        "message": "Task cancellation completed.",
    })
# ---------------------------------------------------------------------------
# Tool 26: sample_values
# ---------------------------------------------------------------------------


@app.tool()
def sample_values(
    image_var: str,
    geometry_var: str = "",
    # NOTE(review): None defaults under a bare ``float`` annotation --
    # presumably the MCP layer tolerates this; ``float | None`` would be
    # stricter. Left unchanged to avoid altering the generated tool schema.
    lon: float = None,
    lat: float = None,
    scale: int = 30,
    reducer: str = "first",
) -> str:
    """Sample pixel values from an ee.Image at a point or region.

    Provide either lon/lat coordinates for a point sample, or a geometry
    variable name for a region reduction.

    Args:
        image_var: Name of the ee.Image variable in the REPL namespace.
        geometry_var: Optional name of an ee.Geometry or ee.FeatureCollection
            variable. Used if lon/lat are not provided.
        lon: Longitude for a point sample.
        lat: Latitude for a point sample.
        scale: Scale in meters for the reduction (default 30).
        reducer: Reducer to use -- "first", "mean", "median", "min", "max",
            "sum", "stdDev", or "count" (default "first").

    Returns:
        JSON dict of band name -> sampled value.
    """
    _ensure_initialized()
    ee = _namespace["ee"]

    # Look up image in the REPL namespace.
    image = _namespace.get(image_var)
    if image is None:
        return json.dumps({"error": f"Variable {image_var!r} not found in namespace."})
    if not isinstance(image, ee.Image):
        return json.dumps({
            "error": f"Variable {image_var!r} is {type(image).__name__}, not ee.Image.",
        })

    # Determine geometry: explicit lon/lat wins over geometry_var.
    if lon is not None and lat is not None:
        geometry = ee.Geometry.Point([lon, lat])
    elif geometry_var:
        geometry = _namespace.get(geometry_var)
        if geometry is None:
            return json.dumps({"error": f"Variable {geometry_var!r} not found in namespace."})
        if isinstance(geometry, ee.FeatureCollection):
            geometry = geometry.geometry()
        elif not isinstance(geometry, ee.Geometry):
            return json.dumps({
                "error": f"Variable {geometry_var!r} is {type(geometry).__name__}, "
                "expected ee.Geometry or ee.FeatureCollection.",
            })
    else:
        return json.dumps({
            "error": "Provide either lon/lat coordinates or a geometry_var name.",
        })

    # Map reducer string to ee.Reducer
    reducer_map = {
        "first": ee.Reducer.first(),
        "mean": ee.Reducer.mean(),
        "median": ee.Reducer.median(),
        "min": ee.Reducer.min(),
        "max": ee.Reducer.max(),
        "sum": ee.Reducer.sum(),
        "stdDev": ee.Reducer.stdDev(),
        "count": ee.Reducer.count(),
    }
    ee_reducer = reducer_map.get(reducer)
    if ee_reducer is None:
        return json.dumps({
            "error": f"Unknown reducer: {reducer!r}. "
            f"Valid: {', '.join(sorted(reducer_map))}",
        })

    try:
        result = image.reduceRegion(
            reducer=ee_reducer,
            geometry=geometry,
            scale=scale,
            maxPixels=1e9,
        ).getInfo()
    except Exception as exc:
        return json.dumps({"error": f"Sample failed: {exc}"})

    return json.dumps({
        "success": True,
        "reducer": reducer,
        "scale": scale,
        "values": result,
    })
# --------------------------------------------------------------------------- # Tool 27: get_time_series # --------------------------------------------------------------------------- # Check for matplotlib at module level try: import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt _HAS_MPL = True except ImportError: _HAS_MPL = False
@app.tool()
def get_time_series(
    collection_var: str,
    geometry_var: str = "",
    # NOTE(review): None defaults under a bare ``float`` annotation (same
    # pattern as sample_values) -- left unchanged to avoid altering the
    # generated tool schema.
    lon: float = None,
    lat: float = None,
    band: str = "",
    start_date: str = "",
    end_date: str = "",
    scale: int = 30,
    reducer: str = "mean",
):
    """Extract time series of band values from an ImageCollection.

    Reduces each image in the collection over the given geometry and returns
    date-value pairs. If matplotlib is available, also returns a line chart
    PNG.

    Args:
        collection_var: Name of the ee.ImageCollection variable in the REPL
            namespace.
        geometry_var: Optional name of an ee.Geometry or ee.FeatureCollection
            variable.
        lon: Longitude for a point sample (used if geometry_var is empty).
        lat: Latitude for a point sample (used if geometry_var is empty).
        band: Band name to extract. If empty, uses the first band.
        start_date: Optional start date filter (YYYY-MM-DD).
        end_date: Optional end date filter (YYYY-MM-DD).
        scale: Scale in meters for the reduction (default 30).
        reducer: Reducer to use -- "mean", "median", "min", "max", "first",
            "sum" (default "mean").

    Returns:
        JSON with date-value data (and a PNG chart image if matplotlib is
        available).
    """
    _ensure_initialized()
    ee = _namespace["ee"]

    # Look up collection in the REPL namespace.
    collection = _namespace.get(collection_var)
    if collection is None:
        return json.dumps({"error": f"Variable {collection_var!r} not found in namespace."})
    if not isinstance(collection, ee.ImageCollection):
        return json.dumps({
            "error": f"Variable {collection_var!r} is {type(collection).__name__}, "
            "not ee.ImageCollection.",
        })

    # Determine geometry: explicit lon/lat wins over geometry_var.
    if lon is not None and lat is not None:
        geometry = ee.Geometry.Point([lon, lat])
    elif geometry_var:
        geometry = _namespace.get(geometry_var)
        if geometry is None:
            return json.dumps({"error": f"Variable {geometry_var!r} not found in namespace."})
        if isinstance(geometry, ee.FeatureCollection):
            geometry = geometry.geometry()
        elif not isinstance(geometry, ee.Geometry):
            return json.dumps({
                "error": f"Variable {geometry_var!r} is {type(geometry).__name__}, "
                "expected ee.Geometry or ee.FeatureCollection.",
            })
    else:
        return json.dumps({
            "error": "Provide either lon/lat coordinates or a geometry_var name.",
        })

    # Apply date filters; open-ended bounds are filled with sentinel dates.
    if start_date:
        collection = collection.filterDate(start_date, end_date or "2099-01-01")
    elif end_date:
        collection = collection.filterDate("1970-01-01", end_date)

    # Map reducer string
    reducer_map = {
        "first": ee.Reducer.first(),
        "mean": ee.Reducer.mean(),
        "median": ee.Reducer.median(),
        "min": ee.Reducer.min(),
        "max": ee.Reducer.max(),
        "sum": ee.Reducer.sum(),
    }
    ee_reducer = reducer_map.get(reducer)
    if ee_reducer is None:
        return json.dumps({
            "error": f"Unknown reducer: {reducer!r}. "
            f"Valid: {', '.join(sorted(reducer_map))}",
        })

    # Determine band name (defaults to the first band of the first image).
    if not band:
        try:
            band = collection.first().bandNames().get(0).getInfo()
        except Exception as exc:
            return json.dumps({"error": f"Could not determine band name: {exc}"})

    # Select the band and map reduceRegion over each image
    collection = collection.select(band)

    def _extract(img):
        # Server-side function: one (date, reduced value) feature per image.
        date = img.date().format("YYYY-MM-dd")
        val = img.reduceRegion(
            reducer=ee_reducer,
            geometry=geometry,
            scale=scale,
            maxPixels=1e9,
        ).get(band)
        return ee.Feature(None, {"date": date, "value": val})

    try:
        fc = collection.map(_extract)
        data = fc.getInfo()
    except Exception as exc:
        return json.dumps({"error": f"Time series extraction failed: {exc}"})

    # Parse features into date-value list
    series = []
    for feat in data.get("features", []):
        props = feat.get("properties", {})
        series.append({
            "date": props.get("date"),
            "value": props.get("value"),
        })

    # Sort by date (None dates sort first via the empty-string fallback).
    series.sort(key=lambda x: x.get("date") or "")

    # Filter out null values for charting
    valid_series = [p for p in series if p["value"] is not None and p["date"] is not None]

    # Generate chart if matplotlib is available
    if _HAS_MPL and valid_series and _MCPImage is not None:
        try:
            import datetime as _dt

            dates = [_dt.datetime.strptime(p["date"], "%Y-%m-%d") for p in valid_series]
            values = [p["value"] for p in valid_series]
            fig, ax = plt.subplots(figsize=(10, 4))
            ax.plot(dates, values, marker=".", markersize=3, linewidth=0.8)
            ax.set_xlabel("Date")
            ax.set_ylabel(band)
            ax.set_title(f"Time Series: {band}")
            fig.autofmt_xdate()
            fig.tight_layout()
            buf = io.BytesIO()
            plt.savefig(buf, format="png", bbox_inches="tight", dpi=100)
            plt.close(fig)
            buf.seek(0)
            # NOTE(review): on chart success only the PNG is returned; the raw
            # series JSON below is sent only when charting is unavailable or
            # fails. Confirm callers don't need both at once.
            return _MCPImage(data=buf.getvalue(), format="png")
        except Exception:
            pass  # Fall through to JSON-only response

    return json.dumps({
        "success": True,
        "band": band,
        "reducer": reducer,
        "scale": scale,
        "count": len(series),
        "series": series,
    })
# ---------------------------------------------------------------------------
# Tool 28: delete_asset
# ---------------------------------------------------------------------------


@app.tool()
def delete_asset(asset_id: str) -> str:
    """Delete a single GEE asset.

    Checks that the asset exists before attempting deletion. Only deletes a
    single asset -- will not recursively delete folders.

    Args:
        asset_id: Full asset path
            (e.g. "projects/my-project/assets/my_image").

    Returns:
        JSON confirmation or error.
    """
    _ensure_initialized()
    ee = _namespace["ee"]
    import geeViz.assetManagerLib as aml

    # Existence check up front, so a missing asset yields a clear message
    # instead of whatever the raw EE API would raise.
    if not aml.ee_asset_exists(asset_id):
        return json.dumps({"error": f"Asset not found: {asset_id}"})

    try:
        ee.data.deleteAsset(asset_id)
    except Exception as exc:
        return json.dumps({"error": f"Delete failed: {exc}"})

    confirmation = {
        "success": True,
        "asset_id": asset_id,
        "message": f"Asset {asset_id} deleted.",
    }
    return json.dumps(confirmation)
# ---------------------------------------------------------------------------
# Tool 29: copy_asset
# ---------------------------------------------------------------------------


@app.tool()
def copy_asset(source_id: str, dest_id: str, overwrite: bool = False) -> str:
    """Copy a GEE asset to a new location.

    Args:
        source_id: Full path of the source asset.
        dest_id: Full path for the destination asset.
        overwrite: If True and the destination exists, delete it first
            (default False).

    Returns:
        JSON confirmation or error.
    """
    _ensure_initialized()
    ee = _namespace["ee"]
    import geeViz.assetManagerLib as aml

    # Validate the source before touching anything.
    if not aml.ee_asset_exists(source_id):
        return json.dumps({"error": f"Source asset not found: {source_id}"})

    # A pre-existing destination is only removed when overwrite is requested.
    dest_exists = aml.ee_asset_exists(dest_id)
    if dest_exists and not overwrite:
        return json.dumps({
            "error": f"Destination already exists: {dest_id}. "
            "Set overwrite=True to replace it.",
        })
    if dest_exists:
        try:
            ee.data.deleteAsset(dest_id)
        except Exception as exc:
            return json.dumps({"error": f"Failed to delete existing dest: {exc}"})

    try:
        ee.data.copyAsset(source_id, dest_id)
    except Exception as exc:
        return json.dumps({"error": f"Copy failed: {exc}"})

    return json.dumps({
        "success": True,
        "source_id": source_id,
        "dest_id": dest_id,
        "message": f"Asset copied from {source_id} to {dest_id}.",
    })
# ---------------------------------------------------------------------------
# Tool 30: move_asset
# ---------------------------------------------------------------------------


@app.tool()
def move_asset(source_id: str, dest_id: str, overwrite: bool = False) -> str:
    """Move a GEE asset (copy to destination, then delete source).

    The copy is performed first; the source is only deleted after a
    successful copy.

    Args:
        source_id: Full path of the source asset.
        dest_id: Full path for the destination asset.
        overwrite: If True and the destination exists, delete it first
            (default False).

    Returns:
        JSON confirmation or error.
    """
    _ensure_initialized()
    ee = _namespace["ee"]
    import geeViz.assetManagerLib as aml

    # Validate the source before touching anything.
    if not aml.ee_asset_exists(source_id):
        return json.dumps({"error": f"Source asset not found: {source_id}"})

    # A pre-existing destination is only removed when overwrite is requested.
    dest_exists = aml.ee_asset_exists(dest_id)
    if dest_exists and not overwrite:
        return json.dumps({
            "error": f"Destination already exists: {dest_id}. "
            "Set overwrite=True to replace it.",
        })
    if dest_exists:
        try:
            ee.data.deleteAsset(dest_id)
        except Exception as exc:
            return json.dumps({"error": f"Failed to delete existing dest: {exc}"})

    # Copy first; the source is deleted only once the copy has succeeded so a
    # failed copy can never lose data.
    try:
        ee.data.copyAsset(source_id, dest_id)
    except Exception as exc:
        return json.dumps({"error": f"Copy step failed: {exc}"})

    try:
        ee.data.deleteAsset(source_id)
    except Exception as exc:
        return json.dumps({
            "error": f"Asset copied to {dest_id} but failed to delete source: {exc}",
            "dest_id": dest_id,
        })

    return json.dumps({
        "success": True,
        "source_id": source_id,
        "dest_id": dest_id,
        "message": f"Asset moved from {source_id} to {dest_id}.",
    })
# ---------------------------------------------------------------------------
# Tool 31: create_folder
# ---------------------------------------------------------------------------


@app.tool()
def create_folder(
    folder_path: str,
    folder_type: str = "Folder",
) -> str:
    """Create a GEE folder or ImageCollection.

    Creates intermediate folders recursively if they don't exist.

    Args:
        folder_path: Full asset path for the new folder
            (e.g. "projects/my-project/assets/my_folder").
        folder_type: "Folder" (default) or "ImageCollection".

    Returns:
        JSON confirmation or error.
    """
    _ensure_initialized()
    import geeViz.assetManagerLib as aml

    # Only these two container types exist in EE.
    if folder_type not in ("Folder", "ImageCollection"):
        return json.dumps({
            "error": f"Invalid folder_type: {folder_type!r}. "
            "Must be 'Folder' or 'ImageCollection'.",
        })

    # assetManagerLib prints progress; capture it for the JSON response.
    captured = io.StringIO()
    try:
        with contextlib.redirect_stdout(captured):
            if folder_type == "Folder":
                aml.create_asset(folder_path, recursive=True)
            else:
                aml.create_image_collection(folder_path)
    except Exception as exc:
        return json.dumps({
            "error": f"Create failed: {exc}",
            "stdout": captured.getvalue(),
        })

    return json.dumps({
        "success": True,
        "folder_path": folder_path,
        "folder_type": folder_type,
        "stdout": captured.getvalue().strip(),
        "message": f"{folder_type} created at {folder_path}.",
    })
# ---------------------------------------------------------------------------
# Tool 32: update_acl
# ---------------------------------------------------------------------------


@app.tool()
def update_acl(
    asset_id: str,
    all_users_can_read: bool = False,
    readers: str = "",
    writers: str = "",
) -> str:
    """Update permissions (ACL) on a GEE asset.

    Uses geeViz's assetManagerLib.updateACL to set the access control list.

    Args:
        asset_id: Full asset path.
        all_users_can_read: If True, the asset is publicly readable
            (default False).
        readers: Comma-separated email addresses of users with read access.
        writers: Comma-separated email addresses of users with write access.

    Returns:
        JSON confirmation or error.
    """
    _ensure_initialized()
    import geeViz.assetManagerLib as aml

    def _split_emails(csv: str):
        # "a@x, b@y" -> ["a@x", "b@y"]; empty/blank input -> [].
        if not csv:
            return []
        return [part.strip() for part in csv.split(",") if part.strip()]

    readers_list = _split_emails(readers)
    writers_list = _split_emails(writers)

    # assetManagerLib prints progress; capture it for the JSON response.
    captured = io.StringIO()
    try:
        with contextlib.redirect_stdout(captured):
            aml.updateACL(
                asset_id,
                writers=writers_list,
                all_users_can_read=all_users_can_read,
                readers=readers_list,
            )
    except Exception as exc:
        return json.dumps({
            "error": f"ACL update failed: {exc}",
            "stdout": captured.getvalue(),
        })

    return json.dumps({
        "success": True,
        "asset_id": asset_id,
        "all_users_can_read": all_users_can_read,
        "readers": readers_list,
        "writers": writers_list,
        "stdout": captured.getvalue().strip(),
        "message": f"Permissions updated for {asset_id}.",
    })
# ---------------------------------------------------------------------------
# Tool 33: get_collection_info
# ---------------------------------------------------------------------------


@app.tool()
def get_collection_info(
    collection_id: str,
    start_date: str = "",
    end_date: str = "",
    region_var: str = "",
) -> str:
    """Get summary information for an Earth Engine ImageCollection.

    Loads a collection by asset ID (does NOT require a namespace variable),
    applies optional date and region filters, and returns image count, date
    range, and band names/types.

    Args:
        collection_id: Full asset ID of the ImageCollection
            (e.g. "LANDSAT/LC09/C02/T1_L2").
        start_date: Optional start date filter (YYYY-MM-DD).
        end_date: Optional end date filter (YYYY-MM-DD).
        region_var: Optional name of an ee.Geometry or ee.FeatureCollection
            variable for spatial filtering.

    Returns:
        JSON with collection statistics.
    """
    _ensure_initialized()
    ee = _namespace["ee"]

    try:
        collection = ee.ImageCollection(collection_id)
    except Exception as exc:
        return json.dumps({"error": f"Failed to load collection: {exc}"})

    # Apply date filters; open-ended bounds are filled with sentinel dates.
    if start_date:
        collection = collection.filterDate(start_date, end_date or "2099-01-01")
    elif end_date:
        collection = collection.filterDate("1970-01-01", end_date)

    # Optional spatial filter from a namespace geometry variable.
    if region_var:
        region = _namespace.get(region_var)
        if region is None:
            return json.dumps({"error": f"Variable {region_var!r} not found in namespace."})
        if isinstance(region, ee.FeatureCollection):
            region = region.geometry()
        elif not isinstance(region, ee.Geometry):
            return json.dumps({
                "error": f"Variable {region_var!r} is {type(region).__name__}, "
                "expected ee.Geometry or ee.FeatureCollection.",
            })
        collection = collection.filterBounds(region)

    result: dict = {"collection_id": collection_id}

    try:
        count = collection.size().getInfo()
        result["image_count"] = count
    except Exception as exc:
        return json.dumps({"error": f"Failed to get collection size: {exc}"})

    if count == 0:
        result["message"] = "Collection is empty (with applied filters)."
        return json.dumps(result)

    # Date range (best-effort: silently skipped if the reduction fails).
    try:
        date_range = collection.reduceColumns(
            ee.Reducer.minMax(), ["system:time_start"]
        ).getInfo()
        import datetime as _dt

        def _ms_to_date(ms):
            # system:time_start is epoch milliseconds. Use the timezone-aware
            # API: datetime.utcfromtimestamp is deprecated since Python 3.12
            # and scheduled for removal; fromtimestamp(..., tz=utc) yields the
            # same UTC calendar date.
            return _dt.datetime.fromtimestamp(
                ms / 1000, tz=_dt.timezone.utc
            ).strftime("%Y-%m-%d")

        min_ms = date_range.get("min")
        max_ms = date_range.get("max")
        if min_ms is not None:
            result["first_date"] = _ms_to_date(min_ms)
        if max_ms is not None:
            result["last_date"] = _ms_to_date(max_ms)
    except Exception:
        pass

    # Band info from first image (best-effort).
    try:
        first_info = collection.first().getInfo()
        if first_info and "bands" in first_info:
            bands = []
            for b in first_info["bands"]:
                bands.append({
                    "name": b.get("id", ""),
                    "data_type": b.get("data_type", {}).get("precision", ""),
                    "crs": b.get("crs", ""),
                    "dimensions": b.get("dimensions"),
                    # First element of the affine crs_transform is the pixel
                    # size in CRS units.
                    "scale": b.get("crs_transform", [None])[0],
                })
            result["bands"] = bands
    except Exception:
        pass

    return json.dumps(result)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------


def main() -> None:
    """Run the MCP server on the transport selected via environment variables."""
    # stdio is standard for Cursor/IDE integration; use streamable-http for HTTP
    transport = os.environ.get("MCP_TRANSPORT", "stdio")

    if transport != "streamable-http":
        app.run(transport=transport)
        return

    host = os.environ.get("MCP_HOST", "127.0.0.1")
    port = int(os.environ.get("MCP_PORT", "8000"))
    path = os.environ.get("MCP_PATH", "/mcp")
    print(f"MCP server starting at http://{host}:{port}{path}", file=sys.stderr)
    try:
        app.run(transport=transport, host=host, port=port, path=path)
    except TypeError:
        # SDK may not accept host/port/path; use env or defaults
        app.run(transport=transport)


if __name__ == "__main__":
    # print(inspect_asset("COPERNICUS/S2_SR_HARMONIZED"))
    main()

# %%