From 280aa3ceeaacb64dc20d03c2edbd8c5b72035d2f Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Fri, 1 Mar 2024 17:43:35 +0100 Subject: [PATCH 1/2] Update to latest scio --- build.sbt | 33 +++--- project/giter8.sbt | 3 - project/plugins.sbt | 3 +- src/main/g8/build.sbt | 202 +++++++++++++------------------- src/main/g8/project/plugins.sbt | 3 +- src/test/g8/test | 3 +- 6 files changed, 98 insertions(+), 149 deletions(-) delete mode 100644 project/giter8.sbt diff --git a/build.sbt b/build.sbt index 1c85589..5e4dfd7 100644 --- a/build.sbt +++ b/build.sbt @@ -1,30 +1,27 @@ -val scioVersion = "0.12.7" -val beamVersion = "2.38.0" -val flinkVersion = "1.13.6" -val sparkVersion = "3.1.2" +// see https://github.com/spotify/scio/blob/v0.14.2/build.sbt +val scioVersion = "0.14.2" +val beamVersion = "2.54.0" +val slf4jVersion = "1.7.30" +val flinkVersion = "1.16.0" +val sparkVersion = "3.5.0" lazy val root = project .in(file(".")) - .enablePlugins(ScriptedPlugin) .settings( name := "scio.g8", Test / test := { val _ = (Test / g8Test).toTask("").value }, + Test / g8 / g8Properties ++= Map( + "DataflowRunner" -> "yes", + "FlinkRunner" -> "yes", + "SparkRunner" -> "yes", + "DataflowFlexTemplate" -> "yes" + ), scriptedLaunchOpts ++= List( - "-Xms1024m", - "-Xmx1024m", - "-XX:ReservedCodeCacheSize=128m", - "-XX:MaxPermSize=256m", - "-Xss2m", + "-Xms1G", + "-Xmx4G", "-Dfile.encoding=UTF-8" ), - // Get scala-steward to update template dependencies - libraryDependencies ++= Seq( - "com.spotify" %% "scio-core" % scioVersion, - "org.apache.beam" % "beam-runners-direct-java" % beamVersion, - "org.apache.flink" %% "flink-runtime" % flinkVersion, - "org.apache.spark" %% "spark-core" % sparkVersion, - "org.slf4j" % "slf4j-simple" % "1.7.36" - ) + scriptedBufferLog := false, ) diff --git a/project/giter8.sbt b/project/giter8.sbt deleted file mode 100644 index 921e7b6..0000000 --- a/project/giter8.sbt +++ /dev/null @@ -1,3 +0,0 @@ -addSbtPlugin("org.foundweekends.giter8" %% "sbt-giter8" % "0.13.1") - -libraryDependencies += "org.scala-sbt" %% "scripted-plugin" % sbtVersion.value diff --git a/project/plugins.sbt b/project/plugins.sbt index 428f63f..b9c4fe6 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,2 +1 @@ -addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.9") -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0") +addSbtPlugin("org.foundweekends.giter8" %% "sbt-giter8" % "0.16.2") diff --git a/src/main/g8/build.sbt b/src/main/g8/build.sbt index e1b3cc3..4060e9f 100644 --- a/src/main/g8/build.sbt +++ b/src/main/g8/build.sbt @@ -6,86 +6,111 @@ import scala.sys.process._ import complete.DefaultParsers._ $endif$ -val scioVersion = "0.12.7" -val beamVersion = "2.38.0" +// see https://github.com/spotify/scio/blob/v0.14.2/build.sbt +val scioVersion = "0.14.2" +val beamVersion = "2.54.0" +val slf4jVersion = "1.7.30" $if(FlinkRunner.truthy)$ -val flinkVersion = "1.13.6" +val flinkVersion = "1.16.0" $endif$ $if(SparkRunner.truthy)$ -val sparkVersion = "3.1.2" +val sparkVersion = "3.5.0" +$endif$ + +$if(DataflowFlexTemplate.truthy)$ +lazy val gcpProject = settingKey[String]("GCP Project") +lazy val gcpRegion = settingKey[String]("GCP region") +lazy val gcpDataflowFlexPath = settingKey[String]("GCS path to dataflow flext template") +lazy val gcpDataflowFlexTemplateBuiild = inputKey[Unit]("create dataflow flex-template") +lazy val gcpDataflowFlexTemplateRun = inputKey[Unit]("run dataflow flex-template") $endif$ lazy val commonSettings = Def.settings( organization := "$organization$", // Semantic versioning http://semver.org/ version := "0.1.0-SNAPSHOT", - $if(FlinkRunner.truthy || SparkRunner.truthy)$ - // scala-steward:off - scalaVersion := "2.12.13", - // scala-steward:on - $else$ - scalaVersion := "2.13.3", - $endif$ - $if(DataflowRunner.truthy)$ - resolvers += "confluent" at "https://packages.confluent.io/maven/", - $endif$ - scalacOptions ++= Seq("-target:jvm-1.8", - "-deprecation", - "-feature", - "-unchecked", - $if(!FlinkRunner.truthy && !SparkRunner.truthy)$ - "-Ymacro-annotations", - $endif$ - ), - javacOptions ++= Seq("-source", "1.8", "-target", "1.8") + scalaVersion := "2.13.13", + scalacOptions ++= Seq( + "-release", "8", + "-deprecation", + "-feature", + "-unchecked", + "-Ymacro-annotations" + ), + javacOptions ++= Seq("--release", "8"), + // add extra resolved and remove exclude if you need kafka + // resolvers += "confluent" at "https://packages.confluent.io/maven/", + excludeDependencies += "org.apache.beam" % "beam-sdks-java-io-kafka" ) lazy val root: Project = project .in(file(".")) - .settings(commonSettings) $if(DataflowFlexTemplate.truthy)$ - .settings(assemblySettings) + .enablePlugins(JavaAppPackaging, DockerPlugin) $endif$ + .settings(commonSettings) .settings( name := "$name;format="lower,hyphen"$", description := "$name$", publish / skip := true, - run / classLoaderLayeringStrategy := ClassLoaderLayeringStrategy.Flat, - run / fork := true, - $if(FlinkRunner.truthy || SparkRunner.truthy)$ - addCompilerPlugin("org.scalamacros" % "paradise_2.12.13" % "2.1.1"), - $endif$ + fork := true, libraryDependencies ++= Seq( "com.spotify" %% "scio-core" % scioVersion, - "com.spotify" %% "scio-test" % scioVersion % Test, - "org.apache.beam" % "beam-runners-direct-java" % beamVersion, + "org.slf4j" % "slf4j-api" % slf4jVersion, $if(DataflowRunner.truthy || DataflowFlexTemplate.truthy)$ - "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion, + "org.apache.beam" % "beam-runners-google-cloud-dataflow-java" % beamVersion % Runtime, $endif$ $if(FlinkRunner.truthy)$ - "org.apache.beam" % "beam-runners-flink-1.13" % beamVersion excludeAll ( - ExclusionRule("com.twitter", "chill_2.11"), - ExclusionRule("org.apache.flink", "flink-clients_2.11"), - ExclusionRule("org.apache.flink", "flink-runtime_2.11"), - ExclusionRule("org.apache.flink", "flink-streaming-java_2.11"), - ExclusionRule("org.apache.flink", "flink-optimizer_2.11") - ), - "org.apache.flink" %% "flink-clients" % flinkVersion, - "org.apache.flink" %% "flink-runtime" % flinkVersion, - "org.apache.flink" %% "flink-streaming-java" % flinkVersion, - "org.apache.flink" %% "flink-optimizer" % flinkVersion, + "org.apache.beam" % "beam-runners-flink-1.16" % beamVersion % Runtime, + "org.apache.flink" % "flink-clients" % flinkVersion % Runtime, + "org.apache.flink" % "flink-streaming-java" % flinkVersion % Runtime, $endif$ $if(SparkRunner.truthy)$ - "org.apache.beam" % "beam-runners-spark" % beamVersion exclude ( - "com.fasterxml.jackson.module", "jackson-module-scala_2.11" - ), - "org.apache.spark" %% "spark-core" % sparkVersion, - "org.apache.spark" %% "spark-streaming" % sparkVersion, + "org.apache.beam" % "beam-runners-spark-3" % beamVersion % Runtime, + "org.apache.spark" %% "spark-core" % sparkVersion % Runtime, + "org.apache.spark" %% "spark-streaming" % sparkVersion % Runtime, $endif$ - "org.slf4j" % "slf4j-simple" % "1.7.36" - ) + "com.spotify" %% "scio-test" % scioVersion % Test, + "org.slf4j" % "slf4j-simple" % slf4jVersion % Test + ), + $if(DataflowFlexTemplate.truthy)$ + Docker / packageName := s"gcr.io/\${gcpProject.value}/dataflow/templates/\${name.value}", + dockerBaseImage := "gcr.io/dataflow-templates-base/java11-template-launcher-base:latest", + dockerEntrypoint := Seq("/opt/google/dataflow/java_template_launcher"), + dockerCommands ++= Seq( + Cmd( + "ENV", + "FLEX_TEMPLATE_JAVA_MAIN_CLASS", + (Compile / mainClass).value.get + ), + Cmd( + "ENV", + "FLEX_TEMPLATE_JAVA_CLASSPATH", + (Universal / mappings).value.collect {case (_, dest) if dest.startsWith("lib/") => dest }.mkString(":") + ) + ), + gcpProject := "", + gcpRegion := "", + gcpDataflowFlexPath := "", + gcpDataflowFlexTemplateBuiild := { + (Docker / publish).value + s"""gcloud dataflow flex-template build \${gcpDataflowFlexPath.value}/templates/\${name.value}.json + |--image \${dockerAlias.value} + |--sdk-language JAVA + |--metadata-file metadata.json""".stripMargin ! + }, + gcpDataflowFlexTemplateRun := { + val parameters = spaceDelimited("").parsed + s"""gcloud dataflow flex-template run \${name.value} + |--project=\${gcpProject.value} + |--region=\${gcpRegion.value} + |--temp-location=\${gcpDataflowFlexPath.value}/temp + |--staging-location=\${gcpDataflowFlexPath.value}/staging + |--template-file-gcs-location \${gcpDataflowFlexPath.value}/templates/\${name.value}.json + |--parameters \${parameters.mkString(",")}""".stripMargin ! + } + $endif$ ) - .enablePlugins(JavaAppPackaging) lazy val repl: Project = project .in(file(".repl")) @@ -97,76 +122,7 @@ lazy val repl: Project = project "com.spotify" %% "scio-repl" % scioVersion ), Compile / mainClass := Some("com.spotify.scio.repl.ScioShell"), - publish / skip := true + publish / skip := true, + fork := false, ) .dependsOn(root) - -$if(DataflowFlexTemplate.truthy)$ -lazy val gcpProject = settingKey[String]("GCP Project") -lazy val gcpRegion = settingKey[String]("GCP region") -lazy val createFlextTemplate = inputKey[Unit]("create DataflowFlexTemplate") -lazy val runFlextTemplate = inputKey[Unit]("run DataflowFlexTemplate") - -lazy val assemblySettings = Def.settings( - gcpProject := "", - gcpRegion := "", - assembly / test := {}, - assembly / assemblyJarName := "flex-wordcount.jar", - assembly / assemblyMergeStrategy ~= { old => - { - case s if s.endsWith(".properties") => MergeStrategy.filterDistinctLines - case s if s.endsWith("public-suffix-list.txt") => - MergeStrategy.filterDistinctLines - case s if s.endsWith(".class") => MergeStrategy.last - case s if s.endsWith(".proto") => MergeStrategy.last - case s if s.endsWith("reflection-config.json") => MergeStrategy.rename - case s => old(s) - } - }, - Universal / mappings := { - val fatJar = (Compile / assembly).value - val filtered = (Universal / mappings).value.filter { - case (_, name) => !name.endsWith(".jar") - } - filtered :+ (fatJar -> (s"lib/\${fatJar.getName}")) - }, - scriptClasspath := Seq((assembly / assemblyJarName).value), - Docker / packageName := s"gcr.io/\${gcpProject.value}/dataflow/templates/DataflowFlexTemplate", - Docker / dockerCommands := Seq( - Cmd( - "FROM", - "gcr.io/dataflow-templates-base/java11-template-launcher-base:latest" - ), - Cmd( - "ENV", - "FLEX_TEMPLATE_JAVA_MAIN_CLASS", - (assembly / mainClass).value.getOrElse("") - ), - Cmd( - "ENV", - "FLEX_TEMPLATE_JAVA_CLASSPATH", - s"/template/\${(assembly / assemblyJarName).value}" - ), - ExecCmd( - "COPY", - s"1/opt/docker/lib/\${(assembly / assemblyJarName).value}", - "\${FLEX_TEMPLATE_JAVA_CLASSPATH}" - ) - ), - createFlextTemplate := { - val _ = (Docker / publish).value - s"""gcloud beta dataflow DataflowFlexTemplate build - gs://\${gcpProject.value}/dataflow/templates/\${name.value}.json - --image \${dockerAlias.value} - --sdk-language JAVA - --metadata-file metadata.json""" ! - }, - runFlextTemplate := { - val parameters = spaceDelimited("").parsed - s"""gcloud beta dataflow DataflowFlexTemplate run \${name.value} - --template-file-gcs-location gs://\${gcpProject.value}/dataflow/templates/\${name.value}.json - --region=\${gcpRegion.value} - --parameters \${parameters.mkString(",")}""" ! - } -) -$endif$ diff --git a/src/main/g8/project/plugins.sbt b/src/main/g8/project/plugins.sbt index 199c51a..703522f 100644 --- a/src/main/g8/project/plugins.sbt +++ b/src/main/g8/project/plugins.sbt @@ -1,4 +1,3 @@ -addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.9") $if(DataflowFlexTemplate.truthy) $ -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.2.0") +addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.16") $endif$ diff --git a/src/test/g8/test b/src/test/g8/test index a270b7b..4fdb6db 100644 --- a/src/test/g8/test +++ b/src/test/g8/test @@ -1 +1,2 @@ -> test \ No newline at end of file +> test +> Docker / stage From aeb7056a5612e1d5a2b5531f814a1e7cdb053485 Mon Sep 17 00:00:00 2001 From: Michel Davit Date: Mon, 4 Mar 2024 13:36:00 +0100 Subject: [PATCH 2/2] Fix dataflow flex template --- src/main/g8/build.sbt | 58 ++++++++++++------- .../$organization__packaged$/WordCount.scala | 9 ++- 2 files changed, 44 insertions(+), 23 deletions(-) diff --git a/src/main/g8/build.sbt b/src/main/g8/build.sbt index 4060e9f..23ca575 100644 --- a/src/main/g8/build.sbt +++ b/src/main/g8/build.sbt @@ -20,8 +20,8 @@ $endif$ $if(DataflowFlexTemplate.truthy)$ lazy val gcpProject = settingKey[String]("GCP Project") lazy val gcpRegion = settingKey[String]("GCP region") -lazy val gcpDataflowFlexPath = settingKey[String]("GCS path to dataflow flext template") -lazy val gcpDataflowFlexTemplateBuiild = inputKey[Unit]("create dataflow flex-template") +lazy val gcpDataflowFlexBucket = settingKey[String]("GCS bucket for the flext template") +lazy val gcpDataflowFlexTemplateBuild = inputKey[Unit]("create dataflow flex-template") lazy val gcpDataflowFlexTemplateRun = inputKey[Unit]("run dataflow flex-template") $endif$ @@ -75,26 +75,40 @@ lazy val root: Project = project ), $if(DataflowFlexTemplate.truthy)$ Docker / packageName := s"gcr.io/\${gcpProject.value}/dataflow/templates/\${name.value}", + Docker / daemonUserUid := None, + Docker / daemonUser := "root", + dockerPermissionStrategy := DockerPermissionStrategy.None, dockerBaseImage := "gcr.io/dataflow-templates-base/java11-template-launcher-base:latest", - dockerEntrypoint := Seq("/opt/google/dataflow/java_template_launcher"), - dockerCommands ++= Seq( - Cmd( - "ENV", - "FLEX_TEMPLATE_JAVA_MAIN_CLASS", - (Compile / mainClass).value.get - ), - Cmd( - "ENV", - "FLEX_TEMPLATE_JAVA_CLASSPATH", - (Universal / mappings).value.collect {case (_, dest) if dest.startsWith("lib/") => dest }.mkString(":") + dockerCommands := { + // keep default from base image + val filteredCommands = dockerCommands.value.filterNot { + case Cmd("USER", _*) => true + case Cmd("WORKDIR", _*) => true + case ExecCmd("ENTRYPOINT", _*) => true + case ExecCmd("CMD", _*) => true + case _ => false + } + // add required ENV commands + val envCommands = Seq( + Cmd( + "ENV", + "FLEX_TEMPLATE_JAVA_MAIN_CLASS", + (Compile / mainClass).value.get + ), + Cmd( + "ENV", + "FLEX_TEMPLATE_JAVA_CLASSPATH", + (Docker / defaultLinuxInstallLocation).value + "/lib/*" + ) ) - ), - gcpProject := "", - gcpRegion := "", - gcpDataflowFlexPath := "", - gcpDataflowFlexTemplateBuiild := { + filteredCommands ++ envCommands + }, + gcpProject := "", // TODO + gcpRegion := "", // TODO + gcpDataflowFlexBucket := "", // TODO + gcpDataflowFlexTemplateBuild := { (Docker / publish).value - s"""gcloud dataflow flex-template build \${gcpDataflowFlexPath.value}/templates/\${name.value}.json + s"""gcloud dataflow flex-template build gs://\${gcpDataflowFlexBucket.value}/dataflow/templates/\${name.value}.json |--image \${dockerAlias.value} |--sdk-language JAVA |--metadata-file metadata.json""".stripMargin ! @@ -104,9 +118,9 @@ lazy val root: Project = project s"""gcloud dataflow flex-template run \${name.value} |--project=\${gcpProject.value} |--region=\${gcpRegion.value} - |--temp-location=\${gcpDataflowFlexPath.value}/temp - |--staging-location=\${gcpDataflowFlexPath.value}/staging - |--template-file-gcs-location \${gcpDataflowFlexPath.value}/templates/\${name.value}.json + |--temp-location=gs://\${gcpDataflowFlexBucket.value}/dataflow/temp + |--staging-location=gs://\${gcpDataflowFlexBucket.value}/dataflow/staging + |--template-file-gcs-location=gs://\${gcpDataflowFlexBucket.value}/dataflow/templates/\${name.value}.json |--parameters \${parameters.mkString(",")}""".stripMargin ! } $endif$ diff --git a/src/main/g8/src/main/scala/$organization__packaged$/WordCount.scala b/src/main/g8/src/main/scala/$organization__packaged$/WordCount.scala index b625c65..b6b25c9 100644 --- a/src/main/g8/src/main/scala/$organization__packaged$/WordCount.scala +++ b/src/main/g8/src/main/scala/$organization__packaged$/WordCount.scala @@ -4,7 +4,9 @@ import com.spotify.scio._ /* sbt "runMain [PACKAGE].WordCount - --project=[PROJECT] --runner=DataflowRunner --zone=[ZONE] + --project=[PROJECT] + --region=[REGION] + --runner=[RUNNER] --input=gs://dataflow-samples/shakespeare/kinglear.txt --output=gs://[BUCKET]/[PATH]/wordcount" */ @@ -24,6 +26,11 @@ object WordCount { .map(t => t._1 + ": " + t._2) .saveAsTextFile(output) + + $if(DataflowFlexTemplate.truthy)$ + sc.run() + $else$ val result = sc.run().waitUntilFinish() + $endif$ } }