From d618ebdf90f68da5b8e3ca4b24988541e52b4a6c Mon Sep 17 00:00:00 2001 From: svenvc Date: Tue, 9 Jan 2024 11:26:35 +0100 Subject: [PATCH] Make P3Client thread safe for all major operations --- P3/P3Client.class.st | 130 +++++++++++++++++++++++++------------------ 1 file changed, 75 insertions(+), 55 deletions(-) diff --git a/P3/P3Client.class.st b/P3/P3Client.class.st index 4e45b68..f88e5fd 100644 --- a/P3/P3Client.class.st +++ b/P3/P3Client.class.st @@ -81,7 +81,8 @@ Class { 'settings', 'properties', 'converter', - 'message' + 'message', + 'access' ], #category : #'P3-Core' } @@ -114,6 +115,13 @@ P3Client >> authenticationTypes [ 10 -> #SASL } asDictionary ] +{ #category : #'initialize-release' } +P3Client >> beThreadSafe [ + "Configure me so that I can be safely used from multiple threads/processes during important operations" + + access := Monitor new +] + { #category : #private } P3Client >> clearSSL [ settings removeKey: #ssl ifAbsent: [ ] @@ -148,12 +156,13 @@ P3Client >> close [ P3Client >> connect [ "Connect me to a PostgreSQL database. Run the authentication and startup protocols. Configure the session." - - self - ensureOpen; - connectInternal; - clearSSL; - logConnected + + self critical: [ + self + ensureOpen; + connectInternal; + clearSSL; + logConnected ] ] { #category : #protocol } @@ -172,14 +181,14 @@ P3Client >> connectInternal [ P3Client >> connectSSL [ "Connect me to a PostgreSQL database over an encrypted SSL connection. Run the authentication and startup protocols. Configure the session." - - self - ensureOpen; - upgradeToSSL; - connectInternal; - setSSL; - logConnected + self critical: [ + self + ensureOpen; + upgradeToSSL; + connectInternal; + setSSL; + logConnected ] ] { #category : #accessing } @@ -216,6 +225,13 @@ P3Client >> createPreparedStatementNamed: name withParameters: parameterDescript ^ preparedStatement ] +{ #category : #private } +P3Client >> critical: block [ + ^ access + ifNil: block + ifNotNil: [ access critical: block ] +] + { #category : #accessing } P3Client >> database [ "Return the database name I (want to) connect to. @@ -374,6 +390,8 @@ P3Client >> initialize [ settings := IdentityDictionary new. properties := Dictionary new. + + self beThreadSafe ] { #category : #private } @@ -646,41 +664,40 @@ P3Client >> prepare: query named: queryName [ | parameterDescriptions rowDescriptions statement | - self - ensureConnected; - writeParseMessage: query name: queryName types: #(); - writeDescribeMessage: queryName type: $S; - writeSyncMessage. + self critical: [ + self + ensureConnected; + writeParseMessage: query name: queryName types: #(); + writeDescribeMessage: queryName type: $S; + writeSyncMessage. - self readMessage tag = $1 - ifFalse: [ ^ P3Error parseCompleteExpected signal ]. + self readMessage tag = $1 + ifFalse: [ ^ P3Error parseCompleteExpected signal ]. - self readMessage. - message tag = $t - ifTrue: [ parameterDescriptions := self processParameterDescription: message readStream ] - ifFalse: [ - parameterDescriptions := #(). - message tag = $n - ifFalse: [ P3Error noDataExpected signal ] ]. + self readMessage tag = $t + ifTrue: [ parameterDescriptions := self processParameterDescription: message readStream ] + ifFalse: [ + parameterDescriptions := #(). + message tag = $n + ifFalse: [ P3Error noDataExpected signal ] ]. - self readMessage. - message tag = $T - ifTrue: [ rowDescriptions := self processRowDescription: message readStream ] - ifFalse: [ - rowDescriptions := #(). - message tag = $n - ifFalse: [ P3Error noDataExpected signal ] ]. + self readMessage tag = $T + ifTrue: [ rowDescriptions := self processRowDescription: message readStream ] + ifFalse: [ + rowDescriptions := #(). + message tag = $n + ifFalse: [ P3Error noDataExpected signal ] ]. - self readMessage tag = $Z - ifFalse: [ ^ P3Error readyForQueryExpected signal ]. + self readMessage tag = $Z + ifFalse: [ ^ P3Error readyForQueryExpected signal ]. - statement := self - createPreparedStatementNamed: queryName - withParameters: parameterDescriptions - andFields: rowDescriptions. + statement := self + createPreparedStatementNamed: queryName + withParameters: parameterDescriptions + andFields: rowDescriptions. - self logPreparedStatement: statement query: query. - + self logPreparedStatement: statement query: query ]. + ^ statement ] @@ -775,11 +792,12 @@ P3Client >> query: query [ Descriptions is a collection of row field description objects, if any. Data is a collection of rows with fully converted field values as objects, if any." - ^ self withConnection: [ - self - ensureConnected; - writeQueryMessage: query; - runQueryResult ] + ^ self critical: [ + self withConnection: [ + self + ensureConnected; + writeQueryMessage: query; + runQueryResult ] ] ] { #category : #accessing } @@ -901,13 +919,15 @@ P3Client >> runExtendedQueryResults: fieldDescriptions [ | results queryStarted | - queryStarted := Time millisecondClockValue. - self readMessage. - results := Array streamContents: [ :out | - [ - properties at: #query_started ifAbsentPut: queryStarted. "each result will use the same start" - out nextPut: (self runExtendedQueryResult: fieldDescriptions). - self readMessage tag = $Z ] whileFalse ]. + self critical: [ + queryStarted := Time millisecondClockValue. + self readMessage. + results := Array streamContents: [ :out | + [ + "each result will use the same start" + properties at: #query_started ifAbsentPut: queryStarted. + out nextPut: (self runExtendedQueryResult: fieldDescriptions). + self readMessage tag = $Z ] whileFalse ] ]. ^ results ]