-
Notifications
You must be signed in to change notification settings - Fork 3
/
messageroute.go
122 lines (100 loc) · 4.07 KB
/
messageroute.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package dogma
import (
"fmt"
"reflect"
)
// HandlesCommand routes command messages to an [AggregateMessageHandler] or
// [IntegrationMessageHandler]. It is used as an argument to the Routes() method
// of [AggregateConfigurer] or [IntegrationConfigurer].
//
// An application MUST NOT route a single command type to more than one handler.
func HandlesCommand[T Command](...HandlesCommandOption) HandlesCommandRoute {
return HandlesCommandRoute{typeOf[Command, T]()}
}
// RecordsEvent routes event messages recorded by an [AggregateMessageHandler]
// or [IntegrationMessageHandler]. It is used as an argument to the Routes()
// method of [AggregateConfigurer] or [IntegrationConfigurer].
//
// An application MUST NOT route a single event type from more than one handler.
func RecordsEvent[T Event](...RecordsEventOption) RecordsEventRoute {
return RecordsEventRoute{typeOf[Event, T]()}
}
// HandlesEvent routes event messages to a [ProcessMessageHandler] or
// [ProjectionMessageHandler]. It is used as an argument to the Routes() method
// of [ProcessConfigurer] or [ProjectionConfigurer].
func HandlesEvent[T Event](...HandlesEventOption) HandlesEventRoute {
return HandlesEventRoute{typeOf[Event, T]()}
}
// ExecutesCommand routes command messages produced by a
// [ProcessMessageHandler]. It is used as an argument to the Routes() method of
// [ProcessConfigurer].
func ExecutesCommand[T Command](...ExecutesCommandOption) ExecutesCommandRoute {
return ExecutesCommandRoute{typeOf[Command, T]()}
}
// SchedulesTimeout routes timeout messages scheduled by
// [ProcessMessageHandler]. It is used as an argument to the Routes() method of
// [ProcessConfigurer].
//
// An application MAY use a single timeout type with more than one process.
func SchedulesTimeout[T Timeout](...SchedulesTimeoutOption) SchedulesTimeoutRoute {
return SchedulesTimeoutRoute{typeOf[Timeout, T]()}
}
type (
// MessageRoute is an interface for types that describe a relationship between a
// message handler and a specific message type.
MessageRoute = interface{ isMessageRoute() }
// Route is an alias for [MessageRoute]
//
// Deprecated: Use [MessageRoute] instead.
Route = MessageRoute
// HandlesCommandRoute describes a route for a handler that handles a
// [Command] of a specific type.
HandlesCommandRoute struct{ Type reflect.Type }
// ExecutesCommandRoute describes a route for a handler that executes a
// [Command] of a specific type.
ExecutesCommandRoute struct{ Type reflect.Type }
// HandlesEventRoute describes a route for a handler that handles an
// [Event] of a specific type.
HandlesEventRoute struct{ Type reflect.Type }
// RecordsEventRoute describes a route for a handler that records an
// [Event] of a specific type.
RecordsEventRoute struct{ Type reflect.Type }
// SchedulesTimeoutRoute describes a route for a handler that schedules a
// [Timeout] of a specific type.
SchedulesTimeoutRoute struct{ Type reflect.Type }
)
type (
// HandlesCommandOption is an option that affects the behavior of the route
// returned by [HandlesCommand].
HandlesCommandOption struct{}
// ExecutesCommandOption is an option that affects the behavior of the route
// returned by [ExecutesCommand].
ExecutesCommandOption struct{}
// HandlesEventOption is an option that affects the behavior of the route
// returned by [HandlesEvent].
HandlesEventOption struct{}
// RecordsEventOption is an option that affects the behavior of the route
// returned by [RecordsEvent].
RecordsEventOption struct{}
// SchedulesTimeoutOption is an option that affects the behavior of the
// route returned by [SchedulesTimeout].
SchedulesTimeoutOption struct{}
)
// typeOf returns the [reflect.Type] for C, which must be a concrete
// implementation of the interface I.
func typeOf[I Message, C Message]() reflect.Type {
concrete := reflect.TypeFor[C]()
if concrete.Kind() == reflect.Pointer {
iface := reflect.TypeFor[I]()
elem := concrete.Elem()
if elem.Implements(iface) {
panic(fmt.Sprintf(
"%s implements %s using non-pointer receivers, use %s instead",
concrete,
iface,
elem,
))
}
}
return concrete
}