-
Notifications
You must be signed in to change notification settings - Fork 0
/
mysql_pool.js
158 lines (135 loc) · 4.74 KB
/
mysql_pool.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
/*jslint node: true */
"use strict";
var mysql = require('mysql');
module.exports = function(connection_or_pool){
console.log("constructor");
var safe_connection = connection_or_pool;
safe_connection.original_query = safe_connection.query;
safe_connection.original_release = safe_connection.release;
safe_connection.original_escape = safe_connection.escape;
// this is a hack to make all errors throw exception that would kill the program
safe_connection.query = function () {
var last_arg = arguments[arguments.length - 1];
var bHasCallback = (typeof last_arg === 'function');
if (!bHasCallback){ // no callback
last_arg = function(){};
//return connection_or_pool.original_query.apply(connection_or_pool, arguments);
}
var count_arguments_without_callback = bHasCallback ? (arguments.length-1) : arguments.length;
var new_args = [];
var q;
for (var i=0; i<count_arguments_without_callback; i++) // except callback
new_args.push(arguments[i]);
if (!bHasCallback)
return new Promise(function(resolve){
new_args.push(resolve);
safe_connection.query.apply(safe_connection, new_args);
});
// add callback with error handling
new_args.push(function(err, results, fields){
if (err){
console.error("\nfailed query: "+q.sql);
/*
//console.error("code: "+(typeof err.code));
if (false && err.code === 'ER_LOCK_DEADLOCK'){
console.log("deadlock, will retry later");
setTimeout(function(){
console.log("retrying deadlock query "+q.sql+" after timeout ...");
connection_or_pool.original_query.apply(connection_or_pool, new_args);
}, 100);
return;
}*/
throw err;
}
if (Array.isArray(results))
results = results.map(function(row){
for (var key in row){
if (Buffer.isBuffer(row[key])) // VARBINARY fields are read as buffer, we have to convert them to string
row[key] = row[key].toString();
}
return Object.assign({}, row);
});
var consumed_time = Date.now() - start_ts;
if (consumed_time > 25)
console.log("long query took "+consumed_time+"ms:\n"+new_args.filter(function(a, i){ return (i<new_args.length-1); }).join(", ")+"\nload avg: "+require('os').loadavg().join(', '));
last_arg(results, fields);
});
//console.log(new_args);
var start_ts = Date.now();
q = connection_or_pool.original_query.apply(connection_or_pool, new_args);
//console.log(q.sql);
return q;
};
safe_connection.cquery = function(){
var conf = require('./conf.js');
if (conf.bFaster)
return arguments[arguments.length - 1]();
safe_connection.query.apply(this, arguments);
};
safe_connection.escape = function(str){
return connection_or_pool.original_escape(str);
};
safe_connection.release = function(){
//console.log("releasing connection");
connection_or_pool.original_release();
};
safe_connection.addQuery = function (arr) {
var query_args = [];
for (var i=1; i<arguments.length; i++) // except first, which is array
query_args.push(arguments[i]);
arr.push(function(callback){ // add callback for async.series() member tasks
if (typeof query_args[query_args.length-1] !== 'function')
query_args.push(function(){callback();}); // add mysql callback
else{
var f = query_args[query_args.length-1];
query_args[query_args.length-1] = function(){
f.apply(f, arguments);
callback();
}
}
safe_connection.query.apply(safe_connection, query_args);
});
};
// this is for pool only
safe_connection.takeConnectionFromPool = function(handleConnection){
if (!handleConnection)
return new Promise(resolve => safe_connection.takeConnectionFromPool(resolve));
connection_or_pool.getConnection(function(err, new_connection) {
if (err)
throw err;
console.log("got connection from pool");
handleConnection(new_connection.original_query ? new_connection : module.exports(new_connection));
});
};
safe_connection.getCountUsedConnections = function(){
return (safe_connection._allConnections.length - safe_connection._freeConnections.length);
};
safe_connection.close = function(cb){
connection_or_pool.end(cb);
};
safe_connection.addTime = function(interval){
return "NOW() + INTERVAL "+interval;
};
safe_connection.getNow = function(){
return "NOW()";
};
safe_connection.getFromUnixTime = function(ts){
return "FROM_UNIXTIME("+ts+")";
};
safe_connection.getRandom = function(){
return "RAND()";
};
safe_connection.forceIndex = function(index){
return "FORCE INDEX ("+ index +")";
};
safe_connection.dropTemporaryTable = function(table){
return "DROP TEMPORARY TABLE IF EXISTS " + table;
};
safe_connection.getIgnore = function(){
return "IGNORE";
};
safe_connection.getUnixTimestamp = function(date){
return "UNIX_TIMESTAMP("+date+")";
};
return safe_connection;
};