Skip to content

Commit

Permalink
Merge pull request stakwork#1945 from aliraza556/Implement-Workflow-D…
Browse files Browse the repository at this point in the history
…atabase-Layer

Implement Database Layer for Workflow Management
  • Loading branch information
humansinstitute authored Nov 14, 2024
2 parents 94a8b97 + 182ddf5 commit 3fbe8c5
Show file tree
Hide file tree
Showing 5 changed files with 605 additions and 0 deletions.
1 change: 1 addition & 0 deletions db/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func InitDB() {
db.AutoMigrate(&FeaturePhase{})
db.AutoMigrate(&FeatureStory{})
db.AutoMigrate(&WfRequest{})
db.AutoMigrate(&WfProcessingMap{})

DB.MigrateTablesWithOrgUuid()
DB.MigrateOrganizationToWorkspace()
Expand Down
14 changes: 14 additions & 0 deletions db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,4 +177,18 @@ type Database interface {
SetPaymentAsComplete(tag string) bool
SetPaymentStatusByBountyId(bountyId uint, tagResult V2TagRes) bool
GetWorkspacePendingPayments(workspace_uuid string) []NewPaymentHistory
CreateWorkflowRequest(req *WfRequest) error
UpdateWorkflowRequest(req *WfRequest) error
GetWorkflowRequestByID(requestID string) (*WfRequest, error)
GetWorkflowRequestsByStatus(status WfRequestStatus) ([]WfRequest, error)
GetWorkflowRequest(requestID string) (*WfRequest, error)
UpdateWorkflowRequestStatusAndResponse(requestID string, status WfRequestStatus, responseData JSONB) error
GetWorkflowRequestsByWorkflowID(workflowID string) ([]WfRequest, error)
GetPendingWorkflowRequests(limit int) ([]WfRequest, error)
DeleteWorkflowRequest(requestID string) error
CreateProcessingMap(pm *WfProcessingMap) error
UpdateProcessingMap(pm *WfProcessingMap) error
GetProcessingMapByKey(processType, processKey string) (*WfProcessingMap, error)
GetProcessingMapsByType(processType string) ([]WfProcessingMap, error)
DeleteProcessingMap(id uint) error
}
19 changes: 19 additions & 0 deletions db/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -919,10 +919,25 @@ const (
StatusFailed WfRequestStatus = "FAILED"
)

type WfProcessingMap struct {
ID uint `gorm:"primaryKey;autoIncrement" json:"id"`
Type string `gorm:"index;not null" json:"type"`
ProcessKey string `gorm:"index;not null" json:"process_key"`
RequiresProcessing bool `gorm:"default:false" json:"requires_processing"`
HandlerFunc string `json:"handler_func,omitempty"`
Config JSONB `gorm:"type:jsonb" json:"config,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}

type WfRequest struct {
ID uint `gorm:"primaryKey;autoIncrement" json:"id"`
RequestID string `gorm:"unique;not null" json:"request_id"`
WorkflowID string `gorm:"index" json:"workflow_id"`
Source string `gorm:"index" json:"source"`
Action string `gorm:"index" json:"action"`
Status WfRequestStatus `json:"status"`
ProjectID string `json:"project_id,omitempty"`
RequestData JSONB `gorm:"type:jsonb" json:"request_data"`
ResponseData JSONB `gorm:"type:jsonb" json:"response_data,omitempty"`
CreatedAt time.Time `json:"created_at"`
Expand Down Expand Up @@ -965,6 +980,10 @@ func (ConnectionCodesShort) TableName() string {
return "connectioncodes"
}

func (WfProcessingMap) TableName() string {
return "wf_processing_maps"
}

func (WfRequest) TableName() string {
return "wf_requests"
}
Expand Down
221 changes: 221 additions & 0 deletions db/workflow_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
package db

import (
"errors"
"time"
)

func (db database) CreateWorkflowRequest(req *WfRequest) error {
if req == nil {
return errors.New("request cannot be nil")
}

now := time.Now()
req.CreatedAt = now
req.UpdatedAt = now

if req.Status == "" {
req.Status = StatusNew
}

result := db.db.Create(req)
return result.Error
}

func (db database) UpdateWorkflowRequest(req *WfRequest) error {
if req == nil {
return errors.New("request cannot be nil")
}

req.UpdatedAt = time.Now()
result := db.db.Model(&WfRequest{}).Where("request_id = ?", req.RequestID).Updates(req)

if result.RowsAffected == 0 {
return errors.New("no workflow request found to update")
}

return result.Error
}

func (db database) GetWorkflowRequestByID(requestID string) (*WfRequest, error) {
if requestID == "" {
return nil, errors.New("request ID cannot be empty")
}

var req WfRequest
result := db.db.Model(&WfRequest{}).Where("request_id = ?", requestID).First(&req)

if result.RowsAffected == 0 {
return nil, nil
}

return &req, result.Error
}

func (db database) GetWorkflowRequestsByStatus(status WfRequestStatus) ([]WfRequest, error) {
var requests []WfRequest

result := db.db.Model(&WfRequest{}).
Where("status = ?", status).
Order("created_at DESC").
Find(&requests)

if result.Error != nil {
return nil, result.Error
}

return requests, nil
}

func (db database) GetWorkflowRequest(requestID string) (*WfRequest, error) {
if requestID == "" {
return nil, errors.New("request ID cannot be empty")
}

var req WfRequest
result := db.db.Model(&WfRequest{}).Where("request_id = ?", requestID).First(&req)

if result.RowsAffected == 0 {
return nil, nil
}

return &req, result.Error
}

func (db database) UpdateWorkflowRequestStatusAndResponse(requestID string, status WfRequestStatus, responseData JSONB) error {
if requestID == "" {
return errors.New("request ID cannot be empty")
}

result := db.db.Model(&WfRequest{}).
Where("request_id = ?", requestID).
Updates(map[string]interface{}{
"status": status,
"response_data": responseData,
"updated_at": time.Now(),
})

if result.RowsAffected == 0 {
return errors.New("no workflow request found to update")
}

return result.Error
}

func (db database) GetWorkflowRequestsByWorkflowID(workflowID string) ([]WfRequest, error) {
if workflowID == "" {
return nil, errors.New("workflow ID cannot be empty")
}

var requests []WfRequest
result := db.db.Model(&WfRequest{}).
Where("workflow_id = ?", workflowID).
Order("created_at DESC").
Find(&requests)

return requests, result.Error
}

func (db database) GetPendingWorkflowRequests(limit int) ([]WfRequest, error) {
if limit <= 0 {
return nil, errors.New("limit must be greater than 0")
}

var requests []WfRequest
result := db.db.Model(&WfRequest{}).
Where("status = ?", StatusPending).
Order("created_at ASC").
Limit(limit).
Find(&requests)

return requests, result.Error
}

func (db database) DeleteWorkflowRequest(requestID string) error {
if requestID == "" {
return errors.New("request ID cannot be empty")
}

result := db.db.Delete(&WfRequest{}, "request_id = ?", requestID)

if result.RowsAffected == 0 {
return errors.New("no workflow request found to delete")
}

return result.Error
}

func (db database) CreateProcessingMap(pm *WfProcessingMap) error {
if pm == nil {
return errors.New("processing map cannot be nil")
}

now := time.Now()
pm.CreatedAt = now
pm.UpdatedAt = now

result := db.db.Create(pm)
return result.Error
}

func (db database) UpdateProcessingMap(pm *WfProcessingMap) error {
if pm == nil {
return errors.New("processing map cannot be nil")
}

pm.UpdatedAt = time.Now()
result := db.db.Model(&WfProcessingMap{}).
Where("id = ?", pm.ID).
Updates(pm)

if result.RowsAffected == 0 {
return errors.New("no processing map found to update")
}

return result.Error
}

func (db database) GetProcessingMapByKey(processType, processKey string) (*WfProcessingMap, error) {
if processType == "" || processKey == "" {
return nil, errors.New("process type and key cannot be empty")
}

var pm WfProcessingMap
result := db.db.Model(&WfProcessingMap{}).
Where("type = ? AND process_key = ?", processType, processKey).
First(&pm)

if result.RowsAffected == 0 {
return nil, nil
}

return &pm, result.Error
}

func (db database) GetProcessingMapsByType(processType string) ([]WfProcessingMap, error) {
if processType == "" {
return nil, errors.New("process type cannot be empty")
}

var maps []WfProcessingMap
result := db.db.Model(&WfProcessingMap{}).
Where("type = ?", processType).
Order("created_at DESC").
Find(&maps)

return maps, result.Error
}

func (db database) DeleteProcessingMap(id uint) error {
if id == 0 {
return errors.New("invalid processing map ID")
}

result := db.db.Delete(&WfProcessingMap{}, id)

if result.RowsAffected == 0 {
return errors.New("no processing map found to delete")
}

return result.Error
}
Loading

0 comments on commit 3fbe8c5

Please sign in to comment.