Skip to content

Commit

Permalink
Make P3Client thread safe for all major operations
Browse files Browse the repository at this point in the history
  • Loading branch information
svenvc committed Jan 9, 2024
1 parent 35f5741 commit d618ebd
Showing 1 changed file with 75 additions and 55 deletions.
130 changes: 75 additions & 55 deletions P3/P3Client.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ Class {
'settings',
'properties',
'converter',
'message'
'message',
'access'
],
#category : #'P3-Core'
}
Expand Down Expand Up @@ -114,6 +115,13 @@ P3Client >> authenticationTypes [
10 -> #SASL } asDictionary
]

{ #category : #'initialize-release' }
P3Client >> beThreadSafe [
"Configure me so that I can be safely used from multiple threads/processes during important operations"

access := Monitor new
]

{ #category : #private }
P3Client >> clearSSL [
settings removeKey: #ssl ifAbsent: [ ]
Expand Down Expand Up @@ -148,12 +156,13 @@ P3Client >> close [
P3Client >> connect [
"Connect me to a PostgreSQL database.
Run the authentication and startup protocols. Configure the session."

self
ensureOpen;
connectInternal;
clearSSL;
logConnected

self critical: [
self
ensureOpen;
connectInternal;
clearSSL;
logConnected ]
]

{ #category : #protocol }
Expand All @@ -172,14 +181,14 @@ P3Client >> connectInternal [
P3Client >> connectSSL [
"Connect me to a PostgreSQL database over an encrypted SSL connection.
Run the authentication and startup protocols. Configure the session."

self
ensureOpen;
upgradeToSSL;
connectInternal;
setSSL;
logConnected

self critical: [
self
ensureOpen;
upgradeToSSL;
connectInternal;
setSSL;
logConnected ]
]

{ #category : #accessing }
Expand Down Expand Up @@ -216,6 +225,13 @@ P3Client >> createPreparedStatementNamed: name withParameters: parameterDescript
^ preparedStatement
]

{ #category : #private }
P3Client >> critical: block [
^ access
ifNil: block
ifNotNil: [ access critical: block ]
]

{ #category : #accessing }
P3Client >> database [
"Return the database name I (want to) connect to.
Expand Down Expand Up @@ -374,6 +390,8 @@ P3Client >> initialize [

settings := IdentityDictionary new.
properties := Dictionary new.

self beThreadSafe
]

{ #category : #private }
Expand Down Expand Up @@ -646,41 +664,40 @@ P3Client >> prepare: query named: queryName [

| parameterDescriptions rowDescriptions statement |

self
ensureConnected;
writeParseMessage: query name: queryName types: #();
writeDescribeMessage: queryName type: $S;
writeSyncMessage.
self critical: [
self
ensureConnected;
writeParseMessage: query name: queryName types: #();
writeDescribeMessage: queryName type: $S;
writeSyncMessage.

self readMessage tag = $1
ifFalse: [ ^ P3Error parseCompleteExpected signal ].
self readMessage tag = $1
ifFalse: [ ^ P3Error parseCompleteExpected signal ].

self readMessage.
message tag = $t
ifTrue: [ parameterDescriptions := self processParameterDescription: message readStream ]
ifFalse: [
parameterDescriptions := #().
message tag = $n
ifFalse: [ P3Error noDataExpected signal ] ].
self readMessage tag = $t
ifTrue: [ parameterDescriptions := self processParameterDescription: message readStream ]
ifFalse: [
parameterDescriptions := #().
message tag = $n
ifFalse: [ P3Error noDataExpected signal ] ].

self readMessage.
message tag = $T
ifTrue: [ rowDescriptions := self processRowDescription: message readStream ]
ifFalse: [
rowDescriptions := #().
message tag = $n
ifFalse: [ P3Error noDataExpected signal ] ].
self readMessage tag = $T
ifTrue: [ rowDescriptions := self processRowDescription: message readStream ]
ifFalse: [
rowDescriptions := #().
message tag = $n
ifFalse: [ P3Error noDataExpected signal ] ].

self readMessage tag = $Z
ifFalse: [ ^ P3Error readyForQueryExpected signal ].
self readMessage tag = $Z
ifFalse: [ ^ P3Error readyForQueryExpected signal ].

statement := self
createPreparedStatementNamed: queryName
withParameters: parameterDescriptions
andFields: rowDescriptions.
statement := self
createPreparedStatementNamed: queryName
withParameters: parameterDescriptions
andFields: rowDescriptions.

self logPreparedStatement: statement query: query.
self logPreparedStatement: statement query: query ].

^ statement
]

Expand Down Expand Up @@ -775,11 +792,12 @@ P3Client >> query: query [
Descriptions is a collection of row field description objects, if any.
Data is a collection of rows with fully converted field values as objects, if any."

^ self withConnection: [
self
ensureConnected;
writeQueryMessage: query;
runQueryResult ]
^ self critical: [
self withConnection: [
self
ensureConnected;
writeQueryMessage: query;
runQueryResult ] ]
]

{ #category : #accessing }
Expand Down Expand Up @@ -901,13 +919,15 @@ P3Client >> runExtendedQueryResults: fieldDescriptions [

| results queryStarted |

queryStarted := Time millisecondClockValue.
self readMessage.
results := Array streamContents: [ :out |
[
properties at: #query_started ifAbsentPut: queryStarted. "each result will use the same start"
out nextPut: (self runExtendedQueryResult: fieldDescriptions).
self readMessage tag = $Z ] whileFalse ].
self critical: [
queryStarted := Time millisecondClockValue.
self readMessage.
results := Array streamContents: [ :out |
[
"each result will use the same start"
properties at: #query_started ifAbsentPut: queryStarted.
out nextPut: (self runExtendedQueryResult: fieldDescriptions).
self readMessage tag = $Z ] whileFalse ] ].

^ results
]
Expand Down

0 comments on commit d618ebd

Please sign in to comment.