Skip to content

Commit

Permalink
[Explorer] New APIs to expose task graph and result version history (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
geoffxy authored Nov 30, 2024
1 parent ed36802 commit a7f9f65
Show file tree
Hide file tree
Showing 24 changed files with 527 additions and 53 deletions.
1 change: 0 additions & 1 deletion explorer/src/Header.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import logo from "./assets/ConductorLogo_Light.png";
import "./Header.css";

const Header = () => {
console.log("Test");
return (
<div className="header">
<div className="header-logo">
Expand Down
4 changes: 3 additions & 1 deletion src/conductor/cli/explorer.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from conductor.context import Context
from conductor.explorer.explorer import start_explorer
from conductor.utils.user_code import cli_command
from conductor.errors import ConductorAbort
Expand Down Expand Up @@ -31,8 +32,9 @@ def register_command(subparsers):
@cli_command
def main(args):
try:
ctx = Context.from_cwd()
start_explorer(
host=args.host, port=args.port, launch_browser=not args.no_browser
ctx, host=args.host, port=args.port, launch_browser=not args.no_browser
)
except ConductorAbort:
# We ignore this exception because it is raised when the user hits
Expand Down
24 changes: 22 additions & 2 deletions src/conductor/execution/version_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,15 @@ class VersionIndex:
# v0.4.0 and older: FormatVersion = 1
FormatVersion = 2

def __init__(self, conn: sqlite3.Connection, last_timestamp: int):
def __init__(
self,
conn: sqlite3.Connection,
last_timestamp: int,
underlying_db_path: pathlib.Path,
):
self._conn = conn
self._last_timestamp = last_timestamp
self._underlying_db_path = underlying_db_path

@classmethod
def create_or_load(cls, path: pathlib.Path) -> "VersionIndex":
Expand All @@ -85,6 +91,7 @@ def create_or_load(cls, path: pathlib.Path) -> "VersionIndex":
last_timestamp=(
result[0] if result is not None and result[0] is not None else 0
),
underlying_db_path=path,
)

# Need to create the DB
Expand All @@ -93,7 +100,20 @@ def create_or_load(cls, path: pathlib.Path) -> "VersionIndex":
conn.execute(q.set_format_version.format(version=cls.FormatVersion))
conn.execute(q.create_table)
conn.commit()
return VersionIndex(conn, 0)
return VersionIndex(conn, 0, path)

def clone(self) -> "VersionIndex":
"""
Clone this `VersionIndex` instance. This is helpful for creating a
instance associated with the same thread (for the SQLite connection,
needed in Conductor's explorer API).
"""
conn = sqlite3.connect(self._underlying_db_path)
return VersionIndex(
conn=conn,
last_timestamp=self._last_timestamp,
underlying_db_path=self._underlying_db_path,
)

def get_latest_output_version(
self, task_identifier: TaskIdentifier
Expand Down
5 changes: 4 additions & 1 deletion src/conductor/explorer/explorer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import time
import webbrowser
import threading
from conductor.context import Context
from conductor.errors import MissingExplorerSupport


def start_explorer(host: str, port: int, launch_browser: bool) -> None:
def start_explorer(ctx: Context, host: str, port: int, launch_browser: bool) -> None:
"""
Entrypoint to launching Conductor's explorer. This function will attempt to
start the explorer. If the user has not installed the necessary
Expand All @@ -15,9 +16,11 @@ def start_explorer(host: str, port: int, launch_browser: bool) -> None:
# user has not installed Conductor's UI dependencies.
try:
import uvicorn
from conductor.explorer.routes import set_context
except ImportError as ex:
raise MissingExplorerSupport() from ex

set_context(ctx)
uvicorn_config = uvicorn.Config(
"conductor.explorer.routes:app",
host=host,
Expand Down
74 changes: 74 additions & 0 deletions src/conductor/explorer/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import enum
from typing import Optional, List
from pydantic import BaseModel

import conductor.task_identifier as ci
import conductor.task_types.base as ct
from conductor.task_types.combine import Combine
from conductor.task_types.group import Group
from conductor.task_types.run import RunExperiment, RunCommand
from conductor.execution.version_index import Version


class TaskType(enum.Enum):
RunExperiment = "run_experiment"
RunCommand = "run_command"
Group = "group"
Combine = "combine"

@classmethod
def from_cond(cls, task_type: ct.TaskType) -> "TaskType":
if isinstance(task_type, RunExperiment):
return cls.RunExperiment
elif isinstance(task_type, RunCommand):
return cls.RunCommand
elif isinstance(task_type, Group):
return cls.Group
elif isinstance(task_type, Combine):
return cls.Combine
else:
raise ValueError(f"Unknown task type: {task_type}")


class TaskIdentifier(BaseModel):
path: str
name: str

@classmethod
def from_cond(cls, identifier: ci.TaskIdentifier) -> "TaskIdentifier":
return cls(path=str(identifier.path), name=identifier.name)

@property
def display(self) -> str:
return f"//{self.path}/{self.name}"


class ResultVersion(BaseModel):
timestamp: int
commit_hash: Optional[str]
has_uncommitted_changes: bool

@classmethod
def from_version(cls, version: Version) -> "ResultVersion":
return cls(
timestamp=version.timestamp,
commit_hash=version.commit_hash,
has_uncommitted_changes=version.has_uncommitted_changes,
)


class TaskResults(BaseModel):
identifier: TaskIdentifier
versions: List[ResultVersion]


class Task(BaseModel):
task_type: TaskType
identifier: TaskIdentifier
deps: List[TaskIdentifier]


class TaskGraph(BaseModel):
tasks: List[Task]
# These are tasks that have no dependees.
root_tasks: List[TaskIdentifier]
84 changes: 76 additions & 8 deletions src/conductor/explorer/routes.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,97 @@
import importlib.resources as pkg_resources
from typing import Optional, List, Dict

from pydantic import BaseModel
from fastapi import FastAPI
from fastapi import FastAPI, HTTPException
from fastapi.staticfiles import StaticFiles

from conductor.context import Context
from conductor.errors import ConductorError
from conductor.explorer.workspace import Workspace
from conductor.task_identifier import TaskIdentifier
import conductor.explorer as explorer_module
import conductor.explorer.models as m

# Conductor's explorer API requires access to a `Context` instance. This must be
# set globally before the API is used. Note that this means only one API
# instance can run at a time, but that is acceptable for our use case.
ctx: Optional[Context] = None
workspace = Workspace()
app = FastAPI()


class Simple(BaseModel):
message: str
def set_context(context: Context) -> None:
"""
Sets the global context for the explorer API.
"""
global ctx # pylint: disable=global-statement
ctx = context


@app.get("/api/1/hello")
def hello_world() -> Simple:
return Simple(message="Hello, World!")
@app.get("/api/1/results/all_versions")
def get_all_versions() -> List[m.TaskResults]:
"""
Retrieve all versioned results that Conductor manages.
"""
assert ctx is not None
version_index = ctx.version_index.clone()
all_results = version_index.get_all_versions()
mapped: Dict[TaskIdentifier, List[m.ResultVersion]] = {}
for task_id, version in all_results:
if task_id not in mapped:
mapped[task_id] = []
mapped[task_id].append(m.ResultVersion.from_version(version))
list_results = [
m.TaskResults(identifier=m.TaskIdentifier.from_cond(task_id), versions=versions)
for task_id, versions in mapped.items()
]
list_results.sort(key=lambda r: r.identifier.display)
return list_results


@app.get("/api/1/task_graph")
def get_task_graph() -> m.TaskGraph:
"""
Retrieves the entire known task graph.
"""
assert ctx is not None
index = ctx.task_index
index.load_all_known_tasks(ctx.git)
try:
if workspace.root_task_ids is None:
root_task_ids = index.validate_all_loaded_tasks()
workspace.set_root_task_ids(root_task_ids)
else:
root_task_ids = workspace.root_task_ids
tasks = [
m.Task(
task_type=m.TaskType.from_cond(task),
identifier=m.TaskIdentifier.from_cond(task.identifier),
deps=[m.TaskIdentifier.from_cond(dep) for dep in task.deps],
)
for _, task in index.get_all_loaded_tasks().items()
]
return m.TaskGraph(
tasks=tasks,
root_tasks=[
m.TaskIdentifier.from_cond(task_id) for task_id in root_task_ids
],
)
except ConductorError as ex:
raise HTTPException(status_code=400, detail=ex.printable_message()) from ex


# Serve the static pages.
# Note that this should go last as a "catch all" route.
explorer_module_pkg = pkg_resources.files(explorer_module)
with pkg_resources.as_file(explorer_module_pkg) as explorer_module_path:
# Make sure the directory exists. This is needed for our test environments
# where we do not build the explorer UI. The file is supposed to exist as a
# symlink.
static_dir = explorer_module_path / "static"
if not static_dir.exists():
static_dir.mkdir(exist_ok=True)
app.mount(
"/",
StaticFiles(directory=explorer_module_path / "static", html=True),
StaticFiles(directory=static_dir, html=True),
name="static",
)
21 changes: 21 additions & 0 deletions src/conductor/explorer/workspace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from typing import List, Optional
from conductor.task_identifier import TaskIdentifier


class Workspace:
"""
Used to cache useful state used by our explorer APIs.
"""

def __init__(self) -> None:
self._root_task_ids: Optional[List[TaskIdentifier]] = None

def clear(self) -> None:
self._root_task_ids = None

@property
def root_task_ids(self) -> Optional[List[TaskIdentifier]]:
return self._root_task_ids

def set_root_task_ids(self, root_task_ids: List[TaskIdentifier]) -> None:
self._root_task_ids = root_task_ids
Loading

0 comments on commit a7f9f65

Please sign in to comment.