Skip to content

Commit

Permalink
[Envs] Implement operation to transfer repository to a remote environ…
Browse files Browse the repository at this point in the history
…ment (#114)
  • Loading branch information
geoffxy authored Nov 29, 2024
1 parent d3f6ee4 commit 4be77ff
Show file tree
Hide file tree
Showing 19 changed files with 348 additions and 13 deletions.
8 changes: 8 additions & 0 deletions errors/errors.yml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@
name: CombineOutputFileConflict
message: "The output file or directory '{output_file}' already exists and cannot be overwritten by the combine() task."

3007:
name: MissingEnvSupport
message: This feature requires support for Conductor environments. Please install Conductor with the [env] features.

3008:
name: EnvsRequireGit
message: To use a Conductor environments, your project must be tracked in a Git repository.


# Archive and restore errors (error code 4xxx)
4001:
Expand Down
14 changes: 14 additions & 0 deletions proto/maestro.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ service Maestro {
// A temporary RPC for testing purposes.
rpc Ping(PingRequest) returns (PingResponse) {}

// Used to initialize a repository (workspace) for the current project.
rpc UnpackBundle(UnpackBundleRequest) returns (UnpackBundleResponse) {}

// Tell the daemon to shut down.
rpc Shutdown(ShutdownRequest) returns (ShutdownResponse) {}
}
Expand All @@ -26,3 +29,14 @@ message ShutdownRequest {
message ShutdownResponse {
string message = 1;
}

message UnpackBundleRequest {
// A path to the bundle file (relative to the maestro root).
string bundle_path = 1;
}

message UnpackBundleResponse {
// A unique identifier for the unpacked bundle (workspace). We use this to
// differentiate between different copies of the same repository.
string workspace_name = 1;
}
3 changes: 3 additions & 0 deletions src/conductor/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,8 @@
MAESTRO_VENV_NAME = "condenv"
MAESTRO_LOG_FILE = "maestro.log"
MAESTRO_COND_WHEEL_TEMPLATE = "conductor_cli-{version}-py3-none-any.whl"
MAESTRO_BUNDLE_LOCATION = "bundles"
MAESTRO_WORKSPACE_LOCATION = "workspaces"
MAESTRO_WORKSPACE_NAME_FORMAT = "{name}.{timestamp}"

MAESTRO_PYTHON_VERSION = "3.10.12"
7 changes: 7 additions & 0 deletions src/conductor/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from conductor.config import CONFIG_FILE_NAME, OUTPUT_DIR, VERSION_INDEX_NAME
from conductor.config_file import ConfigFile
from conductor.envs.manager import EnvManager
from conductor.errors import MissingProjectRoot, OutputDirTaken
from conductor.execution.version_index import VersionIndex
from conductor.parsing.task_index import TaskIndex
Expand Down Expand Up @@ -40,6 +41,8 @@ def __init__(self, project_root: pathlib.Path):
# We lazily initialize the TeeProcessor because it may not always be needed.
self._tee_processor: Optional[TeeProcessor] = None

self._env_manager: Optional[EnvManager] = EnvManager.create()

@classmethod
def from_cwd(cls) -> "Context":
"""
Expand Down Expand Up @@ -93,6 +96,10 @@ def tee_processor(self) -> TeeProcessor:
self._tee_processor = TeeProcessor()
return self._tee_processor

@property
def envs(self) -> Optional[EnvManager]:
return self._env_manager

def _ensure_output_dir_exists(self) -> None:
self.output_path.mkdir(exist_ok=True)
if not self.output_path.is_dir():
Expand Down
7 changes: 7 additions & 0 deletions src/conductor/envs/maestro/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import grpc
import pathlib
from typing import Optional
import conductor.envs.proto_gen.maestro_pb2 as pb
import conductor.envs.proto_gen.maestro_pb2_grpc as maestro_grpc
Expand Down Expand Up @@ -43,6 +44,12 @@ def ping(self, message: str) -> str:
msg = pb.PingRequest(message=message)
return self._stub.Ping(msg).message

def unpack_bundle(self, bundle_path: pathlib.Path) -> str:
assert self._stub is not None
# pylint: disable-next=no-member
msg = pb.UnpackBundleRequest(bundle_path=str(bundle_path))
return self._stub.UnpackBundle(msg).workspace_name

def shutdown(self, key: str) -> str:
assert self._stub is not None
# pylint: disable-next=no-member
Expand Down
23 changes: 23 additions & 0 deletions src/conductor/envs/maestro/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
import logging
import pathlib
import subprocess
import time

from conductor.envs.maestro.interface import MaestroInterface
from conductor.config import MAESTRO_WORKSPACE_LOCATION, MAESTRO_WORKSPACE_NAME_FORMAT

logger = logging.getLogger(__name__)

Expand All @@ -21,6 +24,26 @@ async def ping(self, message: str) -> str:
result = subprocess.run(["uname", "-a"], capture_output=True, check=False)
return result.stdout.decode("utf-8").strip()

async def unpack_bundle(self, bundle_path: pathlib.Path) -> str:
bundle_name = bundle_path.stem
workspace_name = MAESTRO_WORKSPACE_NAME_FORMAT.format(
name=bundle_name, timestamp=str(int(time.time()))
)
abs_workspace_path = (
self._maestro_root / MAESTRO_WORKSPACE_LOCATION / workspace_name
)
abs_workspace_path.mkdir(parents=True, exist_ok=True)
abs_bundle_path = self._maestro_root / bundle_path
# N.B. Good practice to use async versions here, but we intend to have
# only one caller.
clone_args = ["git", "clone", str(abs_bundle_path), str(abs_workspace_path)]
logger.debug("Running args: %s", str(clone_args))
process = await asyncio.create_subprocess_exec(*clone_args)
await process.wait()
if process.returncode != 0:
raise RuntimeError("Failed to unpack the bundle.")
return workspace_name

async def shutdown(self, key: str) -> str:
logger.info("Received shutdown message with key %s", key)
loop = asyncio.get_running_loop()
Expand Down
8 changes: 8 additions & 0 deletions src/conductor/envs/maestro/grpc_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pathlib
import conductor.envs.proto_gen.maestro_pb2_grpc as rpc
import conductor.envs.proto_gen.maestro_pb2 as pb
from conductor.envs.maestro.interface import MaestroInterface
Expand All @@ -20,6 +21,13 @@ async def Ping(self, request: pb.PingRequest, context) -> pb.PingResponse:
response_message = await self._maestro.ping(request.message)
return pb.PingResponse(message=response_message)

async def UnpackBundle(
self, request: pb.UnpackBundleRequest, context
) -> pb.UnpackBundleResponse:
bundle_path = pathlib.Path(request.bundle_path)
workspace_name = await self._maestro.unpack_bundle(bundle_path)
return pb.UnpackBundleResponse(workspace_name=workspace_name)

async def Shutdown(
self, request: pb.ShutdownRequest, context
) -> pb.ShutdownResponse:
Expand Down
6 changes: 6 additions & 0 deletions src/conductor/envs/maestro/interface.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import pathlib


class MaestroInterface:
"""
Captures the RPC interface for Maestro. We use this interface to separate
Expand All @@ -7,5 +10,8 @@ class MaestroInterface:
async def ping(self, message: str) -> str:
raise NotImplementedError

async def unpack_bundle(self, bundle_path: pathlib.Path) -> str:
raise NotImplementedError

async def shutdown(self, key: str) -> str:
raise NotImplementedError
8 changes: 4 additions & 4 deletions src/conductor/envs/manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any, Optional, TYPE_CHECKING

if TYPE_CHECKING:
from conductor.envs.maestro.client import MaestroGrpcClient
from conductor.envs.remote_env import RemoteEnv


class EnvManager:
Expand Down Expand Up @@ -34,11 +34,11 @@ def __init__(self, impl: Any) -> None:

self._impl: mgr_impl.EnvManagerImpl = impl

def start_remote_env(self, name: str, host: str, user: str) -> "MaestroGrpcClient":
def start_remote_env(self, name: str, host: str, user: str) -> "RemoteEnv":
return self._impl.start_remote_env(name, host, user)

def get_client(self, name: str) -> "MaestroGrpcClient":
return self._impl.get_client(name)
def get_remote_env(self, name: str) -> "RemoteEnv":
return self._impl.get_remote_env(name)

def shutdown_remote_env(self, name: str) -> None:
return self._impl.shutdown_remote_env(name)
9 changes: 4 additions & 5 deletions src/conductor/envs/manager_impl.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,23 @@
from typing import Dict

from conductor.envs.maestro.client import MaestroGrpcClient
from conductor.envs.remote_env import RemoteEnv


class EnvManagerImpl:
def __init__(self) -> None:
self._active_envs: Dict[str, RemoteEnv] = {}

def start_remote_env(self, name: str, host: str, user: str) -> MaestroGrpcClient:
def start_remote_env(self, name: str, host: str, user: str) -> RemoteEnv:
if name in self._active_envs:
# This is a internal error as we should not be trying to start an
# environment more than once.
raise ValueError(f"Environment with name {name} already exists.")
remote_env = RemoteEnv.start(host, user)
self._active_envs[name] = remote_env
return remote_env.client()
return remote_env

def get_client(self, name: str) -> MaestroGrpcClient:
return self._active_envs[name].client()
def get_remote_env(self, name: str) -> RemoteEnv:
return self._active_envs[name]

def shutdown_remote_env(self, name: str) -> None:
try:
Expand Down
10 changes: 7 additions & 3 deletions src/conductor/envs/proto_gen/maestro_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions src/conductor/envs/proto_gen/maestro_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,15 @@ class ShutdownResponse(_message.Message):
MESSAGE_FIELD_NUMBER: _ClassVar[int]
message: str
def __init__(self, message: _Optional[str] = ...) -> None: ...

class UnpackBundleRequest(_message.Message):
__slots__ = ("bundle_path",)
BUNDLE_PATH_FIELD_NUMBER: _ClassVar[int]
bundle_path: str
def __init__(self, bundle_path: _Optional[str] = ...) -> None: ...

class UnpackBundleResponse(_message.Message):
__slots__ = ("workspace_name",)
WORKSPACE_NAME_FIELD_NUMBER: _ClassVar[int]
workspace_name: str
def __init__(self, workspace_name: _Optional[str] = ...) -> None: ...
44 changes: 44 additions & 0 deletions src/conductor/envs/proto_gen/maestro_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ def __init__(self, channel):
request_serializer=maestro__pb2.PingRequest.SerializeToString,
response_deserializer=maestro__pb2.PingResponse.FromString,
_registered_method=True)
self.UnpackBundle = channel.unary_unary(
'/conductor.Maestro/UnpackBundle',
request_serializer=maestro__pb2.UnpackBundleRequest.SerializeToString,
response_deserializer=maestro__pb2.UnpackBundleResponse.FromString,
_registered_method=True)
self.Shutdown = channel.unary_unary(
'/conductor.Maestro/Shutdown',
request_serializer=maestro__pb2.ShutdownRequest.SerializeToString,
Expand All @@ -58,6 +63,13 @@ def Ping(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def UnpackBundle(self, request, context):
"""Used to initialize a repository (workspace) for the current project.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def Shutdown(self, request, context):
"""Tell the daemon to shut down.
"""
Expand All @@ -73,6 +85,11 @@ def add_MaestroServicer_to_server(servicer, server):
request_deserializer=maestro__pb2.PingRequest.FromString,
response_serializer=maestro__pb2.PingResponse.SerializeToString,
),
'UnpackBundle': grpc.unary_unary_rpc_method_handler(
servicer.UnpackBundle,
request_deserializer=maestro__pb2.UnpackBundleRequest.FromString,
response_serializer=maestro__pb2.UnpackBundleResponse.SerializeToString,
),
'Shutdown': grpc.unary_unary_rpc_method_handler(
servicer.Shutdown,
request_deserializer=maestro__pb2.ShutdownRequest.FromString,
Expand Down Expand Up @@ -117,6 +134,33 @@ def Ping(request,
metadata,
_registered_method=True)

@staticmethod
def UnpackBundle(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(
request,
target,
'/conductor.Maestro/UnpackBundle',
maestro__pb2.UnpackBundleRequest.SerializeToString,
maestro__pb2.UnpackBundleResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

@staticmethod
def Shutdown(request,
target,
Expand Down
Loading

0 comments on commit 4be77ff

Please sign in to comment.