-
Notifications
You must be signed in to change notification settings - Fork 0
/
machine.go
169 lines (142 loc) · 4.56 KB
/
machine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package tango
import (
"fmt"
"sync"
)
// ResponseStatus is a type that represents the status of a response.
type MachineContext[Services, State any] struct {
Services Services
PreviousResult *Response[Services, State]
State State
Machine *Machine[Services, State]
}
// Plugin is an interface that represents a machine plugin.
type MachineConfig[Services, State any] struct {
Log bool
LogLevel string
Plugins []Plugin[Services, State]
}
// Machine is a struct that represents a machine.
type Machine[Services, State any] struct {
Name string
Context *MachineContext[Services, State]
Steps []Step[Services, State]
ExecutedSteps []Step[Services, State]
InitialContext *MachineContext[Services, State]
Config *MachineConfig[Services, State]
mu sync.Mutex
Strategy ExecutionStrategy[Services, State]
}
// NewMachine creates a new machine.
func NewMachine[Services, State any](
name string,
steps []Step[Services, State],
initialContext *MachineContext[Services, State],
config *MachineConfig[Services, State],
strategy ExecutionStrategy[Services, State],
) *Machine[Services, State] {
m := &Machine[Services, State]{
Name: name,
Steps: steps,
InitialContext: initialContext,
Context: initialContext,
Config: config,
Strategy: strategy,
}
m.Context.Machine = m
return m
}
// AddStep adds a step to the machine.
func (m *Machine[Services, State]) AddStep(step Step[Services, State]) {
m.Steps = append(m.Steps, step)
}
// Reset resets the machine to its initial state. It clears the context and executed steps.
func (m *Machine[Services, State]) Reset() {
m.Steps = nil
m.Context = m.InitialContext
m.ExecutedSteps = nil
}
// Run executes the machine steps.
func (m *Machine[Services, State]) Run() (*Response[Services, State], error) {
if len(m.Steps) == 0 {
return nil, fmt.Errorf("no steps to execute")
}
for _, plugin := range m.Config.Plugins {
if err := plugin.Init(m.Context); err != nil {
return nil, fmt.Errorf("plugin setup error: %v", err)
}
newStrategy := plugin.ModifyExecutionStrategy(m)
if newStrategy != nil {
m.Strategy = newStrategy
}
}
response, err := m.Strategy.Execute(m)
if err != nil {
return nil, err
}
for _, plugin := range m.Config.Plugins {
if err := plugin.Cleanup(m.Context); err != nil {
return nil, fmt.Errorf("plugin cleanup error: %v", err)
}
}
return response, nil
}
// executeStep runs the step and its before and after functions.
func (m *Machine[Services, State]) executeStep(step Step[Services, State]) (*Response[Services, State], error) {
if m.Config.Log {
fmt.Printf("executing step: %s\n", step.Name)
}
for _, plugin := range m.Config.Plugins {
if err := plugin.Execute(m.Context); err != nil {
return nil, fmt.Errorf("plugin before step error: %v", err)
}
}
if step.BeforeExecute != nil {
if err := step.BeforeExecute(m.Context); err != nil {
return nil, err
}
}
if step.Execute == nil {
return nil, fmt.Errorf("step %s has no execute function", step.Name)
}
response, err := step.Execute(m.Context)
if err != nil {
return nil, err
}
if step.AfterExecute != nil {
if err := step.AfterExecute(m.Context); err != nil {
return nil, err
}
}
return response, nil
}
// Compensate runs the compensate functions of the executed steps.
func (m *Machine[Services, State]) Compensate() (*Response[Services, State], error) {
return m.Strategy.Compensate(m)
}
// Result is an alias for any.
type Result interface{}
// NewStep creates a new step.
func (m *Machine[Services, State]) NewStep(step *Step[Services, State]) {
m.AddStep(*NewStep(step))
}
// Next creates a response with status NEXT.
func (m *Machine[Services, State]) Next(result Result) *Response[Services, State] {
return Next[Result, Services, State](result)
}
// Done creates a response with status DONE.
func (m *Machine[Services, State]) Done(result Result) *Response[Services, State] {
return Done[Result, Services, State](result)
}
// Error creates a response with status ERROR.
func (m *Machine[Services, State]) Error(result Result) *Response[Services, State] {
return Error[Result, Services, State](result)
}
// Skip creates a response with status SKIP.
func (m *Machine[Services, State]) Skip(result Result, count int) *Response[Services, State] {
return Skip[Result, Services, State](result, count)
}
// Jump creates a response with status JUMP.
func (m *Machine[Services, State]) Jump(result any, target string) *Response[Services, State] {
return Jump[Result, Services, State](result, target)
}