[GLUTEN-8060][CORE] GlutenShuffleManager as a registry of shuffle managers #8084
Conversation
Run Gluten Clickhouse CI on x86
gluten-core/src/main/scala/org/apache/spark/shuffle/ShuffleManagerRegistry.scala
```scala
/** The internal shuffle manager instance used by GlutenShuffleManager. */
private class ShuffleManagerRouter(lookup: ShuffleManagerLookup) extends ShuffleManager {
  import ShuffleManagerRouter._

  private val cache = new Cache()
```
Will this state be visible to the Spark executors?
I think so. The cache key is the shuffle ID, which is shared between the driver and executors.
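For context, here is a minimal sketch of what a shuffle-ID-keyed cache along these lines might look like. The `Cache` name comes from the excerpt above; the internals shown here are assumptions for illustration, not the PR's actual code:

```scala
import scala.collection.concurrent.TrieMap
import org.apache.spark.shuffle.ShuffleManager

// Hypothetical sketch: a thread-safe map from shuffle ID to the manager that
// owns the shuffle. Note that this map lives in a single JVM: entries written
// on the driver are not automatically visible on executors, which is exactly
// the question raised in this thread.
private class Cache {
  private val store = TrieMap[Int, ShuffleManager]()

  def put(shuffleId: Int, manager: ShuffleManager): Unit =
    store.put(shuffleId, manager)

  def get(shuffleId: Int): ShuffleManager =
    store.getOrElse(
      shuffleId,
      throw new IllegalStateException(s"No shuffle manager found for shuffle ID $shuffleId"))
}
```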
Will the mapping saved in `cache.store` also be synchronized to the executors? I am worried that calling `cache.get(shuffleId)` on an executor will not be able to find the `ShuffleManager`.
I see. It's likely a problem. Let me test, thanks.
Perhaps a simple solution could be registering `ShuffleHandle -> ShuffleManager` mappings as well, which could then be used for lookup on the executor side. Do you think that is feasible?
@wForget I assume the current code is fine, given that the shuffle dependency builder (namely `#getDependencies`) is usually carried by the shuffle RDD and called from task code. I will run some tests later to verify.
> Perhaps a simple solution could be registering `ShuffleHandle -> ShuffleManager` mappings as well which can be used for lookup on executor side. Would you think that is feasible?

Sounds feasible
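A sketch of that idea, assuming the mapping is populated in whichever JVM handles the shuffle; all names besides `ShuffleHandle` and `ShuffleManager` are hypothetical:

```scala
import scala.collection.concurrent.TrieMap
import org.apache.spark.shuffle.{ShuffleHandle, ShuffleManager}

// Hypothetical ShuffleHandle -> ShuffleManager mapping. A ShuffleHandle is
// serialized into the shuffle dependency and travels with tasks, so an
// executor holding the handle could resolve the owning manager locally,
// provided the mapping is (re)registered in the executor JVM as well; an
// in-memory map is never shared across driver and executor processes.
object HandleRegistry {
  private val byHandle = TrieMap[ShuffleHandle, ShuffleManager]()

  def register(handle: ShuffleHandle, manager: ShuffleManager): Unit =
    byHandle.put(handle, manager)

  def lookup(handle: ShuffleHandle): ShuffleManager =
    byHandle.getOrElse(
      handle,
      throw new IllegalStateException(s"No shuffle manager registered for $handle"))
}
```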
@zhztheplayer Although `spark.shuffle.manager` can be uniformly configured as `GlutenShuffleManager`, additional configuration is still required to choose between `ColumnarShuffleManager`, `CelebornShuffleManager`, and `UniffleShuffleManager`. Essentially, this is no different from the current setup, where `spark.shuffle.manager` is configured to point at different managers. Moreover, from a certain perspective, introducing `GlutenShuffleManager` couples the various shuffle managers together, whereas the current, fully decoupled setup seems clearer.
```scala
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Long,
    context: TaskContext,
    metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
  cache.get(handle.shuffleId).getWriter(handle, mapId, context, metrics)
}
```
It may be better to determine the appropriate `ShuffleManager` based on the `ShuffleHandle`.
`ShuffleHandle` is not passed to `ShuffleBlockResolver`, so it could be tricky to use it as the key. Would you expect any concrete issues if we use the shuffle ID as the key? If so, maybe we need another solution to avoid them.
I haven't clearly understood where the new code coupling comes from. Ideally, this work lets us remove the proxied calls from the RSS shuffle managers to the default shuffle manager (each shuffle manager can be registered for the specific shuffle dependency types it can handle). So I see this as a decoupling of code rather than a coupling.
That said, I think I understand your concern, especially this one. The reason I am proposing this is not simply code refactoring. The solution is a trade-off to let users enable different backends at the same time; see the umbrella issue #6920. For that purpose, we have to develop a way for the shuffle managers to be registered together, so that Gluten can choose the right one to use. For example, if we introduce a GPU backend, we should give developers the flexibility to not make the GPU backend rely on the Velox backend, while still letting the query planner use Velox shuffle when a query plan is not offloaded to the GPU. Maybe we can put off deprecating the current use of
No, it's not intended as a refactor of the current shuffles. It's to prepare the base support for mixed backends: #6920.
@zhztheplayer Thank you very much for the detailed explanation. I believe #6920 indeed requires this feature. At the same time, retaining the current option that allows users to specify the ShuffleManager through configuration is also necessary. I think this is acceptable.
```scala
class ShuffleManagerRegistry private[ShuffleManagerRegistry] {
  import ShuffleManagerRegistry._

  private val all: mutable.Buffer[(LookupKey, String)] = mutable.Buffer()
  private val routerBuilders: mutable.Buffer[RouterBuilder] = mutable.Buffer()
```
Could you explain in what cases there would be multiple RouterBuilders? Thanks!
I think we usually have a single router builder at a time, so this could be simplified in the future with some kind of assertion (something like `if (router created) throw error`). Though it's benign at the moment.
The reason we don't simply adopt lazy initialization here is that the instantiation of `RouterBuilder` is parameterized: it takes `SparkConf` and `isDriver` in its constructor. So we guard the code against a second invocation with different parameters, whether or not that is practically possible.
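A sketch of the guard being described, assuming `RouterBuilder` takes `SparkConf` and `isDriver` in its constructor as stated; the holder class and its API are illustrative, not the PR's actual code:

```scala
import org.apache.spark.SparkConf

// Hypothetical guard: allow RouterBuilder to be created once, and fail fast
// if a second creation is attempted with different (conf, isDriver) inputs.
class RouterBuilderHolder {
  private var created: Option[(SparkConf, Boolean, RouterBuilder)] = None

  def getOrCreate(conf: SparkConf, isDriver: Boolean): RouterBuilder = synchronized {
    created match {
      case Some((prevConf, prevIsDriver, builder)) =>
        require(
          prevConf == conf && prevIsDriver == isDriver,
          "RouterBuilder was already instantiated with different parameters")
        builder
      case None =>
        val builder = new RouterBuilder(conf, isDriver)
        created = Some((conf, isDriver, builder))
        builder
    }
  }
}
```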
#8060

Prepares `GlutenShuffleManager` as the new shuffle manager implementation of Gluten. The shuffle manager is simply a router for all shuffle managers registered through the API `ShuffleManagerRegistry.get().register`.

About backward compatibility: `ColumnarShuffleManager` / `CelebornShuffleManager` / `UniffleShuffleManager` will be deprecated in the future (perhaps over the next few PRs). Once a Spark configuration like `spark.shuffle.manager=....ColumnarShuffleManager` is specified explicitly, Gluten will still run, but will raise a warning indicating that the option is deprecated and that `GlutenShuffleManager` is recommended instead. `GlutenShuffleManager` will maintain a mechanism (either automatic or configuration-driven) to route shuffle manager calls to the underlying registered shuffle managers. Hence it will become recommended to use `spark.shuffle.manager=....GlutenShuffleManager`, whether or not RSS is required.

This PR is only an API preparation with essential UTs. No existing production code is affected.