Skip to content

Commit

Permalink
cloud: Make status messages more detailed
Browse files Browse the repository at this point in the history
  • Loading branch information
ctessum committed Sep 30, 2018
1 parent 6cc45c3 commit a9bd804
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 69 deletions.
4 changes: 2 additions & 2 deletions cloud/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,11 @@ func (c *Client) Output(ctx context.Context, job *cloudrpc.JobName) (*cloudrpc.J
o := &cloudrpc.JobOutput{
Files: make(map[string][]byte),
}
//k8sJob, err := c.getk8sJob(ctx, job)
//TODO: k8sJob, err := c.getk8sJob(ctx, job)
if err != nil {
return nil, err
}
//addrs, err := c.jobOutputAddresses(ctx, job.Name, k8sJob.Spec.Template.Spec.Containers[0].Command)
//TODO: addrs, err := c.jobOutputAddresses(ctx, job.Name, k8sJob.Spec.Template.Spec.Containers[0].Command)
addrs, err := c.jobOutputAddresses(ctx, job.Name, []string{"inmap", "run", "steady"})
if err != nil {
return nil, err
Expand Down
60 changes: 44 additions & 16 deletions cloud/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,24 @@ func NewClient(k kubernetes.Interface, root *cobra.Command, config *viper.Viper,
return c, nil
}

// Create creates (and queues) a Kubernetes job with the given name that executes
// RunJob creates (and queues) a Kubernetes job with the given name that executes
// the given command with the given command-line arguments on the client's
// container image. job.MemoryGB specifies the memory required for execution.
func (c *Client) RunJob(ctx context.Context, job *cloudrpc.JobSpec) (*cloudrpc.JobStatus, error) {
if job.Version != inmap.Version {
return nil, fmt.Errorf("incorrect InMAP version: %s != %s", job.Version, inmap.Version)
}

status, err := c.Status(ctx, &cloudrpc.JobName{Name: job.Name, Version: job.Version})
if err != nil {
return nil, err
}
if status.Status != cloudrpc.Status_Failed { //TODO: status.Status != cloudrpc.Status_Missing && {
// Only create the job if it is missing or failed.
c.Delete(ctx, &cloudrpc.JobName{Name: job.Name, Version: job.Version})
return status, nil
}

if err := c.stageInputs(ctx, job); err != nil {
return nil, err
}
Expand All @@ -107,20 +118,11 @@ func (c *Client) RunJob(ctx context.Context, job *cloudrpc.JobSpec) (*cloudrpc.J
k8sJob := createJob(userJobName(user, job.Name), job.Cmd, job.Args, c.Image, core.ResourceList{
core.ResourceMemory: resource.MustParse(fmt.Sprintf("%dGi", job.MemoryGB)),
})
k8sJobResult, err := c.jobControl.Create(k8sJob)
_, err = c.jobControl.Create(k8sJob)
if err != nil {
return nil, err
}
return c.jobStatus(k8sJobResult)
}

// Status returns the status of the given job.
func (c *Client) Status(ctx context.Context, job *cloudrpc.JobName) (*cloudrpc.JobStatus, error) {
k8sJob, err := c.getk8sJob(ctx, job)
if err != nil {
return nil, err
}
return c.jobStatus(k8sJob)
return c.Status(ctx, &cloudrpc.JobName{Name: job.Name, Version: job.Version})
}

// Delete deletes the given job.
Expand Down Expand Up @@ -167,10 +169,36 @@ func userJobName(user, name string) string {
return strings.Replace(user, "_", "-", -1) + "-" + strings.Replace(name, "_", "-", -1)
}

func (c *Client) jobStatus(j *batch.Job) (*cloudrpc.JobStatus, error) {
return &cloudrpc.JobStatus{
Status: j.Status.String(),
}, nil
// Status reports the status of the given job.
//
// The Kubernetes job lookup is currently disabled (see the commented-out block
// below), so the result is determined solely by checking the job's outputs: if
// that check fails, the status is Status_Failed and Message carries the error;
// otherwise the returned JobStatus is left at its zero value. The returned
// error is always nil.
func (c *Client) Status(ctx context.Context, job *cloudrpc.JobName) (*cloudrpc.JobStatus, error) {
	status := &cloudrpc.JobStatus{}

	/* Disabled for now: derive the status from the Kubernetes job itself.
	k8sJob, err := c.getk8sJob(ctx, job)
	if err != nil {
		return &cloudrpc.JobStatus{
			Status:  cloudrpc.Status_Missing,
			Message: err.Error(),
		}, nil
	}
	for _, cond := range k8sJob.Status.Conditions {
		if cond.Type == batch.JobComplete && cond.Status == core.ConditionTrue {
			status.Status = cloudrpc.Status_Complete
			status.StartTime = k8sJob.Status.StartTime.Time.Unix()
			status.CompletionTime = k8sJob.Status.CompletionTime.Time.Unix()
		} else if cond.Type == batch.JobFailed && cond.Status == core.ConditionTrue {
			status.Status = cloudrpc.Status_Failed
		}
	}
	if k8sJob.Status.Active > 0 {
		status.Status = cloudrpc.Status_Running
		status.StartTime = k8sJob.Status.StartTime.Time.Unix()
	}*/

	//TODO: err = c.checkOutputs(ctx, name, k8sJob.Spec.Template.Spec.Containers[0].Command)
	// Until the job's own command is available, the command is hard-coded.
	if err := c.checkOutputs(ctx, job.Name, []string{"inmap", "run", "steady"}); err != nil {
		status.Status = cloudrpc.Status_Failed
		status.Message = fmt.Sprintf("job completed but the following error occurred when checking outputs: %s", err)
	}
	return status, nil
}

// createJob creates a Kubernetes job specification with the given name that executes the
Expand Down
4 changes: 2 additions & 2 deletions cloud/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestClient_fake(t *testing.T) {
t.Fatal(err)
}
wantStatus := &cloudrpc.JobStatus{
Status: "&JobStatus{Conditions:[],StartTime:<nil>,CompletionTime:<nil>,Active:0,Succeeded:0,Failed:0,}",
// Status: "&JobStatus{Conditions:[],StartTime:<nil>,CompletionTime:<nil>,Active:0,Succeeded:0,Failed:0,}",
}
if !reflect.DeepEqual(wantStatus, status) {
t.Errorf("status:\n%+v\n!=\n%+v", status, wantStatus)
Expand All @@ -112,7 +112,7 @@ func TestClient_fake(t *testing.T) {
t.Fatal(err)
}
wantStatus := &cloudrpc.JobStatus{
Status: "&JobStatus{Conditions:[],StartTime:<nil>,CompletionTime:<nil>,Active:0,Succeeded:0,Failed:0,}",
// Status: "&JobStatus{Conditions:[],StartTime:<nil>,CompletionTime:<nil>,Active:0,Succeeded:0,Failed:0,}",
}
if !reflect.DeepEqual(wantStatus, status) {
t.Errorf("status:\n%+v\n!=\n%+v", status, wantStatus)
Expand Down
18 changes: 15 additions & 3 deletions cloud/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ service CloudRPC {
// output file(s).
rpc RunJob(JobSpec) returns (JobStatus) {}

// OutputAddresses returns status and the addresses the output file(s) of the
// requested simulation name.
// Status returns the status of the simulation with the
// requested name.
rpc Status(JobName) returns(JobStatus) {}

// Output returns the output file(s) of the
Expand Down Expand Up @@ -61,9 +61,21 @@ message JobSpec {
map<string,bytes> FileData = 7;
}

// Status enumerates the possible states of a job, as reported in
// JobStatus.Status.
// NOTE(review): Complete is the zero value, so a JobStatus whose Status is
// never set reads as Complete (assuming proto3 default-value semantics) —
// confirm this is intended.
enum Status {
Complete = 0;
Failed = 1;
Missing = 2;
Running = 3;
}

message JobStatus {
// Status holds the current status of the job.
string Status = 1;
Status Status = 1;
string Message = 2;

// Unix time, the number of seconds elapsed since January 1, 1970 UTC
int64 StartTime = 3;
int64 CompletionTime = 4;
}

message JobOutput {
Expand Down
133 changes: 97 additions & 36 deletions cloud/cloudrpc/cloud.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a9bd804

Please sign in to comment.