diff --git a/app/common_test.go b/app/common_test.go index a8bd8ce94..fff264237 100644 --- a/app/common_test.go +++ b/app/common_test.go @@ -105,6 +105,7 @@ func NewInMemoryTendermintNodeAminoWithValidators(t *testing.T, genesisState []b panic(err) } pocketTypes.CleanPocketNodes() + pocketTypes.StopEvidenceWorker() PCA = nil inMemKB = nil err := inMemDB.Close() @@ -170,6 +171,7 @@ func NewInMemoryTendermintNodeProtoWithValidators(t *testing.T, genesisState []b } pocketTypes.CleanPocketNodes() + pocketTypes.StopEvidenceWorker() PCA = nil inMemKB = nil diff --git a/app/config.go b/app/config.go index 6b20615e6..5467b548e 100644 --- a/app/config.go +++ b/app/config.go @@ -472,6 +472,7 @@ func InitPocketCoreConfig(chains *types.HostedBlockchains, logger log.Logger) { } func ShutdownPocketCore() { + types.StopEvidenceWorker() types.FlushSessionCache() types.StopServiceMetrics() } diff --git a/app/query_test.go b/app/query_test.go index 2a20c1986..296e10d0a 100644 --- a/app/query_test.go +++ b/app/query_test.go @@ -791,6 +791,7 @@ func TestQueryRelay(t *testing.T) { _, stopCli, evtChan := subscribeTo(t, tmTypes.EventNewBlock) select { case <-evtChan: + assert.Equal(t, uint64(1), types.GlobalEvidenceWorker.SuccessfulTasks()) inv, err := types.GetEvidence(types.SessionHeader{ ApplicationPubKey: aat.ApplicationPublicKey, Chain: relay.Proof.Blockchain, @@ -858,6 +859,8 @@ func TestQueryRelayMultipleNodes(t *testing.T) { } _, stopCli, evtChan := subscribeTo(t, tmTypes.EventNewBlock) <-evtChan // Wait for block + chain := sdk.PlaceholderHash + sessionBlockHeight := int64(1) // setup relay for _, v := range validators { relay := types.Relay{ @@ -865,9 +868,9 @@ func TestQueryRelayMultipleNodes(t *testing.T) { Meta: types.RelayMeta{BlockHeight: 5}, // todo race condition here Proof: types.RelayProof{ Entropy: 32598345349034509, - SessionBlockHeight: 1, + SessionBlockHeight: sessionBlockHeight, ServicerPubKey: v.PublicKey().RawString(), - Blockchain: sdk.PlaceholderHash, + Blockchain: chain, Token: aat, Signature: "", }, @@ -886,23 +889,32 @@ func TestQueryRelayMultipleNodes(t *testing.T) { BodyString(expectedRequest). Reply(200). BodyString(expectedResponse) + } - validatorAddress := sdk.GetAddress(v.PublicKey()) - node, nodeErr := types.GetPocketNodeByAddress(&validatorAddress) + select { + case <-evtChan: + // verify that each store task was successful + assert.Equal(t, uint64(len(validators)), types.GlobalEvidenceWorker.SuccessfulTasks()) + // check the evidence store of each node. + for _, v := range validators { + validatorAddress := sdk.GetAddress(v.PublicKey()) + _node, nodeErr := types.GetPocketNodeByAddress(&validatorAddress) + + assert.Nil(t, nodeErr) + inv, err := types.GetEvidence(types.SessionHeader{ + ApplicationPubKey: aat.ApplicationPublicKey, + Chain: chain, + SessionBlockHeight: sessionBlockHeight, + }, types.RelayEvidence, sdk.NewInt(10000), _node.EvidenceStore) + assert.Nil(t, err) + assert.NotNil(t, inv) + assert.Equal(t, int64(1), inv.NumOfProofs) + } - assert.Nil(t, nodeErr) - inv, err := types.GetEvidence(types.SessionHeader{ - ApplicationPubKey: aat.ApplicationPublicKey, - Chain: relay.Proof.Blockchain, - SessionBlockHeight: relay.Proof.SessionBlockHeight, - }, types.RelayEvidence, sdk.NewInt(10000), node.EvidenceStore) - assert.Nil(t, err) - assert.NotNil(t, inv) - assert.Equal(t, inv.NumOfProofs, int64(1)) + cleanup() + stopCli() + gock.Off() } - cleanup() - stopCli() - gock.Off() return }) } diff --git a/go.mod b/go.mod index 6fd567dbb..7a1f89e0a 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,8 @@ replace github.com/tendermint/tm-db => github.com/pokt-network/tm-db v0.5.2-0.20 require ( github.com/cosmos/gogoproto v1.4.10 github.com/cucumber/godog v0.12.5 + github.com/alitto/pond v1.8.1 + github.com/cucumber/godog v0.12.6 github.com/go-kit/kit v0.12.0 github.com/gogo/protobuf v1.3.2 github.com/golang/protobuf v1.5.3 @@ -19,7 +21,7 @@ require ( github.com/prometheus/client_golang v1.11.0 github.com/regen-network/cosmos-proto v0.3.0 github.com/spf13/cobra v1.4.0 - github.com/stretchr/testify v1.7.0 + github.com/stretchr/testify v1.7.1 github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 github.com/tendermint/go-amino v0.15.1 github.com/tendermint/tendermint v0.33.7 @@ -43,7 +45,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-kit/log v0.2.0 // indirect github.com/go-logfmt/logfmt v0.5.1 // indirect - github.com/gofrs/uuid v4.0.0+incompatible // indirect + github.com/gofrs/uuid v4.2.0+incompatible // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.0.0 // indirect github.com/google/go-cmp v0.5.9 // indirect @@ -52,7 +54,7 @@ require ( github.com/gtank/ristretto255 v0.1.2 // indirect github.com/h2non/parth v0.0.0-20190131123155-b4df798d6542 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect - github.com/hashicorp/go-memdb v1.3.0 // indirect + github.com/hashicorp/go-memdb v1.3.2 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jmhodges/levigo v1.0.0 // indirect github.com/kr/text v0.2.0 // indirect diff --git a/types/config.go b/types/config.go index 700142da8..26348a308 100644 --- a/types/config.go +++ b/types/config.go @@ -36,6 +36,9 @@ type PocketConfig struct { ValidatorCacheSize int64 `json:"validator_cache_size"` ApplicationCacheSize int64 `json:"application_cache_size"` RPCTimeout int64 `json:"rpc_timeout"` + RPCMaxIdleConns int `json:"rpc_max_idle_conns"` + RPCMaxConnsPerHost int `json:"rpc_max_conns_per_host"` + RPCMaxIdleConnsPerHost int `json:"rpc_max_idle_conns_per_host"` PrometheusAddr string `json:"pocket_prometheus_port"` PrometheusMaxOpenfiles int `json:"prometheus_max_open_files"` MaxClaimAgeForProofRetry int `json:"max_claim_age_for_proof_retry"` @@ -104,6 +107,9 @@ const ( DefaultPocketPrometheusListenAddr = "8083" DefaultPrometheusMaxOpenFile = 3 DefaultRPCTimeout = 30000 + DefaultRPCMaxIdleConns = 1000 + DefaultRPCMaxConnsPerHost = 1000 + DefaultRPCMaxIdleConnsPerHost = 1000 DefaultMaxClaimProofRetryAge = 32 DefaultProofPrevalidation = false DefaultCtxCacheSize = 20 @@ -138,6 +144,9 @@ func DefaultConfig(dataDir string) Config { ValidatorCacheSize: DefaultValidatorCacheSize, ApplicationCacheSize: DefaultApplicationCacheSize, RPCTimeout: DefaultRPCTimeout, + RPCMaxIdleConns: DefaultRPCMaxIdleConns, + RPCMaxConnsPerHost: DefaultRPCMaxConnsPerHost, + RPCMaxIdleConnsPerHost: DefaultRPCMaxIdleConnsPerHost, PrometheusAddr: DefaultPocketPrometheusListenAddr, PrometheusMaxOpenfiles: DefaultPrometheusMaxOpenFile, MaxClaimAgeForProofRetry: DefaultMaxClaimProofRetryAge, diff --git a/x/pocketcore/keeper/service.go b/x/pocketcore/keeper/service.go index 6b0481edc..7433d5ebd 100644 --- a/x/pocketcore/keeper/service.go +++ b/x/pocketcore/keeper/service.go @@ -67,8 +67,12 @@ func (k Keeper) HandleRelay(ctx sdk.Ctx, relay pc.Relay) (*pc.RelayResponse, sdk } return nil, err } - // store the proof before execution, because the proof corresponds to the previous relay - relay.Proof.Store(maxPossibleRelays, node.EvidenceStore) + // move this to a worker that will insert this proof in a series style to avoid memory consumption and relay proof race conditions + // https://github.com/pokt-network/pocket-core/issues/1457 + pc.GlobalEvidenceWorker.Submit(func() { + // store the proof before execution, because the proof corresponds to the previous relay + relay.Proof.Store(maxPossibleRelays, node.EvidenceStore) + }) // attempt to execute respPayload, err := relay.Execute(hostedBlockchains, &nodeAddress) if err != nil { diff --git a/x/pocketcore/keeper/service_test.go b/x/pocketcore/keeper/service_test.go index cd0766e92..55be73b58 100644 --- a/x/pocketcore/keeper/service_test.go +++ b/x/pocketcore/keeper/service_test.go @@ -108,6 +108,10 @@ func testRelayAt( assert.Nil(t, er) validRelay.Proof.Signature = hex.EncodeToString(clientSig) + httpClient := types.GetChainsClient() + defer gock.Off() // Flush pending mocks after test execution + defer gock.RestoreClient(httpClient) + gock.InterceptClient(httpClient) gock.New("https://www.google.com:443"). Post("/"). Reply(200). diff --git a/x/pocketcore/types/config.go b/x/pocketcore/types/config.go index 782400a45..fa2867f07 100644 --- a/x/pocketcore/types/config.go +++ b/x/pocketcore/types/config.go @@ -3,6 +3,7 @@ package types import ( "encoding/hex" "fmt" + "github.com/alitto/pond" "github.com/pokt-network/pocket-core/crypto" "github.com/pokt-network/pocket-core/types" "github.com/tendermint/tendermint/config" @@ -20,13 +21,16 @@ var ( globalRPCTimeout time.Duration GlobalPocketConfig types.PocketConfig GlobalTenderMintConfig config.Config + GlobalEvidenceWorker *pond.WorkerPool ) func InitConfig(chains *HostedBlockchains, logger log.Logger, c types.Config) { ConfigOnce.Do(func() { InitGlobalServiceMetric(chains, logger, c.PocketConfig.PrometheusAddr, c.PocketConfig.PrometheusMaxOpenfiles) }) + InitHttpClient(c.PocketConfig.RPCMaxIdleConns, c.PocketConfig.RPCMaxConnsPerHost, c.PocketConfig.RPCMaxIdleConnsPerHost) InitPocketNodeCaches(c, logger) + InitEvidenceWorker(c, logger) GlobalPocketConfig = c.PocketConfig GlobalTenderMintConfig = c.TendermintConfig if GlobalPocketConfig.LeanPocket { @@ -37,6 +41,18 @@ func InitConfig(chains *HostedBlockchains, logger log.Logger, c types.Config) { SetRPCTimeout(c.PocketConfig.RPCTimeout) } +func InitEvidenceWorker(_ types.Config, logger log.Logger) { + panicHandler := func(p interface{}) { + logger.Error(fmt.Sprintf("evidence storage task panicked: %v", p)) + } + GlobalEvidenceWorker = pond.New( + 1, 0, + pond.IdleTimeout(100), + pond.PanicHandler(panicHandler), + pond.Strategy(pond.Balanced()), + ) +} + func ConvertEvidenceToProto(config types.Config) error { // we have to add a random pocket node so that way lean pokt can still support getting the global evidence cache node := AddPocketNode(crypto.GenerateEd25519PrivKey().GenPrivateKey(), log.NewNopLogger()) @@ -67,6 +83,13 @@ func ConvertEvidenceToProto(config types.Config) error { return nil } +func StopEvidenceWorker() { + if !GlobalEvidenceWorker.Stopped() { + GlobalEvidenceWorker.StopAndWait() + } + GlobalEvidenceWorker = nil +} + func FlushSessionCache() { for _, k := range GlobalPocketNodes { if k.SessionStore != nil { diff --git a/x/pocketcore/types/service.go b/x/pocketcore/types/service.go index 0a47dd5fa..50d8ae40b 100644 --- a/x/pocketcore/types/service.go +++ b/x/pocketcore/types/service.go @@ -18,6 +18,10 @@ import ( const DEFAULTHTTPMETHOD = "POST" +var ( + chainHttpClient *http.Client +) + // "Relay" - A read / write API request from a hosted (non native) external blockchain type Relay struct { Payload Payload `json:"payload"` // the data payload of the request @@ -25,6 +29,36 @@ type Relay struct { Proof RelayProof `json:"proof"` // the authentication scheme needed for work } +func GetChainsClient() *http.Client { + if chainHttpClient == nil { + InitHttpClient(1000, 1000, 1000) + } + + return chainHttpClient +} + +func InitHttpClient(maxIdleConns, maxConnsPerHost, maxIdleConnsPerHost int) { + var chainsTransport *http.Transport + + t, ok := http.DefaultTransport.(*http.Transport) + if ok { + chainsTransport = t.Clone() + // this params may could be handled by config.json, but how much people know about this? + // tbd: figure out the right values to this, rn the priority is stop recreating new http client and connections. + chainsTransport.MaxIdleConns = maxIdleConns + chainsTransport.MaxConnsPerHost = maxConnsPerHost + chainsTransport.MaxIdleConnsPerHost = maxIdleConnsPerHost + } // if not ok, probably is because is *gock.Transport - test only + + chainHttpClient = &http.Client{ + Timeout: globalRPCTimeout * time.Second, + } + + if chainsTransport != nil { + chainHttpClient.Transport = chainsTransport + } +} + // "Validate" - Checks the validity of a relay request using store data func (r *Relay) Validate( ctx sdk.Ctx, @@ -346,7 +380,7 @@ func executeHTTPRequest(payload, url, userAgent string, basicAuth BasicAuth, met } } // execute the request - resp, err := (&http.Client{Timeout: globalRPCTimeout * time.Millisecond}).Do(req) + resp, err := GetChainsClient().Do(req) if err != nil { return "", err } diff --git a/x/pocketcore/types/service_test.go b/x/pocketcore/types/service_test.go index a03d6d1e2..1c1ae7d5e 100644 --- a/x/pocketcore/types/service_test.go +++ b/x/pocketcore/types/service_test.go @@ -169,8 +169,10 @@ func TestRelay_Execute(t *testing.T) { }, } validRelay.Proof.RequestHash = validRelay.RequestHashString() + httpClient := GetChainsClient() defer gock.Off() // Flush pending mocks after test execution - + defer gock.RestoreClient(httpClient) + gock.InterceptClient(httpClient) gock.New("https://server.com"). Post("/relay"). Reply(200). @@ -182,6 +184,7 @@ func TestRelay_Execute(t *testing.T) { URL: "https://server.com/relay/", }}, } + response, err := validRelay.Execute(&hb, &nodeAddr) assert.True(t, err == nil) assert.Equal(t, response, "bar")