Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
code review comments
  • Loading branch information
anandrgitnirman committed May 26, 2022
1 parent 2ac4623 commit b265b1d
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 104 deletions.
17 changes: 13 additions & 4 deletions snetd/cmd/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type Components struct {
tokenManager token.Manager
tokenService *escrow.TokenService
modelService training.ModelServer
modelUserStorage *training.ModelStorage
modelUserStorage *training.ModelUserStorage
modelStorage *training.ModelStorage
}

func InitComponents(cmd *cobra.Command) (components *Components) {
Expand Down Expand Up @@ -550,20 +551,28 @@ func (components *Components) ConfigurationService() *configuration_service.Conf
return components.configurationService
}

func (components *Components) ModelUserStorage() *training.ModelStorage {
func (components *Components) ModelUserStorage() *training.ModelUserStorage {
if components.modelUserStorage != nil {
return components.modelUserStorage
}
components.modelUserStorage = training.NewUserModelStorage(components.AtomicStorage())
components.modelUserStorage = training.NewUerModelStorage(components.AtomicStorage())
return components.modelUserStorage
}

func (components *Components) ModelStorage() *training.ModelStorage {
if components.modelStorage != nil {
return components.modelStorage
}
components.modelStorage = training.NewModelStorage(components.AtomicStorage())
return components.modelStorage
}

func (components *Components) ModelService() training.ModelServer {
if components.modelService != nil {
return components.modelService
}
components.modelService = training.NewModelService(components.PaymentChannelService(), components.ServiceMetaData(),
components.OrganizationMetaData(), components.ModelUserStorage())
components.OrganizationMetaData(), components.ModelStorage(), components.ModelUserStorage())
return components.modelService
}

Expand Down
149 changes: 94 additions & 55 deletions training/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type ModelService struct {
organizationMetaData *blockchain.OrganizationMetaData
channelService escrow.PaymentChannelService
storage *ModelStorage
userStorage *ModelUserStorage
serviceUrl string
}

