Skip to content

Commit

Permalink
fix Doubled (#8)
Browse files Browse the repository at this point in the history
* fix issues/2
  • Loading branch information
dkodnik authored May 6, 2019
1 parent 3b05a88 commit 4b9f0a5
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 46 deletions.
26 changes: 23 additions & 3 deletions config.ini
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
CHAIN=test
; Адрес ноды
;ADDRESS=http://127.0.0.1:8841
;ADDRESS=http://10.0.69.133:8841
; -- чем плохо с ноды Minter-team: у них выкл. keep_state_history = false
ADDRESS=https://minter-node-1.testnet.minter.network
; Сколько не дозагружать из блокчейна блоков, в случае если Tandermint не успел собрать всю информацию о подписантах
BLOCK_CORRECTION=4
Expand All @@ -19,4 +17,26 @@ CLICKHOUSE_ADDRESS=tcp://127.0.0.1:9000
; Количество загружаемых за раз блоков (0 - не может быть)
BLOCKS_LOAD=100
; Пауза между загрузками блоков (сек)
BLOCKS_LOAD_PAUSE=10
BLOCKS_LOAD_PAUSE=15

[parser]
; Количество воркеров для отработки Блоков
WORKER_BLOCK=10
; Размер канала-буфферизации для отработки Блоков
CHAN_BLOCK=20
; Количество воркеров для отработки Событий блока
WORKER_BEVNT=10
; Размер канала-буфферизации для отработки Событий блока
CHAN_BEVNT=20
; Количество воркеров для отработки Транзакций
WORKER_TRX=100
; Размер канала-буфферизации для отработки Транзакций
CHAN_TRX=200
; Количество воркеров для отработки Нод блокчейна
WORKER_NODE=10
; Размер канала-буфферизации для отработки Нод блокчейна
CHAN_NODE=10
; Количество воркеров для отработки Валидаторов(нод) блока
WORKER_BNODE=10
; Размер канала-буфферизации для отработки Валидаторов(нод) блока
CHAN_BNODE=20
57 changes: 56 additions & 1 deletion initialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@ import (
"github.com/go-redis/redis"
)

const mbchV = "0.20" // версия Minter

const c_workerBlock = 10 // количество воркеров для отработки Блоков
const c_chanBlock = 20 // размер канала-буфферизации для отработки Блоков
const c_workerBEvnt = 10 // количество воркеров для отработки Событий блока
const c_chanBEvnt = 20 // размер канала-буфферизации для отработки Событий блока
const c_workerTrx = 100 // количество воркеров для отработки Транзакций
const c_chanTrx = 200 // размер канала-буфферизации для отработки Транзакций
const c_workerNode = 10 // количество воркеров для отработки Нод блокчейна
const c_chanNode = 10 // размер канала-буфферизации для отработки Нод блокчейна
const c_workerBNode = 10 // количество воркеров для отработки Валидаторов(нод) блока
const c_chanBNode = 20 // размер канала-буфферизации для отработки Валидаторов(нод) блока

// Структура для файла "start.json"
type ConfigStart struct {
AppUserX []s.NodeUserX `json:"app_userx"`
Expand Down Expand Up @@ -107,7 +120,49 @@ func initParser() {
if err != nil {
loadCorrection = 1000
}


secP3 := cfg.Section("parser")
workerBlock, err = secP3.Key("WORKER_BLOCK").Uint()
if err != nil || workerBlock == 0 {
workerBlock = c_workerBlock
}
chanBlock, err = secP3.Key("CHAN_BLOCK").Uint()
if err != nil || chanBlock == 0 {
chanBlock = c_chanBlock
}
workerBEvnt, err = secP3.Key("WORKER_BEVNT").Uint()
if err != nil || workerBEvnt == 0 {
workerBEvnt = c_workerBEvnt
}
chanBEvnt, err = secP3.Key("CHAN_BEVNT").Uint()
if err != nil || chanBEvnt == 0 {
chanBEvnt = c_chanBEvnt
}
workerTrx, err = secP3.Key("WORKER_TRX").Uint()
if err != nil || workerTrx == 0 {
workerTrx = c_workerTrx
}
chanTrx, err = secP3.Key("CHAN_TRX").Uint()
if err != nil || chanTrx == 0 {
chanTrx = c_chanTrx
}
workerNode, err = secP3.Key("WORKER_NODE").Uint()
if err != nil || workerNode == 0 {
workerNode = c_workerNode
}
chanNode, err = secP3.Key("CHAN_NODE").Uint()
if err != nil || chanNode == 0 {
chanNode = c_chanNode
}
workerBNode, err = secP3.Key("WORKER_BNODE").Uint()
if err != nil || workerBNode == 0 {
workerBNode = c_workerBNode
}
chanBNode, err = secP3.Key("CHAN_BNODE").Uint()
if err != nil || chanBNode == 0 {
chanBNode = c_chanBNode
}

secDB := cfg.Section("database")
CoinMinter = ms.GetBaseCoin()

Expand Down
31 changes: 15 additions & 16 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,6 @@ import (
"github.com/go-redis/redis"
)

const worherBlock = 10 // количество воркеров для отработки Блоков
const chanBlock = 20 // размер канала-буфферизации для отработки Блоков
const worherBEvnt = 10 // количество воркеров для отработки Событий блока
const chanBEvnt = 20 // размер канала-буфферизации для отработки Событий блока
const worherTrx = 100 // количество воркеров для отработки Транзакций
const chanTrx = 200 // размер канала-буфферизации для отработки Транзакций
const worherNode = 10 // количество воркеров для отработки Нод блокчейна
const chanNode = 10 // размер канала-буфферизации для отработки Нод блокчейна
const worherBNode = 10 // количество воркеров для отработки Валидаторов(нод) блока
const chanBNode = 20 // размер канала-буфферизации для отработки Валидаторов(нод) блока
const mbchV = "0.20" // версия Minter

var (
CoinMinter string // Основная монета Minter
amntN_block int // всего блоков в сети
Expand All @@ -38,6 +26,17 @@ var (
worketInputBNode chan B1NExt
worketInputBEvnt chan uint32

workerBlock uint
chanBlock uint
workerBEvnt uint
chanBEvnt uint
workerTrx uint
chanTrx uint
workerNode uint
chanNode uint
workerBNode uint
chanBNode uint

dbSQL *sqlx.DB
dbSys *redis.Client
)
Expand All @@ -55,16 +54,16 @@ func main() {
worketInputBEvnt = make(chan uint32, chanBEvnt)

// запуск воркеров-демонов
for i := 0; i < worherBlock; i++ {
for i := uint(0); i < workerBlock; i++ {
go startWorkerBlock(i, worketInputBlock)
}
for i := 0; i < worherTrx; i++ {
for i := uint(0); i < workerTrx; i++ {
go startWorkerTrx(i, worketInputTrx)
}
for i := 0; i < worherBNode; i++ {
for i := uint(0); i < workerBNode; i++ {
go startWorkerBNode(i, worketInputBNode)
}
for i := 0; i < worherBEvnt; i++ {
for i := uint(0); i < workerBEvnt; i++ {
go startWorkerBEvnt(i, worketInputBEvnt)
}

Expand Down
18 changes: 18 additions & 0 deletions sql_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,25 @@ func addNodeSql(db *sqlx.DB, dt *s.NodeExt) bool {

// Добавить о ноде историю блоков в SQL
func addNodeBlockstorySql(db *sqlx.DB, dt *s.NodeExt) bool {
chNAr := srchNodeBlockstory(db, dt.PubKey) // Для проверки на задвоенность блока для ноды

for _, bs1 := range dt.Blocks {

//+Проверка
fndBS := false
for _, tst1 := range chNAr {
if tst1.ID == bs1.ID && tst1.Type == bs1.Type {
fndBS = true
}
}

if fndBS {
log("ERR", fmt.Sprint("[sql_node.go] addNodeBlockstorySql(Find!) - [", dt.PubKey, " ", bs1.ID, " ", bs1.Type, "]"), "")
//panic("!!!")
continue
}
//-

tx := db.MustBegin()
qPg := `
INSERT INTO node_blockstory (
Expand Down
2 changes: 1 addition & 1 deletion w_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

// Воркер для обработки Блока и записи его в БД
func startWorkerBlock(workerNum int, in <-chan ms.BlockResponse) {
func startWorkerBlock(workerNum uint, in <-chan ms.BlockResponse) {
for retBlck := range in {
// Данные нового блока (в структуре подобной что в SDK)
retBlckEx := s.BlockResponse2{}
Expand Down
30 changes: 7 additions & 23 deletions w_evnt.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

// Воркер для обработки Событий блока и записи в БД (node_story)
func startWorkerBEvnt(workerNum int, in <-chan uint32) {
func startWorkerBEvnt(workerNum uint, in <-chan uint32) {
for bHeight := range in {
// Разбор Событий(event's) блока
retEv, err := sdk.GetEvents(int(bHeight))
Expand All @@ -25,37 +25,21 @@ func startWorkerBEvnt(workerNum int, in <-chan uint32) {
nodeDelgateRet := []s.NodeExt{}
nodeDelgateRet = srchNodeSql_all(dbSQL)

// разбираем события для блока
// разбираем события для блока, прочие события в других местах (декларирование{trxCreateNode}, запуск{trxStartNode}/остановка{trxStopNode} и пропуск{startWorkerBNode} - в других местах!)
for _, retEv1 := range retEv.Events {

// ШТРАФ:
if retEv1.Type == "minter/SlashEvent" {
oneNodeX1 := s.NodeExt{}
oneNodeX1 = srchNodeSql_pk(dbSQL, retEv1.Value.ValidatorPubKey) // [*]
if oneNodeX1.PubKey != "" {
// TODO: возложить на SQL проверку, исправить не на полную прогрузку и анализ тут в коде
// а на поиск в БД
oneNodeX1.Blocks = srchNodeBlockstory(dbSQL, retEv1.Value.ValidatorPubKey) // прогружаем блоки

// только перебором и сравнивать id и тип в blocks_story! запрос не корректно отрабатывается при поиске во вложениях
findY := false
for _, el1E := range oneNodeX1.Blocks {
// проверяем - записан-ли уже штраф в базу:
if el1E.ID == bHeight && el1E.Type == "SlashEvent" {
findY = true
}
}

if !findY {
// нету, добавляем!
oneNodeX1.Blocks = append(oneNodeX1.Blocks, s.BlocksStory{ID: bHeight, Type: "SlashEvent"})

if !addNodeBlockstorySql(dbSQL, &oneNodeX1) {
log("ERR", "[w_evnt.go] startWorkerBEvnt(addNodeBlockstorySql) SlashEvent", "")
}
log("STR", fmt.Sprintf("SlashEvent:: %d - %s", bHeight, oneNodeX1.PubKey), "")
// нету, добавляем!
oneNodeX1.Blocks = append(oneNodeX1.Blocks, s.BlocksStory{ID: bHeight, Type: "SlashEvent"})

if !addNodeBlockstorySql(dbSQL, &oneNodeX1) {
log("ERR", "[w_evnt.go] startWorkerBEvnt(addNodeBlockstorySql) SlashEvent", "")
}
log("STR", fmt.Sprintf("SlashEvent:: %d - %s", bHeight, oneNodeX1.PubKey), "")
}
}

Expand Down
2 changes: 1 addition & 1 deletion w_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type NodeExtInfo struct {
}

// Воркер для обработки Валидаторов блока и записи в БД (node_story) и MEM
func startWorkerBNode(workerNum int, in <-chan B1NExt) {
func startWorkerBNode(workerNum uint, in <-chan B1NExt) {
for pV1 := range in {
if pV1.Signed == false {
// пропустили блок!
Expand Down
2 changes: 1 addition & 1 deletion w_trx.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type TrxExt struct {
}

// Воркер для обработки Транзакций и записи в БД
func startWorkerTrx(workerNum int, in <-chan TrxExt) {
func startWorkerTrx(workerNum uint, in <-chan TrxExt) {
var err error
//var buffTrx []ms.TransResponse

Expand Down

0 comments on commit 4b9f0a5

Please sign in to comment.