diff --git a/build.sbt b/build.sbt index 580482be5..5de2192da 100644 --- a/build.sbt +++ b/build.sbt @@ -85,9 +85,7 @@ lazy val infra = project "io.circe" %% "circe-parser" ).map(_ % V.circe), Elasticsearch.settings(defaultPort = 9200), - inConfig(Compile)( - Postgres.settings(defaultPort = 5432, database = "scaladex") - ), + Postgres.settings(Compile, defaultPort = 5432, database = "scaladex"), javaOptions ++= { val base = (ThisBuild / baseDirectory).value val index = base / "small-index" @@ -105,9 +103,7 @@ lazy val infra = project s"-Dscaladex.elasticsearch.port=$elasticsearchPort" ) }, - inConfig(Test)( - Postgres.settings(defaultPort = 5432, database = "scaladex-test") - ), + Postgres.settings(Test, defaultPort = 5432, database = "scaladex-test"), Test / javaOptions ++= { val elasticsearchPort = startElasticsearch.value val postgresPort = (Test / startPostgres).value diff --git a/project/Docker.scala b/project/Docker.scala new file mode 100644 index 000000000..57dc1e7cd --- /dev/null +++ b/project/Docker.scala @@ -0,0 +1,15 @@ +import org.testcontainers.DockerClientFactory +import org.testcontainers.dockerclient.DockerClientProviderStrategy + +object Docker { + lazy val client = { + CurrentThread.setContextClassLoader[DockerClientProviderStrategy] + DockerClientFactory.instance().client() + } + + def kill(containerId: String): Unit = + try client.killContainerCmd(containerId).exec() + catch { + case _: Throwable => () + } +} diff --git a/project/Elasticsearch.scala b/project/Elasticsearch.scala index 0da4595d8..908827d14 100644 --- a/project/Elasticsearch.scala +++ b/project/Elasticsearch.scala @@ -9,60 +9,55 @@ import java.net.URL import java.io.IOException import org.testcontainers.elasticsearch.ElasticsearchContainer import org.testcontainers.containers.BindMode +import scala.collection.mutable +import scala.collection.concurrent.TrieMap +import java.nio.file.Path object Elasticsearch extends AutoPlugin { + private val containers: mutable.Map[Path, ElasticsearchContainer] = TrieMap.empty + object autoImport { - val elasticsearchDefaultPort = - settingKey[Int]("Port of elasticserach instance") - val elasticsearchFolder = - settingKey[File]("Folder where elasticsearch data are stored") - val startElasticsearch = taskKey[Int]( - "Chek that elasticsearch has already started or else start a container" - ) + val startElasticsearch = taskKey[Int]("Connect to Elasticsearch or start an Elasticsearch container") } import autoImport._ def settings(defaultPort: Int): Seq[Setting[_]] = Seq( - elasticsearchDefaultPort := defaultPort, - elasticsearchFolder := Keys.baseDirectory.value / ".esdata", startElasticsearch := { import sbt.util.CacheImplicits._ - val dataFolder = elasticsearchFolder.value - val defaultPort = elasticsearchDefaultPort.value + val dataFolder = Keys.baseDirectory.value / ".esdata" val streams = Keys.streams.value - val store = streams.cacheStoreFactory.make("last") val logger = streams.log - val tracker = util.Tracked.lastOutput[Unit, Int](store) { - case (_, None) => - checkOrStart(dataFolder, defaultPort, logger) - case (_, Some(previousPort)) => - checkOrStart(dataFolder, previousPort, logger) + if (canConnect(defaultPort)) { + logger.info(s"Elasticsearch available on port $defaultPort") + defaultPort + } else { + // we cache the container to reuse it after a reload + val store = streams.cacheStoreFactory.make("container") + val tracker = util.Tracked.lastOutput[Unit, (String, Int)](store) { + case (_, None) => + startContainer(dataFolder, logger) + case (_, Some((containerId, port))) => + if (canConnect(port)) { + logger.info(s"Elasticsearch container already started on port $port") + (containerId, port) + } else { + Docker.kill(containerId) + startContainer(dataFolder, logger) + } + } + tracker(())._2 } - tracker(()) + }, + Keys.clean := { + Keys.clean.value + val dataFolder = Keys.baseDirectory.value / ".esdata" + containers.get(dataFolder.toPath).foreach(_.close()) + containers.remove(dataFolder.toPath) } ) - private def checkOrStart( - dataFolder: File, - previousPort: Int, - logger: Logger - ): Int = { - logger.info(s"Trying to connect to elasticsearch on port $previousPort") - if (alreadyStarted(previousPort)) { - logger.info(s"Elasticsearch has already started on port $previousPort") - previousPort - } else { - logger.info("Trying to start elasticsearch container") - val port = start(dataFolder) - logger.info( - s"Elasticsearch container successfully started with port $port" - ) - port - } - } - - private def start(dataFolder: File): Int = { + private def startContainer(dataFolder: File, logger: Logger): (String, Int) = { if (!dataFolder.exists) IO.createDirectory(dataFolder) IO.setPermissions(dataFolder, "rwxrwxrwx") @@ -74,16 +69,26 @@ object Elasticsearch extends AutoPlugin { container .withEnv("discovery.type", "single-node") .withEnv("ES_JAVA_OPTS", "-Xms512m -Xmx512m") - .addFileSystemBind( + .withFileSystemBind( dataFolder.toString, "/usr/share/elasticsearch/data", BindMode.READ_WRITE ) - container.start() - container.getFirstMappedPort() + val port = + try { + container.start() + container.getFirstMappedPort() + } catch { + case e: Throwable => + container.stop() + throw e + } + logger.info(s"Ealsticsearch container started on port $port") + containers(dataFolder.toPath) = container + (container.getContainerId, port) } - private def alreadyStarted(port: Int): Boolean = { + private def canConnect(port: Int): Boolean = { val url = new URL(s"http://localhost:$port/") try { val connection = url.openConnection().asInstanceOf[HttpURLConnection] @@ -92,6 +97,7 @@ object Elasticsearch extends AutoPlugin { val respCode = connection.getResponseCode if (respCode != HttpURLConnection.HTTP_OK) throw new MessageOnlyException(s"Got response code $respCode on $url") + connection.disconnect() true } catch { case _: TimeoutException | _: IOException => false diff --git a/project/Postgres.scala b/project/Postgres.scala index a9d8e9da0..54773ee3e 100644 --- a/project/Postgres.scala +++ b/project/Postgres.scala @@ -1,70 +1,66 @@ -import sbt._ +import java.nio.file.Path +import java.sql.DriverManager + +import scala.collection.JavaConverters._ +import scala.collection.concurrent.TrieMap +import scala.collection.mutable +import scala.util.Try -import org.testcontainers.dockerclient.DockerClientProviderStrategy -import org.testcontainers.utility.DockerImageName import org.testcontainers.containers.BindMode import org.testcontainers.containers.PostgreSQLContainer import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy - -import java.sql.DriverManager -import scala.util.Try +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy +import org.testcontainers.containers.wait.strategy.WaitAllStrategy +import org.testcontainers.dockerclient.DockerClientProviderStrategy +import org.testcontainers.utility.DockerImageName +import sbt._ object Postgres extends AutoPlugin { + private val containers: mutable.Map[Path, PostgreSQLContainer[Nothing]] = TrieMap.empty + object autoImport { - val postgresDefaultPort = settingKey[Int]("Default port of postgres") - val postgresFolder = - settingKey[File]("Folder where postgres data are stored") - val postgresDatabase = settingKey[String]("Name of the postgres database") - val startPostgres = taskKey[Int]( - "Chek that postgres has already started or else start a container" - ) + val startPostgres = taskKey[Int]("Connect to Postgres or start a Postgres container") } import autoImport._ - def settings(defaultPort: Int, database: String): Seq[Setting[_]] = Seq( - postgresDefaultPort := defaultPort, - postgresFolder := { - val c = Keys.configuration.?.value - val suffix = c.map(c => s"-${c.name}").getOrElse("") - Keys.baseDirectory.value / s".postgresql$suffix" - }, - postgresDatabase := database, - startPostgres := { + def settings(config: Configuration, defaultPort: Int, database: String): Seq[Setting[_]] = Seq( + config / startPostgres := { import sbt.util.CacheImplicits._ - val dataFolder = postgresFolder.value - val defaultPort = postgresDefaultPort.value - val database = postgresDatabase.value + val dataFolder = Keys.baseDirectory.value / s".postgresql-${config.name}" val streams = Keys.streams.value - val store = streams.cacheStoreFactory.make("last") val logger = streams.log - val tracker = util.Tracked.lastOutput[Unit, Int](store) { - case (_, None) => - checkOrStart(dataFolder, defaultPort, database, logger) - case (_, Some(previousPort)) => - checkOrStart(dataFolder, previousPort, database, logger) + + if (canConnect(defaultPort, database)) { + logger.info(s"Postgres is available on port $defaultPort") + defaultPort + } else { + // we cache the container to reuse it after a reload + val store = streams.cacheStoreFactory.make("container") + val tracker = util.Tracked.lastOutput[Unit, (String, Int)](store) { + case (_, None) => + startContainer(dataFolder, database, logger) + case (_, Some((containerId, port))) => + if (canConnect(port, database)) { + logger.info(s"Postgres container already started on port $port") + (containerId, port) + } else { + Docker.kill(containerId) + startContainer(dataFolder, database, logger) + } + } + tracker(())._2 } - tracker(()) + }, + Keys.clean := { + Keys.clean.value + val dataFolder = Keys.baseDirectory.value / s".postgresql-${config.name}" + containers.get(dataFolder.toPath).foreach(_.close()) + containers.remove(dataFolder.toPath) } ) - private def checkOrStart( - dataFolder: File, - previousPort: Int, - database: String, - logger: Logger - ): Int = - if (alreadyStarted(previousPort, database, logger)) { - logger.info(s"Postgres has already started on port $previousPort") - previousPort - } else { - logger.info("Trying to start postgres container") - val port = start(dataFolder, database) - logger.info(s"Postgres container successfully started with port $port") - port - } - - private def start(dataFolder: File, database: String): Int = { + private def startContainer(dataFolder: File, database: String, logger: Logger): (String, Int) = { if (!dataFolder.exists) IO.createDirectory(dataFolder) IO.setPermissions(dataFolder, "rwxrwxrwx") @@ -74,35 +70,49 @@ object Postgres extends AutoPlugin { val container = new PostgreSQLContainer(dockerImage) // change the wait strategy because of https://github.com/testcontainers/testcontainers-java/issues/455 - val waitStrategy = new HostPortWaitStrategy() - container.setWaitStrategy(waitStrategy) + // and https://github.com/testcontainers/testcontainers-java/issues/3372 + val hostPort = new HostPortWaitStrategy() + val logMessage = new LogMessageWaitStrategy().withRegEx(".*database system is ready to accept connections.*") + val portAndMessage = new WaitAllStrategy().withStrategy(hostPort).withStrategy(logMessage) + container.waitingFor(portAndMessage) container.withDatabaseName(database) container.withUsername("user") container.withPassword("password") container.withEnv("PGDATA", "/usr/share/postgres/data") - container.addFileSystemBind( + container.withFileSystemBind( dataFolder.toString, "/usr/share/postgres", BindMode.READ_WRITE ) - container.start() - container.getFirstMappedPort() + + val port = + try { + container.start() + container.getFirstMappedPort() + } catch { + case e: Throwable => + container.close() + throw e + } + logger.info(s"Postgres container started on port $port") + containers(dataFolder.toPath) = container + (container.getContainerId(), port) } - private def alreadyStarted( - port: Int, - database: String, - logger: Logger - ): Boolean = { + private def canConnect(port: Int, database: String): Boolean = { // `CurrentThread.setContextClassLoader[org.postgresql.Driver]` should work but it does not CurrentThread.setContextClassLoader("org.postgresql.Driver") - Try( - DriverManager.getConnection( + try { + val connection = DriverManager.getConnection( s"jdbc:postgresql://localhost:$port/$database", "user", "password" ) - ).fold(fa => { println(fa); false }, _ => true) + connection.close() + true + } catch { + case _: Throwable => false + } } }