Skip to content

Commit

Permalink
expose Query function
Browse files Browse the repository at this point in the history
  • Loading branch information
mangalaman93 committed Nov 28, 2024
1 parent bcacdbf commit 0d39229
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 46 deletions.
24 changes: 12 additions & 12 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,13 @@ func authorizeUser(ctx context.Context, userid string, password string) (
"$password": password,
}
req := &Request{
Req: &api.Request{
req: &api.Request{
Query: queryUser,
Vars: queryVars,
},
DoAuth: NoAuthorize,
doAuth: NoAuthorize,
}
queryResp, err := (&Server{}).DoQuery(ctx, req)
queryResp, err := (&Server{}).doQuery(ctx, req)
if err != nil {
glog.Errorf("Error while query user with id %s: %v", userid, err)
return nil, err
Expand All @@ -328,16 +328,16 @@ func authorizeUser(ctx context.Context, userid string, password string) (

func refreshAclCache(ctx context.Context, ns, refreshTs uint64) error {
req := &Request{
Req: &api.Request{
req: &api.Request{
Query: queryAcls,
ReadOnly: true,
StartTs: refreshTs,
},
DoAuth: NoAuthorize,
doAuth: NoAuthorize,
}

ctx = x.AttachNamespace(ctx, ns)
queryResp, err := (&Server{}).DoQuery(ctx, req)
queryResp, err := (&Server{}).doQuery(ctx, req)
if err != nil {
return errors.Errorf("unable to retrieve acls: %v", err)
}
Expand Down Expand Up @@ -482,7 +482,7 @@ func upsertGuardian(ctx context.Context) error {
`, x.GuardiansId)
groupNQuads := acl.CreateGroupNQuads(x.GuardiansId)
req := &Request{
Req: &api.Request{
req: &api.Request{
CommitNow: true,
Query: query,
Mutations: []*api.Mutation{
Expand All @@ -492,10 +492,10 @@ func upsertGuardian(ctx context.Context) error {
},
},
},
DoAuth: NoAuthorize,
doAuth: NoAuthorize,
}

resp, err := (&Server{}).DoQuery(ctx, req)
resp, err := (&Server{}).doQuery(ctx, req)

// Structs to parse guardians group uid from query response
type groupNode struct {
Expand Down Expand Up @@ -558,7 +558,7 @@ func upsertGroot(ctx context.Context, passwd string) error {
ObjectId: "uid(guid)",
})
req := &Request{
Req: &api.Request{
req: &api.Request{
CommitNow: true,
Query: query,
Mutations: []*api.Mutation{
Expand All @@ -569,10 +569,10 @@ func upsertGroot(ctx context.Context, passwd string) error {
},
},
},
DoAuth: NoAuthorize,
doAuth: NoAuthorize,
}

resp, err := (&Server{}).DoQuery(ctx, req)
resp, err := (&Server{}).doQuery(ctx, req)
if err != nil {
return errors.Wrapf(err, "while upserting user with id %s", x.GrootId)
}
Expand Down
12 changes: 6 additions & 6 deletions edgraph/graphql.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ func ProcessPersistedQuery(ctx context.Context, gqlReq *schema.Request) error {
"$join": join,
}
req := &Request{
Req: &api.Request{
req: &api.Request{
Query: queryForSHA,
Vars: variables,
ReadOnly: true,
},
DoAuth: NoAuthorize,
doAuth: NoAuthorize,
}
storedQuery, err := (&Server{}).DoQuery(ctx, req)
storedQuery, err := (&Server{}).doQuery(ctx, req)

if err != nil {
glog.Errorf("Error while querying sha %s", sha256Hash)
Expand Down Expand Up @@ -109,7 +109,7 @@ func ProcessPersistedQuery(ctx context.Context, gqlReq *schema.Request) error {
}

req = &Request{
Req: &api.Request{
req: &api.Request{
Mutations: []*api.Mutation{
{
Set: []*api.NQuad{
Expand All @@ -129,11 +129,11 @@ func ProcessPersistedQuery(ctx context.Context, gqlReq *schema.Request) error {
},
CommitNow: true,
},
DoAuth: NoAuthorize,
doAuth: NoAuthorize,
}

ctx := context.WithValue(ctx, IsGraphql, true)
_, err := (&Server{}).DoQuery(ctx, req)
_, err := (&Server{}).doQuery(ctx, req)
return err

}
Expand Down
6 changes: 3 additions & 3 deletions edgraph/multi_tenancy_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (s *Server) ResetPassword(ctx context.Context, inp *ResetPasswordInput) err
},
}
req := &Request{
Req: &api.Request{
req: &api.Request{
CommitNow: true,
Query: query,
Mutations: []*api.Mutation{
Expand All @@ -61,10 +61,10 @@ func (s *Server) ResetPassword(ctx context.Context, inp *ResetPasswordInput) err
},
},
},
DoAuth: NoAuthorize,
doAuth: NoAuthorize,
}
ctx = x.AttachNamespace(ctx, inp.Namespace)
resp, err := (&Server{}).DoQuery(ctx, req)
resp, err := (&Server{}).doQuery(ctx, req)
if err != nil {
return errors.Wrapf(err, "Reset password for user %s in namespace %d, got error:",
inp.UserID, inp.Namespace)
Expand Down
50 changes: 27 additions & 23 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,8 @@ func parseSchemaFromAlterOperation(ctx context.Context, op *api.Operation) (
// then restoring from the incremental backup of such a DB would restore even the dropped
// data back. This is also used to capture the delete namespace operation during backup.
func InsertDropRecord(ctx context.Context, dropOp string) error {
_, err := (&Server{}).DoQuery(context.WithValue(ctx, IsGraphql, true), &Request{
Req: &api.Request{
_, err := (&Server{}).doQuery(context.WithValue(ctx, IsGraphql, true), &Request{
req: &api.Request{
Mutations: []*api.Mutation{{
Set: []*api.NQuad{{
Subject: "_:r",
Expand All @@ -368,7 +368,7 @@ func InsertDropRecord(ctx context.Context, dropOp string) error {
}},
}},
CommitNow: true,
}, DoAuth: NoAuthorize})
}, doAuth: NoAuthorize})
return err
}

Expand Down Expand Up @@ -1083,12 +1083,12 @@ type queryContext struct {
// Request represents a query request sent to the doQuery() method on the Server.
// It contains all the metadata required to execute a query.
type Request struct {
// Req is the incoming gRPC request
Req *api.Request
// GqlField is the GraphQL field for which the request is being sent
GqlField gqlSchema.Field
// DoAuth tells whether this request needs ACL authorization or not
DoAuth AuthMode
// req is the incoming gRPC request
req *api.Request
// gqlField is the GraphQL field for which the request is being sent
gqlField gqlSchema.Field
// doAuth tells whether this request needs ACL authorization or not
doAuth AuthMode
}

// Health handles /health and /health?all requests.
Expand Down Expand Up @@ -1209,7 +1209,7 @@ func (s *Server) QueryGraphQL(ctx context.Context, req *api.Request,
}
}
// no need to attach namespace here, it is already done by GraphQL layer
return s.DoQuery(ctx, &Request{Req: req, GqlField: field, DoAuth: getAuthMode(ctx)})
return s.doQuery(ctx, &Request{req: req, gqlField: field, doAuth: getAuthMode(ctx)})
}

func (s *Server) Query(ctx context.Context, req *api.Request) (*api.Response, error) {
Expand Down Expand Up @@ -1247,7 +1247,11 @@ func (s *Server) QueryNoGrpc(ctx context.Context, req *api.Request) (*api.Respon
defer cancel()
}
}
return s.DoQuery(ctx, &Request{Req: req, DoAuth: getAuthMode(ctx)})
return s.doQuery(ctx, &Request{req: req, doAuth: getAuthMode(ctx)})
}

func (s *Server) QueryNoAuth(ctx context.Context, req *api.Request) (*api.Response, error) {
return s.doQuery(ctx, &Request{req: req, doAuth: NoAuthorize})
}

var pendingQueries int64
Expand All @@ -1258,7 +1262,7 @@ func Init() {
maxPendingQueries = x.Config.Limit.GetInt64("max-pending-queries")
}

func (s *Server) DoQuery(ctx context.Context, req *Request) (resp *api.Response, rerr error) {
func (s *Server) doQuery(ctx context.Context, req *Request) (resp *api.Response, rerr error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
Expand All @@ -1282,7 +1286,7 @@ func (s *Server) DoQuery(ctx context.Context, req *Request) (resp *api.Response,
// glog.Infof("Got a query, DQL form: %+v at %+v", req.req, l.Start.Format(time.RFC3339))
// }

isMutation := len(req.Req.Mutations) > 0
isMutation := len(req.req.Mutations) > 0
methodRequest := methodQuery
if isMutation {
methodRequest = methodMutate
Expand Down Expand Up @@ -1311,15 +1315,15 @@ func (s *Server) DoQuery(ctx context.Context, req *Request) (resp *api.Response,
return
}

req.Req.Query = strings.TrimSpace(req.Req.Query)
isQuery := len(req.Req.Query) != 0
req.req.Query = strings.TrimSpace(req.req.Query)
isQuery := len(req.req.Query) != 0
if !isQuery && !isMutation {
span.Annotate(nil, "empty request")
return nil, errors.Errorf("empty request")
}

span.AddAttributes(otrace.StringAttribute("Query", req.Req.Query))
span.Annotatef(nil, "Request received: %v", req.Req)
span.AddAttributes(otrace.StringAttribute("Query", req.req.Query))
span.Annotatef(nil, "Request received: %v", req.req)
if isQuery {
ostats.Record(ctx, x.PendingQueries.M(1), x.NumQueries.M(1))
defer func() {
Expand All @@ -1330,7 +1334,7 @@ func (s *Server) DoQuery(ctx context.Context, req *Request) (resp *api.Response,
ostats.Record(ctx, x.NumMutations.M(1))
}

if req.DoAuth == NeedAuthorize && x.IsGalaxyOperation(ctx) {
if req.doAuth == NeedAuthorize && x.IsGalaxyOperation(ctx) {
// Only the guardian of the galaxy can do a galaxy wide query/mutation. This operation is
// needed by live loader.
if err := AuthGuardianOfTheGalaxy(ctx); err != nil {
Expand All @@ -1341,17 +1345,17 @@ func (s *Server) DoQuery(ctx context.Context, req *Request) (resp *api.Response,
}

qc := &queryContext{
req: req.Req,
req: req.req,
latency: l,
span: span,
graphql: isGraphQL,
gqlField: req.GqlField,
gqlField: req.gqlField,
}
if rerr = parseRequest(ctx, qc); rerr != nil {
return
}

if req.DoAuth == NeedAuthorize {
if req.doAuth == NeedAuthorize {
if rerr = authorizeRequest(ctx, qc); rerr != nil {
return
}
Expand All @@ -1361,9 +1365,9 @@ func (s *Server) DoQuery(ctx context.Context, req *Request) (resp *api.Response,
// assigned in the processQuery function called below.
defer annotateStartTs(qc.span, qc.req.StartTs)
// For mutations, we update the startTs if necessary.
if isMutation && req.Req.StartTs == 0 {
if isMutation && req.req.StartTs == 0 {
start := time.Now()
req.Req.StartTs = worker.State.GetTimestamp(false)
req.req.StartTs = worker.State.GetTimestamp(false)
qc.latency.AssignTimestamp = time.Since(start)
}
if x.WorkerConfig.AclEnabled {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2
github.com/IBM/sarama v1.43.3
github.com/Masterminds/semver/v3 v3.3.1
github.com/bits-and-blooms/bitset v1.17.0
github.com/blevesearch/bleve/v2 v2.4.3
github.com/dgraph-io/badger/v4 v4.4.0
github.com/dgraph-io/dgo/v240 v240.0.1
Expand Down Expand Up @@ -75,6 +74,7 @@ require (
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/agnivade/levenshtein v1.1.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.17.0 // indirect
github.com/blevesearch/bleve_index_api v1.1.12 // indirect
github.com/blevesearch/geo v0.1.20 // indirect
github.com/blevesearch/go-porterstemmer v1.0.3 // indirect
Expand Down
3 changes: 2 additions & 1 deletion worker/embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package worker
import (
"context"

"github.com/golang/glog"

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/dgraph/v24/conn"
"github.com/dgraph-io/dgraph/v24/protos/pb"
"github.com/dgraph-io/dgraph/v24/schema"
"github.com/golang/glog"
)

func InitForLite(ps *badger.DB) {
Expand Down

0 comments on commit 0d39229

Please sign in to comment.