This repository has been archived by the owner on Aug 31, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
connector_worker.go
135 lines (115 loc) · 3.08 KB
/
connector_worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/*
* Copyright (c) 2002-2019 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package gobolt
import (
"net/url"
"os"
"strconv"
"time"
)
// workerConnector pairs a seaboltConnector delegate with the workerPool on
// which the delegate is created and closed.
type workerConnector struct {
	// config is a copy of the configuration the connector was created with.
	config Config

	// delegate is the wrapped seabolt connector; it is constructed and closed
	// from inside functions submitted to pool.
	delegate *seaboltConnector

	// pool executes the functions submitted by this connector.
	pool *workerPool

	// closeSignal is created with a buffer of config.MaxPoolSize and is closed
	// at the end of a successful Close.
	closeSignal chan signal
}
// Acquire builds a new worker connection for this connector using the given
// access mode. Both the connection and the error come straight from
// newWorkerConnection.
func (w *workerConnector) Acquire(mode AccessMode) (Connection, error) {
	conn, err := newWorkerConnection(w, mode)
	return conn, err
}
// Close closes the delegate connector from inside a function submitted to the
// pool and waits for that to finish. If submitting the function fails, or the
// delegate reports an error, that error is returned and the pool and
// closeSignal are left untouched. Otherwise the pool is closed, closeSignal
// is closed, and nil is returned.
func (w *workerConnector) Close() error {
	// Buffered by one so whichever side reports the outcome never blocks
	// on the send.
	outcome := make(chan error, 1)

	submitErr := w.pool.submit(func(stopper <-chan signal) {
		outcome <- w.delegate.Close()
	})
	if submitErr != nil {
		// The function was not accepted by the pool; report that instead.
		outcome <- submitErr
	}

	if closeErr := <-outcome; closeErr != nil {
		return closeErr
	}

	w.pool.close()
	close(w.closeSignal)
	return nil
}
// newWorkerConnector creates a worker pool sized by minWorkers, maxWorkers and
// keepAlive, then constructs the seabolt connector from inside a function
// submitted to that pool. The delegate receives a copy of config whose
// ConnAcquisitionTimeout is set to 0; the caller's config is not modified.
//
// If the submission or the connector construction fails, the pool is closed
// and the error is returned. On success the returned Connector wraps the
// delegate, the pool, a copy of config and a closeSignal channel buffered to
// config.MaxPoolSize.
func newWorkerConnector(url *url.URL, authToken map[string]interface{}, config *Config) (Connector, error) {
	workers := newWorkerPool(minWorkers(config), maxWorkers(config), keepAlive(config))

	delegateConfig := *config
	delegateConfig.ConnAcquisitionTimeout = 0

	var (
		delegate  *seaboltConnector
		createErr error
	)

	// Buffered by one so the sender never blocks on the notification.
	created := make(chan struct{}, 1)
	submitErr := workers.submit(func(stopper <-chan signal) {
		delegate, createErr = newSeaboltConnector(url, authToken, &delegateConfig)
		created <- struct{}{}
	})
	if submitErr != nil {
		createErr = submitErr
		created <- struct{}{}
	}

	// Block until connector creation has completed (or was never started).
	<-created

	if createErr != nil {
		workers.close()
		return nil, createErr
	}

	result := &workerConnector{
		config:      *config,
		delegate:    delegate,
		pool:        workers,
		closeSignal: make(chan signal, config.MaxPoolSize),
	}
	return result, nil
}
// workersEnabled reports the value of the BOLTWORKERS environment variable
// interpreted by strconv.ParseBool. It returns true when the variable is not
// set or its value cannot be parsed as a boolean.
func workersEnabled() bool {
	raw, present := os.LookupEnv("BOLTWORKERS")
	if !present {
		return true
	}

	enabled, err := strconv.ParseBool(raw)
	if err != nil {
		// Unparseable values fall back to the default.
		return true
	}
	return enabled
}
// maxWorkers returns the value of the BOLTWORKERSMAX environment variable when
// it is set and parses as a base-10 integer that fits in 32 bits. Otherwise it
// returns config.MaxPoolSize multiplied by 1.2 and truncated to an int.
func maxWorkers(config *Config) int {
	// The default is always computed first, so config is dereferenced even
	// when the environment supplies an override.
	fallback := int(float64(config.MaxPoolSize) * 1.2)

	raw, present := os.LookupEnv("BOLTWORKERSMAX")
	if !present {
		return fallback
	}

	override, err := strconv.ParseInt(raw, 10, 32)
	if err != nil {
		return fallback
	}
	return int(override)
}
// minWorkers returns the value of the BOLTWORKERSMIN environment variable when
// it is set and parses as a base-10 integer that fits in 32 bits, and 0
// otherwise. The config argument is not consulted.
func minWorkers(config *Config) int {
	raw, present := os.LookupEnv("BOLTWORKERSMIN")
	if !present {
		return 0
	}

	override, err := strconv.ParseInt(raw, 10, 32)
	if err != nil {
		// Unparseable values fall back to the default of zero.
		return 0
	}
	return int(override)
}
// keepAlive returns the value of the BOLTWORKERSKEEPALIVE environment variable
// parsed with time.ParseDuration. When the variable is not set or does not
// parse, it returns five minutes. The config argument is not consulted.
func keepAlive(config *Config) time.Duration {
	const fallback = 5 * time.Minute

	raw, present := os.LookupEnv("BOLTWORKERSKEEPALIVE")
	if !present {
		return fallback
	}

	override, err := time.ParseDuration(raw)
	if err != nil {
		return fallback
	}
	return override
}