Expand Down Expand Up @@ -77,24 +78,80 @@ func (service ModelService) getServiceClient() (conn *grpc.ClientConn, client Mo
client = NewModelClient(conn)
return
}
func (service ModelService) storeModelDetails(request *CreateModelRequest, response *ModelDetailsResponse) (err error) {
func (service ModelService) createModelDetails(request *CreateModelRequest, response *ModelDetailsResponse) (err error) {
key := service.getModelKeyToCreate(request, response)
data := service.createModelData(request, response)

err = service.storage.Put(key, data)
//for every accessible address in the list , store the user address and all the model Ids associated with it
for _, address := range data.AuthorizedAddresses {
userKey := getModelUserKey(key, address)
userData := service.getModelUserData(key, address)
err = service.userStorage.Put(userKey, userData)
}
return
}
func getModelUserKey(key *ModelKey, address string) *ModelUserKey {
return &ModelUserKey{
OrganizationId: key.OrganizationId,
ServiceId: key.ServiceId,
GroupId: key.GroupId,
MethodName: key.MethodName,
UserAddress: address,
}
}

func (service ModelService) getModelUserData(key *ModelKey, address string) *ModelUserData {
//Check if there are any model Ids already associated with this user
modelIds := make([]string, 1)
userKey := getModelUserKey(key, address)
if data, ok, err := service.userStorage.Get(userKey); ok && err != nil && data != nil {
modelIds = data.ModelIds
}
modelIds = append(modelIds, key.ModelId)
return &ModelUserData{
OrganizationId: key.OrganizationId,
ServiceId: key.ServiceId,
GroupId: key.GroupId,
MethodName: key.MethodName,
UserAddress: address,
ModelIds: modelIds,
}
}

func (service ModelService) deleteUserModelDetails(key *ModelKey, data *ModelData) (err error) {

for _, address := range data.AuthorizedAddresses {
userKey := getModelUserKey(key, address)
if data, ok, err := service.userStorage.Get(userKey); ok && err != nil && data != nil {
data.ModelIds = remove(data.ModelIds, key.ModelId)
err = service.userStorage.Put(userKey, data)
}
}
return
}

func (service ModelService) deleteModelDetails(request *UpdateModelRequest, response *ModelDetailsResponse) (err error) {
func remove(s []string, r string) []string {
for i, v := range s {
if v == r {
return append(s[:i], s[i+1:]...)
}
}
return s
}

func (service ModelService) deleteModelDetails(request *UpdateModelRequest) (err error) {
key := service.getModelKeyToUpdate(request.ModelDetailsRequest)
data, ok, err := service.storage.Get(key)
if ok && err != nil {
data.Status = "DELETED"
err = service.storage.Put(key, data)
err = service.deleteUserModelDetails(key, data)
}
return
}

func (service ModelService) getModelDetails(request *UpdateModelRequest, response *ModelDetailsResponse) (data *ModelUserData, err error) {
func (service ModelService) getModelDetails(request *UpdateModelRequest, response *ModelDetailsResponse) (data *ModelData, err error) {

key := service.getModelKeyToUpdate(request.ModelDetailsRequest)
data, ok, err := service.storage.Get(key)
Expand Down Expand Up @@ -132,8 +189,8 @@ func (service ModelService) updateModelDetailsForStatus(request *ModelDetailsReq
}
return
}
func (service ModelService) getModelKeyToCreate(request *CreateModelRequest, response *ModelDetailsResponse) (key *ModelUserKey) {
key = &ModelUserKey{
func (service ModelService) getModelKeyToCreate(request *CreateModelRequest, response *ModelDetailsResponse) (key *ModelKey) {
key = &ModelKey{
OrganizationId: config.GetString(config.OrganizationId),
ServiceId: config.GetString(config.ServiceId),
GroupId: service.organizationMetaData.GetGroupIdString(),
Expand All @@ -143,8 +200,8 @@ func (service ModelService) getModelKeyToCreate(request *CreateModelRequest, res
return
}

func (service ModelService) getModelKeyToUpdate(request *ModelDetailsRequest) (key *ModelUserKey) {
key = &ModelUserKey{
func (service ModelService) getModelKeyToUpdate(request *ModelDetailsRequest) (key *ModelKey) {
key = &ModelKey{
OrganizationId: config.GetString(config.OrganizationId),
ServiceId: config.GetString(config.ServiceId),
GroupId: service.organizationMetaData.GetGroupIdString(),
Expand All @@ -154,12 +211,12 @@ func (service ModelService) getModelKeyToUpdate(request *ModelDetailsRequest) (k
return
}

func (service ModelService) getModelDataForUpdate(request *UpdateModelRequest, response *ModelDetailsResponse) (data *ModelUserData, err error) {
func (service ModelService) getModelDataForUpdate(request *UpdateModelRequest, response *ModelDetailsResponse) (data *ModelData, err error) {
data, err = service.getModelDataForStatusUpdate(request.ModelDetailsRequest, response)
return
}

func (service ModelService) getModelDataForStatusUpdate(request *ModelDetailsRequest, response *ModelDetailsResponse) (data *ModelUserData, err error) {
func (service ModelService) getModelDataForStatusUpdate(request *ModelDetailsRequest, response *ModelDetailsResponse) (data *ModelData, err error) {
key := service.getModelKeyToUpdate(request)
ok := false

Expand All @@ -171,18 +228,18 @@ func (service ModelService) getModelDataForStatusUpdate(request *ModelDetailsReq
func (service ModelService) GetAllModels(c context.Context, request *AccessibleModelsRequest) (response *AccessibleModelsResponse, err error) {
if request == nil || request.Authorization == nil {
return &AccessibleModelsResponse{Status: Status_ERROR},
fmt.Errorf(" INVALID REQUEST DETAILS , no Authorization provided ")
fmt.Errorf(" Invalid request , no Authorization provided ")
}
if err = service.verifySignerForCreateModel(request.Authorization); err != nil {
if err = service.verifySignature(request.Authorization); err != nil {
return &AccessibleModelsResponse{Status: Status_ERROR},
fmt.Errorf(" authentication FAILED , %v", err)
fmt.Errorf(" Unable to access model , %v", err)
}
//TODO
return
}

func (service ModelService) createModelData(request *CreateModelRequest, response *ModelDetailsResponse) (data *ModelUserData) {
data = &ModelUserData{
func (service ModelService) createModelData(request *CreateModelRequest, response *ModelDetailsResponse) (data *ModelData) {
data = &ModelData{
Status: string(response.Status),
CreatedByAddress: request.Authorization.SignerAddress,
AuthorizedAddresses: request.AddressList,
Expand All @@ -192,6 +249,11 @@ func (service ModelService) createModelData(request *CreateModelRequest, respons
ServiceId: config.GetString(config.ServiceId),
GroupId: service.organizationMetaData.GetGroupIdString(),
}
//by default add the creator to the Authorized list of Address
if data.AuthorizedAddresses == nil {
data.AuthorizedAddresses = make([]string, 1)
}
data.AuthorizedAddresses = append(data.AuthorizedAddresses, data.CreatedByAddress)
return
}

Expand All @@ -200,11 +262,11 @@ func (service ModelService) CreateModel(c context.Context, request *CreateModelR
// verify the request
if request == nil || request.Authorization == nil {
return &ModelDetailsResponse{Status: Status_ERROR},
fmt.Errorf(" INVALID REQUEST DETAILS , no Authorization provided , %v", err)
fmt.Errorf(" Invalid request , no Authorization provided , %v", err)
}
if err = service.verifySignerForCreateModel(request.Authorization); err != nil {
if err = service.verifySignature(request.Authorization); err != nil {
return &ModelDetailsResponse{Status: Status_ERROR},
fmt.Errorf(" authentication FAILED , %v", err)
fmt.Errorf(" Unable to access model , %v", err)
}
// make a call to the client
// if the response is successful , store details in etcd
Expand All @@ -215,7 +277,7 @@ func (service ModelService) CreateModel(c context.Context, request *CreateModelR
if err == nil {
//store the details in etcd
log.Infof("Creating model based on response from CreateModel")
if err = service.storeModelDetails(request, response); err != nil {
if err = service.createModelDetails(request, response); err != nil {
return response, fmt.Errorf("issue with storing Model Id in the Daemon Storage %v", err)
}
}
Expand All @@ -231,11 +293,11 @@ func (service ModelService) UpdateModelAccess(c context.Context, request *Update
err error) {
if request == nil || request.ModelDetailsRequest == nil || request.ModelDetailsRequest.Authorization == nil {
return &ModelDetailsResponse{Status: Status_ERROR},
fmt.Errorf(" INVALID REQUEST DETAILS , no Authorization provided , %v", err)
fmt.Errorf(" Invalid request , no Authorization provided , %v", err)
}
if err = service.verifySignerForUpdateModel(request.ModelDetailsRequest.Authorization); err != nil {
if err = service.verifySignature(request.ModelDetailsRequest.Authorization); err != nil {
return &ModelDetailsResponse{Status: Status_ERROR},
fmt.Errorf(" authentication FAILED , %v", err)
fmt.Errorf(" Unable to access model , %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
Expand All @@ -257,18 +319,18 @@ func (service ModelService) DeleteModel(c context.Context, request *UpdateModelR
err error) {
if request == nil || request.ModelDetailsRequest == nil || request.ModelDetailsRequest.Authorization == nil {
return &ModelDetailsResponse{Status: Status_ERROR},
fmt.Errorf(" INVALID REQUEST DETAILS , no Authorization provided , %v", err)
fmt.Errorf(" Invalid request , no Authorization provided , %v", err)
}
if err = service.verifySignerForDeleteModel(request.ModelDetailsRequest.Authorization); err != nil {
if err = service.verifySignature(request.ModelDetailsRequest.Authorization); err != nil {
return &ModelDetailsResponse{Status: Status_ERROR},
fmt.Errorf(" authentication FAILED , %v", err)
fmt.Errorf(" Unable to access model , %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*200)
defer cancel()
if conn, client, err := service.getServiceClient(); err == nil {
response, err = client.DeleteModel(ctx, request)
log.Infof("Deleting model based on response from DeleteModel")
if err = service.deleteModelDetails(request, response); err != nil {
if err = service.deleteModelDetails(request); err != nil {
return response, fmt.Errorf("issue with deleting Model Id in Storage %v", err)
}
deferConnection(conn)
Expand All @@ -283,11 +345,11 @@ func (service ModelService) GetModelStatus(c context.Context, request *ModelDeta
err error) {
if request == nil || request.Authorization == nil {
return &ModelDetailsResponse{Status: Status_ERROR},
fmt.Errorf(" INVALID REQUEST DETAILS , no Authorization provided , %v", err)
fmt.Errorf(" Invalid request , no Authorization provided , %v", err)
}
if err = service.verifySignerForGetModelStatus(request.Authorization); err != nil {
if err = service.verifySignature(request.Authorization); err != nil {
return &ModelDetailsResponse{Status: Status_ERROR},
fmt.Errorf(" authentication FAILED , %v", err)
fmt.Errorf(" Unable to access model , %v", err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*200)
defer cancel()
Expand All @@ -306,32 +368,8 @@ func (service ModelService) GetModelStatus(c context.Context, request *ModelDeta
}

//message used to sign is of the form ("__create_model", mpe_address, current_block_number)
func (service *ModelService) verifySignerForCreateModel(request *AuthorizationDetails) error {
return utils.VerifySigner(service.getMessageBytes("__CreateModel", request),
request.GetSignature(), utils.ToChecksumAddress(request.SignerAddress))
}

//message used to sign is of the form ("__update_model", mpe_address, current_block_number)
func (service *ModelService) verifySignerForUpdateModel(request *AuthorizationDetails) error {
return utils.VerifySigner(service.getMessageBytes("__UpdateModelAccess", request),
request.GetSignature(), utils.ToChecksumAddress(request.SignerAddress))
}

//message used to sign is of the form ("__delete_model", mpe_address, current_block_number)
func (service *ModelService) verifySignerForDeleteModel(request *AuthorizationDetails) error {
return utils.VerifySigner(service.getMessageBytes("__DeleteModel", request),
request.GetSignature(), utils.ToChecksumAddress(request.SignerAddress))
}

//message used to sign is of the form ("__delete_model", mpe_address, current_block_number)
func (service *ModelService) verifySignerForGetModelStatus(request *AuthorizationDetails) error {
return utils.VerifySigner(service.getMessageBytes("__GetModelStatus", request),
request.GetSignature(), utils.ToChecksumAddress(request.SignerAddress))
}

//message used to sign is of the form ("__get_training_status", mpe_address, current_block_number)
func (service *ModelService) verifySignatureForGetAllModels(request *AuthorizationDetails) error {
return utils.VerifySigner(service.getMessageBytes("__GetAllModels", request),
func (service *ModelService) verifySignature(request *AuthorizationDetails) error {
return utils.VerifySigner(service.getMessageBytes(request.Message, request),
request.GetSignature(), utils.ToChecksumAddress(request.SignerAddress))
}

Expand All @@ -347,14 +385,15 @@ func (service *ModelService) getMessageBytes(prefixMessage string, request *Auth
}

func NewModelService(channelService escrow.PaymentChannelService, serMetaData *blockchain.ServiceMetadata,
orgMetadata *blockchain.OrganizationMetaData, storage *ModelStorage) ModelServer {
orgMetadata *blockchain.OrganizationMetaData, storage *ModelStorage, userStorage *ModelUserStorage) ModelServer {
serviceURL := config.GetString(config.ModelTrainingEndpoint)
if config.IsValidUrl(serviceURL) {
return &ModelService{
channelService: channelService,
serviceMetaData: serMetaData,
organizationMetaData: orgMetadata,
storage: storage,
userStorage: userStorage,
serviceUrl: serviceURL,
}
} else {
Expand Down
Loading

0 comments on commit b265b1d

Please sign in to comment.