From 0d39229d5ffaf1ce6c36f3a207ba8c20387a92ef Mon Sep 17 00:00:00 2001 From: Aman Mangal Date: Fri, 29 Nov 2024 05:19:50 +0530 Subject: [PATCH] expose Query function --- edgraph/access_ee.go | 24 +++++++++--------- edgraph/graphql.go | 12 ++++----- edgraph/multi_tenancy_ee.go | 6 ++--- edgraph/server.go | 50 ++++++++++++++++++++----------------- go.mod | 2 +- worker/embedded.go | 3 ++- 6 files changed, 51 insertions(+), 46 deletions(-) diff --git a/edgraph/access_ee.go b/edgraph/access_ee.go index f6e36cc3ce6..17a5c3192b9 100644 --- a/edgraph/access_ee.go +++ b/edgraph/access_ee.go @@ -308,13 +308,13 @@ func authorizeUser(ctx context.Context, userid string, password string) ( "$password": password, } req := &Request{ - Req: &api.Request{ + req: &api.Request{ Query: queryUser, Vars: queryVars, }, - DoAuth: NoAuthorize, + doAuth: NoAuthorize, } - queryResp, err := (&Server{}).DoQuery(ctx, req) + queryResp, err := (&Server{}).doQuery(ctx, req) if err != nil { glog.Errorf("Error while query user with id %s: %v", userid, err) return nil, err @@ -328,16 +328,16 @@ func authorizeUser(ctx context.Context, userid string, password string) ( func refreshAclCache(ctx context.Context, ns, refreshTs uint64) error { req := &Request{ - Req: &api.Request{ + req: &api.Request{ Query: queryAcls, ReadOnly: true, StartTs: refreshTs, }, - DoAuth: NoAuthorize, + doAuth: NoAuthorize, } ctx = x.AttachNamespace(ctx, ns) - queryResp, err := (&Server{}).DoQuery(ctx, req) + queryResp, err := (&Server{}).doQuery(ctx, req) if err != nil { return errors.Errorf("unable to retrieve acls: %v", err) } @@ -482,7 +482,7 @@ func upsertGuardian(ctx context.Context) error { `, x.GuardiansId) groupNQuads := acl.CreateGroupNQuads(x.GuardiansId) req := &Request{ - Req: &api.Request{ + req: &api.Request{ CommitNow: true, Query: query, Mutations: []*api.Mutation{ @@ -492,10 +492,10 @@ func upsertGuardian(ctx context.Context) error { }, }, }, - DoAuth: NoAuthorize, + doAuth: NoAuthorize, } - resp, err := (&Server{}).DoQuery(ctx, req) + resp, err := (&Server{}).doQuery(ctx, req) // Structs to parse guardians group uid from query response type groupNode struct { @@ -558,7 +558,7 @@ func upsertGroot(ctx context.Context, passwd string) error { ObjectId: "uid(guid)", }) req := &Request{ - Req: &api.Request{ + req: &api.Request{ CommitNow: true, Query: query, Mutations: []*api.Mutation{ @@ -569,10 +569,10 @@ func upsertGroot(ctx context.Context, passwd string) error { }, }, }, - DoAuth: NoAuthorize, + doAuth: NoAuthorize, } - resp, err := (&Server{}).DoQuery(ctx, req) + resp, err := (&Server{}).doQuery(ctx, req) if err != nil { return errors.Wrapf(err, "while upserting user with id %s", x.GrootId) } diff --git a/edgraph/graphql.go b/edgraph/graphql.go index 90bf136dbde..dc865d531ad 100644 --- a/edgraph/graphql.go +++ b/edgraph/graphql.go @@ -71,14 +71,14 @@ func ProcessPersistedQuery(ctx context.Context, gqlReq *schema.Request) error { "$join": join, } req := &Request{ - Req: &api.Request{ + req: &api.Request{ Query: queryForSHA, Vars: variables, ReadOnly: true, }, - DoAuth: NoAuthorize, + doAuth: NoAuthorize, } - storedQuery, err := (&Server{}).DoQuery(ctx, req) + storedQuery, err := (&Server{}).doQuery(ctx, req) if err != nil { glog.Errorf("Error while querying sha %s", sha256Hash) @@ -109,7 +109,7 @@ func ProcessPersistedQuery(ctx context.Context, gqlReq *schema.Request) error { } req = &Request{ - Req: &api.Request{ + req: &api.Request{ Mutations: []*api.Mutation{ { Set: []*api.NQuad{ @@ -129,11 +129,11 @@ func ProcessPersistedQuery(ctx context.Context, gqlReq *schema.Request) error { }, CommitNow: true, }, - DoAuth: NoAuthorize, + doAuth: NoAuthorize, } ctx := context.WithValue(ctx, IsGraphql, true) - _, err := (&Server{}).DoQuery(ctx, req) + _, err := (&Server{}).doQuery(ctx, req) return err } diff --git a/edgraph/multi_tenancy_ee.go b/edgraph/multi_tenancy_ee.go index 27e3c123664..9a63d2470a9 100644 --- a/edgraph/multi_tenancy_ee.go +++ b/edgraph/multi_tenancy_ee.go @@ -51,7 +51,7 @@ func (s *Server) ResetPassword(ctx context.Context, inp *ResetPasswordInput) err }, } req := &Request{ - Req: &api.Request{ + req: &api.Request{ CommitNow: true, Query: query, Mutations: []*api.Mutation{ @@ -61,10 +61,10 @@ func (s *Server) ResetPassword(ctx context.Context, inp *ResetPasswordInput) err }, }, }, - DoAuth: NoAuthorize, + doAuth: NoAuthorize, } ctx = x.AttachNamespace(ctx, inp.Namespace) - resp, err := (&Server{}).DoQuery(ctx, req) + resp, err := (&Server{}).doQuery(ctx, req) if err != nil { return errors.Wrapf(err, "Reset password for user %s in namespace %d, got error:", inp.UserID, inp.Namespace) diff --git a/edgraph/server.go b/edgraph/server.go index 55de0afa473..5cf9da11378 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -358,8 +358,8 @@ func parseSchemaFromAlterOperation(ctx context.Context, op *api.Operation) ( // then restoring from the incremental backup of such a DB would restore even the dropped // data back. This is also used to capture the delete namespace operation during backup. func InsertDropRecord(ctx context.Context, dropOp string) error { - _, err := (&Server{}).DoQuery(context.WithValue(ctx, IsGraphql, true), &Request{ - Req: &api.Request{ + _, err := (&Server{}).doQuery(context.WithValue(ctx, IsGraphql, true), &Request{ + req: &api.Request{ Mutations: []*api.Mutation{{ Set: []*api.NQuad{{ Subject: "_:r", @@ -368,7 +368,7 @@ func InsertDropRecord(ctx context.Context, dropOp string) error { }}, }}, CommitNow: true, - }, DoAuth: NoAuthorize}) + }, doAuth: NoAuthorize}) return err } @@ -1083,12 +1083,12 @@ type queryContext struct { // Request represents a query request sent to the doQuery() method on the Server. // It contains all the metadata required to execute a query. type Request struct { - // Req is the incoming gRPC request - Req *api.Request - // GqlField is the GraphQL field for which the request is being sent - GqlField gqlSchema.Field - // DoAuth tells whether this request needs ACL authorization or not - DoAuth AuthMode + // req is the incoming gRPC request + req *api.Request + // gqlField is the GraphQL field for which the request is being sent + gqlField gqlSchema.Field + // doAuth tells whether this request needs ACL authorization or not + doAuth AuthMode } // Health handles /health and /health?all requests. @@ -1209,7 +1209,7 @@ func (s *Server) QueryGraphQL(ctx context.Context, req *api.Request, } } // no need to attach namespace here, it is already done by GraphQL layer - return s.DoQuery(ctx, &Request{Req: req, GqlField: field, DoAuth: getAuthMode(ctx)}) + return s.doQuery(ctx, &Request{req: req, gqlField: field, doAuth: getAuthMode(ctx)}) } func (s *Server) Query(ctx context.Context, req *api.Request) (*api.Response, error) { @@ -1247,7 +1247,11 @@ func (s *Server) QueryNoGrpc(ctx context.Context, req *api.Request) (*api.Respon defer cancel() } } - return s.DoQuery(ctx, &Request{Req: req, DoAuth: getAuthMode(ctx)}) + return s.doQuery(ctx, &Request{req: req, doAuth: getAuthMode(ctx)}) +} + +func (s *Server) QueryNoAuth(ctx context.Context, req *api.Request) (*api.Response, error) { + return s.doQuery(ctx, &Request{req: req, doAuth: NoAuthorize}) } var pendingQueries int64 @@ -1258,7 +1262,7 @@ func Init() { maxPendingQueries = x.Config.Limit.GetInt64("max-pending-queries") } -func (s *Server) DoQuery(ctx context.Context, req *Request) (resp *api.Response, rerr error) { +func (s *Server) doQuery(ctx context.Context, req *Request) (resp *api.Response, rerr error) { if ctx.Err() != nil { return nil, ctx.Err() } @@ -1282,7 +1286,7 @@ func (s *Server) DoQuery(ctx context.Context, req *Request) (resp *api.Response, // glog.Infof("Got a query, DQL form: %+v at %+v", req.req, l.Start.Format(time.RFC3339)) // } - isMutation := len(req.Req.Mutations) > 0 + isMutation := len(req.req.Mutations) > 0 methodRequest := methodQuery if isMutation { methodRequest = methodMutate @@ -1311,15 +1315,15 @@ func (s *Server) DoQuery(ctx context.Context, req *Request) (resp *api.Response, return } - req.Req.Query = strings.TrimSpace(req.Req.Query) - isQuery := len(req.Req.Query) != 0 + req.req.Query = strings.TrimSpace(req.req.Query) + isQuery := len(req.req.Query) != 0 if !isQuery && !isMutation { span.Annotate(nil, "empty request") return nil, errors.Errorf("empty request") } - span.AddAttributes(otrace.StringAttribute("Query", req.Req.Query)) - span.Annotatef(nil, "Request received: %v", req.Req) + span.AddAttributes(otrace.StringAttribute("Query", req.req.Query)) + span.Annotatef(nil, "Request received: %v", req.req) if isQuery { ostats.Record(ctx, x.PendingQueries.M(1), x.NumQueries.M(1)) defer func() { @@ -1330,7 +1334,7 @@ func (s *Server) DoQuery(ctx context.Context, req *Request) (resp *api.Response, ostats.Record(ctx, x.NumMutations.M(1)) } - if req.DoAuth == NeedAuthorize && x.IsGalaxyOperation(ctx) { + if req.doAuth == NeedAuthorize && x.IsGalaxyOperation(ctx) { // Only the guardian of the galaxy can do a galaxy wide query/mutation. This operation is // needed by live loader. if err := AuthGuardianOfTheGalaxy(ctx); err != nil { @@ -1341,17 +1345,17 @@ func (s *Server) DoQuery(ctx context.Context, req *Request) (resp *api.Response, } qc := &queryContext{ - req: req.Req, + req: req.req, latency: l, span: span, graphql: isGraphQL, - gqlField: req.GqlField, + gqlField: req.gqlField, } if rerr = parseRequest(ctx, qc); rerr != nil { return } - if req.DoAuth == NeedAuthorize { + if req.doAuth == NeedAuthorize { if rerr = authorizeRequest(ctx, qc); rerr != nil { return } @@ -1361,9 +1365,9 @@ func (s *Server) DoQuery(ctx context.Context, req *Request) (resp *api.Response, // assigned in the processQuery function called below. defer annotateStartTs(qc.span, qc.req.StartTs) // For mutations, we update the startTs if necessary. - if isMutation && req.Req.StartTs == 0 { + if isMutation && req.req.StartTs == 0 { start := time.Now() - req.Req.StartTs = worker.State.GetTimestamp(false) + req.req.StartTs = worker.State.GetTimestamp(false) qc.latency.AssignTimestamp = time.Since(start) } if x.WorkerConfig.AclEnabled { diff --git a/go.mod b/go.mod index 1e545438f9b..eb2df03b7b4 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,6 @@ require ( github.com/HdrHistogram/hdrhistogram-go v1.1.2 github.com/IBM/sarama v1.43.3 github.com/Masterminds/semver/v3 v3.3.1 - github.com/bits-and-blooms/bitset v1.17.0 github.com/blevesearch/bleve/v2 v2.4.3 github.com/dgraph-io/badger/v4 v4.4.0 github.com/dgraph-io/dgo/v240 v240.0.1 @@ -75,6 +74,7 @@ require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/agnivade/levenshtein v1.1.1 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/bits-and-blooms/bitset v1.17.0 // indirect github.com/blevesearch/bleve_index_api v1.1.12 // indirect github.com/blevesearch/geo v0.1.20 // indirect github.com/blevesearch/go-porterstemmer v1.0.3 // indirect diff --git a/worker/embedded.go b/worker/embedded.go index 4a59621ab8d..467ff570924 100644 --- a/worker/embedded.go +++ b/worker/embedded.go @@ -3,11 +3,12 @@ package worker import ( "context" + "github.com/golang/glog" + "github.com/dgraph-io/badger/v4" "github.com/dgraph-io/dgraph/v24/conn" "github.com/dgraph-io/dgraph/v24/protos/pb" "github.com/dgraph-io/dgraph/v24/schema" - "github.com/golang/glog" ) func InitForLite(ps *badger.DB) {