Skip to content

Commit

Permalink
Revert "[SPARK-45502][BUILD] Upgrade Kafka to 3.6.0"
Browse files Browse the repository at this point in the history
This reverts commit d1bd21a.

### What changes were proposed in this pull request?
This PR reverts SPARK-45502 (the upgrade of Kafka to 3.6.0) to make the test suite `KafkaSourceStressSuite` stable again.

### Why are the changes needed?
The test suite `KafkaSourceStressSuite` has become very unstable since SPARK-45502 was merged: 10 of the 22 most recent test runs failed because of it. Revert the upgrade for now; we can upgrade Kafka again once the test issues are resolved. The failed runs are listed below.

- https://github.com/apache/spark/actions/runs/6497999347/job/17648385705
- https://github.com/apache/spark/actions/runs/6502219014/job/17660900989
- https://github.com/apache/spark/actions/runs/6502591917/job/17661861797
- https://github.com/apache/spark/actions/runs/6503144598/job/17663199041
- https://github.com/apache/spark/actions/runs/6503233514/job/17663413817
- https://github.com/apache/spark/actions/runs/6504416528/job/17666334238
- https://github.com/apache/spark/actions/runs/6509796846/job/17682130466
- https://github.com/apache/spark/actions/runs/6510877112/job/17685502094
- https://github.com/apache/spark/actions/runs/6512948316/job/17691625228
- https://github.com/apache/spark/actions/runs/6516366232/job/17699813649

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43379 from LuciferYang/Revert-SPARK-45502.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
  • Loading branch information
LuciferYang committed Oct 16, 2023
1 parent 6994bad commit 62653b9
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import scala.io.Source
import scala.jdk.CollectionConverters._

import com.google.common.io.Files
import kafka.api.Request
import kafka.server.{HostedPartition, KafkaConfig, KafkaServer}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.zk.KafkaZkClient
Expand All @@ -39,7 +40,6 @@ import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.security.auth.SecurityProtocol.{PLAINTEXT, SASL_PLAINTEXT}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.utils.SystemTime
Expand Down Expand Up @@ -597,7 +597,7 @@ class KafkaTestUtils(
.getPartitionInfo(topic, partition) match {
case Some(partitionState) =>
zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined &&
FetchRequest.isValidBrokerId(partitionState.leader) &&
Request.isValidBrokerId(partitionState.leader) &&
!partitionState.replicas.isEmpty

case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,12 @@ import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.util.Random

import kafka.log.{LogCleaner, UnifiedLog}
import kafka.server.BrokerTopicStats
import kafka.log.{CleanerConfig, LogCleaner, LogConfig, ProducerStateManagerConfig, UnifiedLog}
import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.utils.Pool
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

import org.apache.spark._
Expand Down Expand Up @@ -92,13 +90,13 @@ class KafkaRDDSuite extends SparkFunSuite {
val dir = new File(logDir, topic + "-" + partition)
dir.mkdirs()
val logProps = new ju.Properties()
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, java.lang.Float.valueOf(0.1f))
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
val logDirFailureChannel = new LogDirFailureChannel(1)
val topicPartition = new TopicPartition(topic, partition)
val producerIdExpirationMs = Int.MaxValue
val producerStateManagerConfig = new ProducerStateManagerConfig(producerIdExpirationMs, false)
val logConfig = new LogConfig(logProps)
val producerStateManagerConfig = new ProducerStateManagerConfig(producerIdExpirationMs)
val logConfig = LogConfig(logProps)
val log = UnifiedLog(
dir,
logConfig,
Expand All @@ -122,7 +120,7 @@ class KafkaRDDSuite extends SparkFunSuite {
log.roll()
logs.put(topicPartition, log)

val cleaner = new LogCleaner(new CleanerConfig(false), Array(dir), logs, logDirFailureChannel)
val cleaner = new LogCleaner(CleanerConfig(), Array(dir), logs, logDirFailureChannel)
cleaner.startup()
cleaner.awaitCleaned(new TopicPartition(topic, partition), log.activeSegment.baseOffset, 1000)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import scala.annotation.tailrec
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

import kafka.api.Request
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.utils.{Time => KTime}
import org.apache.zookeeper.client.ZKClientConfig
Expand Down Expand Up @@ -304,7 +304,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
val leader = partitionState.leader
val isr = partitionState.isr
zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined &&
FetchRequest.isValidBrokerId(leader) && !isr.isEmpty
Request.isValidBrokerId(leader) && !isr.isEmpty
case _ =>
false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package org.apache.spark.streaming.kafka010.mocks

import java.util.concurrent.{ScheduledFuture, TimeUnit}

import kafka.utils.Scheduler
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.util.Scheduler
import org.jmock.lib.concurrent.DeterministicScheduler

/**
Expand All @@ -42,6 +42,8 @@ private[kafka010] class MockScheduler(val time: Time) extends Scheduler {

val scheduler = new DeterministicScheduler()

/** Always reports `true`: this mock does not track a started/stopped state. */
def isStarted: Boolean = true

/** No-op: the body is empty, so calling this has no effect on the mock. */
def startup(): Unit = {}

def shutdown(): Unit = synchronized {
Expand All @@ -54,18 +56,17 @@ private[kafka010] class MockScheduler(val time: Time) extends Scheduler {

def schedule(
name: String,
task: Runnable,
delayMs: Long = 0,
periodMs: Long = -1): ScheduledFuture[_] = synchronized {
if (periodMs >= 0) {
scheduler.scheduleAtFixedRate(task, delayMs, periodMs, TimeUnit.MILLISECONDS)
fun: () => Unit,
delay: Long = 0,
period: Long = -1,
unit: TimeUnit = TimeUnit.MILLISECONDS): ScheduledFuture[_] = synchronized {
val runnable = new Runnable {
override def run(): Unit = fun()
}
if (period >= 0) {
scheduler.scheduleAtFixedRate(runnable, delay, period, unit)
} else {
scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS)
scheduler.schedule(runnable, delay, unit)
}
}

override def resizeThreadPool(i: Int): Unit = {

}

}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
<!-- Version used for internal directory structure -->
<hive.version.short>2.3</hive.version.short>
<!-- note that this should be compatible with Kafka brokers version 0.10 and up -->
<kafka.version>3.6.0</kafka.version>
<kafka.version>3.4.1</kafka.version>
<!-- After 10.15.1.3, the minimum required version is JDK9 -->
<derby.version>10.14.2.0</derby.version>
<parquet.version>1.13.1</parquet.version>
Expand Down

0 comments on commit 62653b9

Please sign in to comment.