Bump spark from 3.3.1 to 3.5.1 in /spark-connector #144

Open · wants to merge 1 commit into base: master
10 changes: 5 additions & 5 deletions spark-connector/common/dependency-reduced-pom.xml
@@ -3,33 +3,33 @@
<parent>
<artifactId>spark-connector</artifactId>
<groupId>com.aliyun.odps</groupId>
- <version>3.3.1-odps0.43.0</version>
+ <version>3.5.1-odps0.45.5</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark-odps-common</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
- <version>3.3.1</version>
+ <version>3.5.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
- <version>3.3.1</version>
+ <version>3.5.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
- <version>3.3.1</version>
+ <version>3.5.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_2.12</artifactId>
- <version>3.3.1</version>
+ <version>3.5.1</version>
<scope>compile</scope>
</dependency>
<dependency>
2 changes: 1 addition & 1 deletion spark-connector/common/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>spark-connector</artifactId>
<groupId>com.aliyun.odps</groupId>
- <version>3.3.1-odps0.43.0</version>
+ <version>3.5.1-odps0.45.5</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -35,6 +35,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+ import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.odps.vectorized._
import org.apache.spark.sql.types._
@@ -59,7 +60,7 @@ case class OdpsPartitionReaderFactory(broadcastedConf: Broadcast[SerializableCon
asyncRead: Boolean)
extends PartitionReaderFactory with Logging {

- private val output = readDataSchema.toAttributes ++ readPartitionSchema.toAttributes
+ private val output = DataTypeUtils.toAttributes(readDataSchema) ++ DataTypeUtils.toAttributes(readPartitionSchema)
private val allNames = output.map(_.name)
private val allTypes = output.map(_.dataType)
private val arrowDataFormat = new DataFormat(DataFormat.Type.ARROW, DataFormat.Version.V5)
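A note on the change above, grounded in this hunk and the OdpsShuffleExchangeExec hunk below: the StructType.toAttributes / StructType.fromAttributes helpers used by the 3.3-era code are gone in Spark 3.5's catalyst, and the replacements live on the helper object org.apache.spark.sql.catalyst.types.DataTypeUtils. A minimal sketch of the replacement calls; the package and schema are made-up examples, placed under org.apache.spark.sql.* the same way the connector sources are so the catalyst-internal helper is reachable:

```scala
package org.apache.spark.sql.odps.example

import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object SchemaToAttributesSketch {
  // Hypothetical schema, standing in for readDataSchema / readPartitionSchema.
  val schema: StructType = StructType(Seq(
    StructField("id", IntegerType),
    StructField("name", StringType)))

  // Spark 3.3: schema.toAttributes              -> Spark 3.5: DataTypeUtils.toAttributes(schema)
  val attrs: Seq[AttributeReference] = DataTypeUtils.toAttributes(schema)

  // Spark 3.3: StructType.fromAttributes(attrs) -> Spark 3.5: DataTypeUtils.fromAttributes(attrs)
  val roundTripped: StructType = DataTypeUtils.fromAttributes(attrs)
}
```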
@@ -32,7 +32,8 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Uns
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning, RoundRobinPartitioning, SinglePartition}
- import org.apache.spark.sql.execution.{PartitionIdPassthrough, RecordBinaryComparator, SQLExecution, ShufflePartitionSpec, ShuffledRowRDD, SparkPlan, UnsafeExternalRowSorter, UnsafeRowSerializer}
+ import org.apache.spark.sql.catalyst.types.DataTypeUtils
+ import org.apache.spark.sql.execution.{RecordBinaryComparator, SQLExecution, ShufflePartitionSpec, ShuffledRowRDD, SparkPlan, UnsafeExternalRowSorter, UnsafeRowSerializer}
import org.apache.spark.sql.execution.exchange.{REPARTITION_BY_NUM, ShuffleExchangeLike, ShuffleOrigin}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
import org.apache.spark.sql.internal.SQLConf
@@ -128,6 +129,16 @@ case class OdpsShuffleExchangeExec(
override protected def withNewChildInternal(newChild: SparkPlan): OdpsShuffleExchangeExec = {
copy(child = newChild)
}

+ override def advisoryPartitionSize: Option[Long] = {
+ val dataSize = metrics("dataSize").value
+ val numPartitions = metrics("numPartitions").value
+ if (dataSize > 0 && numPartitions > 0) {
+ Some(dataSize / numPartitions)
+ } else {
+ Some(64 * 1024 * 1024L) // 64MB
+ }
+ }
}

object ShuffleExchangeExec {
@@ -303,7 +314,7 @@ object ShuffleExchangeExec {
val pageSize = SparkEnv.get.memoryManager.pageSizeBytes

val sorter = UnsafeExternalRowSorter.createWithRecordComparator(
- StructType.fromAttributes(outputAttributes),
+ DataTypeUtils.fromAttributes(outputAttributes),
recordComparatorSupplier,
prefixComparator,
prefixComputer,
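For context on the advisoryPartitionSize override added above: the `override` keyword together with the ShuffleExchangeLike import suggests the 3.5-era parent trait exposes an advisory partition size to adaptive execution, which this exchange derives from its own dataSize and numPartitions metrics with a 64 MB fallback when the metrics are empty. A quick check of that arithmetic with made-up metric values (illustrative only, not PR code):

```scala
// Same fallback logic as the override, evaluated with a hypothetical 2 GiB shuffle over 32 partitions.
val dataSize = 2L * 1024 * 1024 * 1024   // bytes, as the "dataSize" metric would report
val numPartitions = 32L                  // as the "numPartitions" metric would report
val advisory =
  if (dataSize > 0 && numPartitions > 0) dataSize / numPartitions
  else 64L * 1024 * 1024                 // 64 MB default when metrics are not yet populated
assert(advisory == 64L * 1024 * 1024)    // 2 GiB / 32 partitions = 64 MiB per partition
```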
2 changes: 1 addition & 1 deletion spark-connector/datasource/dependency-reduced-pom.xml
@@ -3,7 +3,7 @@
<parent>
<artifactId>spark-connector</artifactId>
<groupId>com.aliyun.odps</groupId>
- <version>3.3.1-odps0.43.0</version>
+ <version>3.5.1-odps0.45.5</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark-odps-datasource</artifactId>
4 changes: 2 additions & 2 deletions spark-connector/datasource/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>spark-connector</artifactId>
<groupId>com.aliyun.odps</groupId>
- <version>3.3.1-odps0.43.0</version>
+ <version>3.5.1-odps0.45.5</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
@@ -16,7 +16,7 @@
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>spark-odps-common</artifactId>
- <version>3.3.1-odps0.43.0</version>
+ <version>3.5.1-odps0.45.5</version>
</dependency>
</dependencies>

@@ -30,6 +30,8 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.AttributeSet
+ import org.apache.spark.sql.catalyst.types.DataTypeUtils
+ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
@@ -199,8 +201,8 @@ case class OdpsWriteBuilder(
assert(queryId != null, "Missing query ID")

SchemaUtils.checkColumnNameDuplication(schema.fields.map(_.name),
s"when inserting into $tableIdent", caseSensitiveAnalysis)
DataSource.validateSchema(schema)
caseSensitiveAnalysis)
DataSource.validateSchema(schema, SQLConf.get)
}

private def createWriteJobDescription(sparkSession: SparkSession,
@@ -210,7 +212,7 @@
options: Map[String, String],
odpsOptions: OdpsOptions,
supportArrowWriter: Boolean): WriteJobDescription = {
- val outputColumns = schema.toAttributes
+ val outputColumns = DataTypeUtils.toAttributes(schema)
val outputPartitionColumns =
outputColumns.filter(c => partitionSchema.getFieldIndex(c.name).isDefined)
val outputPartitionSet = AttributeSet(outputPartitionColumns)
@@ -60,7 +60,7 @@ class OdpsExtensions extends (SparkSessionExtensions => Unit) {
}
ShowColumnsCommand(table)

- case i@InsertIntoStatement(r@DataSourceV2Relation(table: OdpsTable, _, _, _, _), _, _, _, _, _)
+ case i@InsertIntoStatement(r@DataSourceV2Relation(table: OdpsTable, _, _, _, _), _, _, _, _, _, /* byName */false)
if i.query.resolved =>
if (i.partitionSpec.nonEmpty && !r.options.containsKey(WRITE_ODPS_STATIC_PARTITION)) {
val normalizedSpec = PartitioningUtils.normalizePartitionSpec(
@@ -125,7 +125,7 @@ class OdpsExtensions extends (SparkSessionExtensions => Unit) {
override def apply(plan: LogicalPlan): LogicalPlan = {
plan.transform {
case AppendData(
- r @ DataSourceV2Relation(table: OdpsTable, _ , _, _, options), query, writeOptions, isByName, write)
+ r @ DataSourceV2Relation(table: OdpsTable, _ , _, _, options), query, writeOptions, isByName, write, /* analyzedQuery */ null)
if !writeOptions.contains(WRITE_ODPS_TABLE_RESOLVED) =>
val newQuery = insertRepartition(query, table)
var newOptions = writeOptions + Tuple2(WRITE_ODPS_TABLE_RESOLVED, "true")
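The widened pattern matches above (and the InsertIntoStatement matches in HiveStrategies further down) exist because Spark 3.5 adds trailing fields to these plan nodes — a byName flag on InsertIntoStatement and an extra final parameter on AppendData — so extractors written against the 3.3 arity no longer compile. A minimal sketch of matching the 3.5 shape of InsertIntoStatement; the rule body is invented for illustration, and a wildcard is used for the new field where a rule does not care about its value:

```scala
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, LogicalPlan}

// Sketch only: Spark 3.3 exposed six fields
// (table, partitionSpec, userSpecifiedCols, query, overwrite, ifPartitionNotExists);
// Spark 3.5 appends byName, matched here with `_`.
def hasStaticPartitionValue(plan: LogicalPlan): Boolean = plan match {
  case InsertIntoStatement(_, partitionSpec, _, _, _, _, _) =>
    partitionSpec.values.exists(_.isDefined)   // any pinned partition value => static partition spec
  case _ => false
}
```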
2 changes: 1 addition & 1 deletion spark-connector/hive/dependency-reduced-pom.xml
@@ -3,7 +3,7 @@
<parent>
<artifactId>spark-connector</artifactId>
<groupId>com.aliyun.odps</groupId>
- <version>3.3.1-odps0.43.0</version>
+ <version>3.5.1-odps0.45.5</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>spark-odps-hive_2.12</artifactId>
4 changes: 2 additions & 2 deletions spark-connector/hive/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>spark-connector</artifactId>
<groupId>com.aliyun.odps</groupId>
- <version>3.3.1-odps0.43.0</version>
+ <version>3.5.1-odps0.45.5</version>
<relativePath>../pom.xml</relativePath>
</parent>

@@ -32,7 +32,7 @@
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>spark-odps-common</artifactId>
- <version>3.3.1-odps0.43.0</version>
+ <version>3.5.1-odps0.45.5</version>
</dependency>
</dependencies>

@@ -24,6 +24,8 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.odps.OdpsClient

+ import java.util.Locale

private[sql] class HiveSessionCatalog(
externalCatalogBuilder: () => ExternalCatalog,
globalTempViewManagerBuilder: () => GlobalTempViewManager,
@@ -48,4 +50,8 @@ private[sql] class HiveSessionCatalog(
// TODO: Load defaults in cluster mode
.getOrCreate()
.odps().getDefaultProject)

+ private def formatDatabaseName(name: String): String = {
+ if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
+ }
}
@@ -99,7 +99,7 @@ class HiveSessionStateBuilder(
override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
DetectAmbiguousSelfJoin +:
new DetermineTableStats(session) +:
- PreprocessTableCreation(session) +:
+ PreprocessTableCreation(catalog) +:
PreprocessTableInsertion +:
DataSourceAnalysis +:
HiveAnalysis +:
@@ -79,7 +79,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {

// handles InsertIntoStatement specially as the table in InsertIntoStatement is not added in its
// children, hence not matched directly by previous HiveTableRelation case.
- case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _)
+ case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _, false)
if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
i.copy(table = hiveTableWithStats(relation))
}
@@ -94,7 +94,7 @@
object HiveAnalysis extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case InsertIntoStatement(
- r: HiveTableRelation, partSpec, _, query, overwrite, ifPartitionNotExists)
+ r: HiveTableRelation, partSpec, _, query, overwrite, ifPartitionNotExists, false)
if DDLUtils.isHiveTable(r.tableMeta) =>
InsertIntoOdpsTable(r.tableMeta, partSpec, query, overwrite,
ifPartitionNotExists, query.output.map(_.name))
@@ -120,7 +120,7 @@ private[hive] trait HiveStrategies {
*/
object HiveTableScans extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case ScanOperation(projects, filters, relation: HiveTableRelation) =>
+ case ScanOperation(projects, filters, null, relation: HiveTableRelation) =>
// Filters on this relation fall into four categories based
// on where we can use them to avoid reading unneeded data:
// - partition keys only - used to prune directories to read
@@ -360,7 +360,8 @@ private[hive] class HiveClientImpl(
if (!getDatabase(database.name).locationUri.equals(database.locationUri)) {
// SPARK-29260: Enable supported versions once it support altering database location.
if (!(version.equals(hive.v3_0) || version.equals(hive.v3_1))) {
- throw QueryCompilationErrors.alterDatabaseLocationUnsupportedError(version.fullVersion)
+ // version.fullVersion
+ throw QueryCompilationErrors.alterDatabaseLocationUnsupportedError()
}
}
val hiveDb = toHiveDatabase(database)
@@ -516,7 +517,7 @@
case HiveTableType.VIRTUAL_VIEW => CatalogTableType.VIEW
case unsupportedType =>
val tableTypeStr = unsupportedType.toString.toLowerCase(Locale.ROOT).replace("_", " ")
- throw QueryCompilationErrors.hiveTableTypeUnsupportedError(tableTypeStr)
+ throw QueryCompilationErrors.hiveTableTypeUnsupportedError(h.getTableName, tableTypeStr)
},
schema = schema,
partitionColumnNames = partCols.map(_.name).toSeq,
@@ -20,25 +20,22 @@ package org.apache.spark.sql.hive.client
import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong}
import java.lang.reflect.{InvocationTargetException, Method, Modifier}
import java.net.URI
- import java.util.{ArrayList => JArrayList, List => JList, Locale, Map => JMap, Set => JSet}
+ import java.util.{Locale, ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet}
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.IMetaStoreClient
import org.apache.hadoop.hive.metastore.TableType
- import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, Function => HiveFunction, FunctionType, Index, MetaException, PrincipalType, ResourceType, ResourceUri}
+ import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, FunctionType, Index, MetaException, PrincipalType, ResourceType, ResourceUri, Function => HiveFunction}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.io.AcidUtils
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants

import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow}
@@ -982,7 +979,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
def unapply(expr: Expression): Option[Attribute] = {
expr match {
case attr: Attribute => Some(attr)
- case Cast(child @ IntegralType(), dt: IntegralType, _, _)
+ case Cast(child, dt: IntegralType, _, _)
if Cast.canUpCast(child.dataType.asInstanceOf[AtomicType], dt) => unapply(child)
case _ => None
}
@@ -1146,8 +1143,8 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
// client-side filtering cannot be used with TimeZoneAwareExpression.
def hasTimeZoneAwareExpression(e: Expression): Boolean = {
e.exists {
- case cast: CastBase => cast.needsTimeZone
- case tz: TimeZoneAwareExpression => !tz.isInstanceOf[CastBase]
+ case cast: Cast => cast.needsTimeZone
+ case tz: TimeZoneAwareExpression => !tz.isInstanceOf[Cast]
case _ => false
}
}
@@ -21,15 +21,13 @@ import java.io.File
import java.lang.reflect.InvocationTargetException
import java.net.{URL, URLClassLoader}
import java.util

import scala.util.Try

import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.shims.ShimLoader

+ import org.apache.hadoop.util.VersionInfo
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkSubmitUtils
import org.apache.spark.internal.Logging
@@ -42,6 +40,9 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils, VersionUtils}

/** Factory for `IsolatedClientLoader` with specific versions of hive. */
private[hive] object IsolatedClientLoader extends Logging {

+ def isHadoop3: Boolean = VersionUtils.majorVersion(VersionInfo.getVersion) == 3

/**
* Creates isolated Hive client loaders by downloading the requested version from maven.
*/
@@ -68,7 +69,7 @@
case e: RuntimeException if e.getMessage.contains("hadoop") =>
// If the error message contains hadoop, it is probably because the hadoop
// version cannot be resolved.
- val fallbackVersion = if (VersionUtils.isHadoop3) {
+ val fallbackVersion = if (isHadoop3) {
"3.3.2"
} else {
"2.7.4"
@@ -17,8 +17,7 @@
package org.apache.spark.sql.hive.execution

import java.util.Locale

- import org.apache.spark.sql.catalyst.parser.ParseException
+ import org.apache.spark.sql.catalyst.parser.{ParseException, SqlBaseParser}
import org.apache.spark.sql.catalyst.parser.ParserUtils._
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.execution.{SparkSqlAstBuilder, SparkSqlParser}
@@ -41,11 +40,15 @@ class OdpsSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder() {
// }.getOrElse(TruncateTable(table))
// }

+ def getId(ctx: PrimitiveDataTypeContext): IdentifierContext = {
+ ctx.getRuleContext(classOf[SqlBaseParser.IdentifierContext], 0)
+ }

/**
* Resolve/create a primitive type.
*/
override def visitPrimitiveDataType(ctx: PrimitiveDataTypeContext): DataType = withOrigin(ctx) {
- val dataType = ctx.identifier.getText.toLowerCase(Locale.ROOT)
+ val dataType = getId(ctx).getText.toLowerCase(Locale.ROOT)
(dataType, ctx.INTEGER_VALUE().asScala.toList) match {
case ("boolean", Nil) => BooleanType
case ("tinyint" | "byte", Nil) => ByteType
@@ -208,7 +208,7 @@ object OdpsTableWriter extends Logging {
logError(s"Data source write $identifier aborted.")
cause match {
// Only wrap non fatal exceptions.
- case NonFatal(e) => throw QueryExecutionErrors.writingJobAbortedError(e)
+ case NonFatal(e) => throw QueryExecutionErrors.writingJobFailedError(e)
case _ => throw cause
}
}