Uniffle Shuffle Client Guide
Uniffle is designed as a unified shuffle engine for multiple computing frameworks, including Apache Spark and Apache Hadoop. Uniffle provides client plugins that enable remote shuffle in Spark and MapReduce.
Deploy
This document describes how to deploy Uniffle client plugins with Spark and MapReduce.
Deploy Spark Client Plugin
Add the client jar to the Spark classpath, e.g., <SPARK_HOME>/jars/
The jar for Spark2 is located in <RSS_HOME>/jars/client/spark2/rss-client-XXXXX-shaded.jar
The jar for Spark3 is located in <RSS_HOME>/jars/client/spark3/rss-client-XXXXX-shaded.jar
Update the Spark conf to enable Uniffle, e.g.,
spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager
spark.rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999
# Note: For Spark2, spark.sql.adaptive.enabled should be false because Spark2 doesn't support AQE.
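As a rough illustration of the two settings above, the following Python sketch renders the conf lines from a list of coordinator hosts. The function name `uniffle_spark_conf`, its parameters, and the host names are hypothetical; only the property names, the class name, and port 19999 come from this guide.

```python
def uniffle_spark_conf(coordinators, port=19999, spark_major=3):
    """Return spark-defaults.conf style lines that enable Uniffle.

    `coordinators` is a list of coordinator host names or IPs (hypothetical
    input); the port default mirrors the example in this guide.
    """
    if not coordinators:
        raise ValueError("at least one coordinator host is required")
    quorum = ",".join(f"{host}:{port}" for host in coordinators)
    lines = [
        "spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager",
        f"spark.rss.coordinator.quorum {quorum}",
    ]
    if spark_major == 2:
        # Spark2 has no AQE, so the guide asks for this to be false.
        lines.append("spark.sql.adaptive.enabled false")
    return lines


if __name__ == "__main__":
    for line in uniffle_spark_conf(["coordinator-1", "coordinator-2"]):
        print(line)
```

The returned lines can be appended to the Spark conf file or passed one by one as `--conf key=value` after splitting on the first space.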
Support Spark Dynamic Allocation
To support Spark dynamic allocation with Uniffle, the Spark code must be patched. There are 7 patches for Spark (2.3.4/2.4.6/3.0.1/3.1.2/3.2.1/3.3.1/3.4.1) in the patch/spark folder for reference.
After applying the patch and rebuilding Spark, add the following configuration to the Spark conf to enable dynamic allocation:
spark.shuffle.service.enabled false
spark.dynamicAllocation.enabled true
For Spark 3.5 or above, just add one more configuration:
spark.shuffle.sort.io.plugin.class org.apache.spark.shuffle.RssShuffleDataIo
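The version-dependent setup can be sketched as a small helper. This is only an illustration: the function name `dynamic_allocation_conf` is hypothetical, and reading the guide as "Spark below 3.5 needs a patched build, Spark 3.5 or above only needs the extra property" is an assumption.

```python
def dynamic_allocation_conf(spark_version):
    """Return (conf_lines, needs_patch) for a version string such as '3.4.1'."""
    major, minor = (int(part) for part in spark_version.split(".")[:2])
    lines = [
        "spark.shuffle.service.enabled false",
        "spark.dynamicAllocation.enabled true",
    ]
    is_35_or_above = (major, minor) >= (3, 5)
    if is_35_or_above:
        lines.append(
            "spark.shuffle.sort.io.plugin.class "
            "org.apache.spark.shuffle.RssShuffleDataIo"
        )
    # Assumption: versions below 3.5 rely on the patches in patch/spark.
    return lines, not is_35_or_above
```

For example, `dynamic_allocation_conf("3.5.0")` yields three conf lines and no patch requirement, while `dynamic_allocation_conf("3.4.1")` yields the two base lines and flags that a patched Spark build is expected.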
Deploy MapReduce Client Plugin
Add the client jar to the classpath of each NodeManager, e.g., <HADOOP_HOME>/share/hadoop/mapreduce/
The jar for MapReduce is located in <RSS_HOME>/jars/client/mr/rss-client-mr-XXXXX-shaded.jar
Update the MapReduce conf to enable Uniffle, e.g.,
-Dmapreduce.rss.coordinator.quorum=<coordinatorIp1>:19999,<coordinatorIp2>:19999
-Dyarn.app.mapreduce.am.command-opts=org.apache.hadoop.mapreduce.v2.app.RssMRAppMaster
-Dmapreduce.job.map.output.collector.class=org.apache.hadoop.mapred.RssMapOutputCollector
-Dmapreduce.job.reduce.shuffle.consumer.plugin.class=org.apache.hadoop.mapreduce.task.reduce.RssShuffle
Note that the RssMRAppMaster will automatically disable slow start (i.e., mapreduce.job.reduce.slowstart.completedmaps=1) and job recovery (i.e., yarn.app.mapreduce.am.job.recovery.enable=false).
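For job submission scripts, the four MapReduce options can be assembled programmatically. The sketch below is a hypothetical helper (`uniffle_mr_options` is not part of Uniffle); the property names and class names are copied from the options listed in this section.

```python
def uniffle_mr_options(coordinators, port=19999):
    """Return the -D options that enable Uniffle for a MapReduce job."""
    if not coordinators:
        raise ValueError("at least one coordinator host is required")
    quorum = ",".join(f"{host}:{port}" for host in coordinators)
    props = {
        "mapreduce.rss.coordinator.quorum": quorum,
        "yarn.app.mapreduce.am.command-opts":
            "org.apache.hadoop.mapreduce.v2.app.RssMRAppMaster",
        "mapreduce.job.map.output.collector.class":
            "org.apache.hadoop.mapred.RssMapOutputCollector",
        "mapreduce.job.reduce.shuffle.consumer.plugin.class":
            "org.apache.hadoop.mapreduce.task.reduce.RssShuffle",
    }
    # Dicts keep insertion order, so the options come out as listed above.
    return [f"-D{key}={value}" for key, value in props.items()]
```

The resulting list can be spliced into the argument list of a job launch command, after the job class and before the job's own arguments.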
Configuration
The important client configurations are listed below.
Common Setting
These configurations are shared by all types of clients.
Property Name | Default | Description |
---|---|---|
<client_type>.rss.coordinator.quorum | - | Coordinator quorum |
<client_type>.rss.writer.buffer.size | 3m | Buffer size for single partition data |
<client_type>.rss.storage.type | - | Supports MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS |
<client_type>.rss.client.read.buffer.size | 14m | The max data size read from storage |
<client_type>.rss.client.send.threadPool.size | 5 | The number of threads for sending shuffle data to shuffle servers |
<client_type>.rss.client.assignment.tags | - | The comma-separated list of tags used to decide which shuffle servers are assigned. Notice that SHUFFLE_SERVER_VERSION is always used as an assignment tag, whether this conf is set or not |
<client_type>.rss.client.data.commit.pool.size | The number of assigned shuffle servers | The number of threads for sending commits to shuffle servers |
<client_type>.rss.client.assignment.shuffle.nodes.max | -1 | The number of shuffle servers required for assignment. If it is less than or equal to 0, or greater than the coordinator's "rss.coordinator.shuffle.nodes.max", the value of "rss.coordinator.shuffle.nodes.max" is used by default |
Notice:
- <client_type> should be spark or mapreduce
- <client_type>.rss.coordinator.quorum is compulsory, and other configurations are optional when coordinator dynamic configuration is enabled.
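The fallback rule described for <client_type>.rss.client.assignment.shuffle.nodes.max in the table above can be sketched as follows. This is an illustrative reading of the table description, not Uniffle's actual code; the function name is hypothetical.

```python
def effective_shuffle_nodes(requested, coordinator_max):
    """Resolve the number of shuffle servers to assign.

    `requested` stands for <client_type>.rss.client.assignment.shuffle.nodes.max
    and `coordinator_max` for the coordinator's rss.coordinator.shuffle.nodes.max.
    """
    # Non-positive or too-large requests fall back to the coordinator's limit.
    if requested <= 0 or requested > coordinator_max:
        return coordinator_max
    return requested
```

With a coordinator limit of 10, the default of -1 resolves to 10, a request of 4 stays at 4, and a request of 11 is capped back to 10.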
Adaptive Remote Shuffle Enabling
To choose between built-in shuffle and remote shuffle automatically, Uniffle supports adaptive enabling.
The client should use DelegationRssShuffleManager and provide its unique <access_id> so that the coordinator can decide whether remote shuffle should be enabled for it.
spark.shuffle.manager org.apache.spark.shuffle.DelegationRssShuffleManager
spark.rss.access.id=<access_id>
Notice: Currently, this feature only supports Spark.
Other configuration:
Property Name | Default | Description |
---|---|---|
spark.rss.access.timeout.ms | 10000 | The timeout for accessing the Uniffle coordinator |
spark.rss.client.access.retry.interval.ms | 20000 | The interval between access retries before falling back to SortShuffleManager |
spark.rss.client.access.retry.times | 0 | The number of access retries before falling back to SortShuffleManager |
Client Quorum Setting
Uniffle supports a client-side quorum protocol to tolerate shuffle server crashes. This feature is a client-side behaviour in which the shuffle writer sends each block to multiple servers, and shuffle readers can fetch block data from any one of those servers. Since sending multiple replicas of each block reduces shuffle performance and increases resource consumption, it is designed as an optional feature.
Property Name | Default | Description |
---|---|---|
<client_type>.rss.data.replica | 1 | The maximum number of servers to which the client sends each block in the quorum protocol |
<client_type>.rss.data.replica.write | 1 | The minimum number of servers to which the client must successfully send each block |
<client_type>.rss.data.replica.read | 1 | The minimum number of servers from which the client must successfully fetch metadata |
Notice: spark.rss.data.replica.write + spark.rss.data.replica.read > spark.rss.data.replica
Recommended examples:
- Performance First (default)
spark.rss.data.replica 1
spark.rss.data.replica.write 1
spark.rss.data.replica.read 1
- Fault-tolerant First
spark.rss.data.replica 3
spark.rss.data.replica.write 2
spark.rss.data.replica.read 2
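The constraint that write plus read must exceed the replica count is the usual quorum overlap condition: any set of servers that acknowledged a write then shares at least one server with any set that answers a read. A minimal check might look like the sketch below. The function name is hypothetical, and the extra bounds (each value at least 1, write and read not larger than replica) are assumptions rather than rules stated in this guide.

```python
def is_valid_quorum(replica, replica_write, replica_read):
    """Check replica settings against the quorum rule from this guide."""
    values = (replica, replica_write, replica_read)
    # Assumption: every setting is a positive integer.
    if any(v < 1 for v in values):
        return False
    # Assumption: a client cannot need more acks than there are replicas.
    if replica_write > replica or replica_read > replica:
        return False
    # Rule from the guide: write + read > replica.
    return replica_write + replica_read > replica
```

Both recommended examples pass this check (1/1/1 and 3/2/2), whereas a setting such as replica 3, write 1, read 2 fails because a read could miss the only server that holds the block.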
Spark Specialized Setting
The important configurations are listed below.
Property Name | Default | Description |
---|---|---|
spark.rss.writer.buffer.spill.size | 128m | Buffer size for the data of all partitions combined |
spark.rss.client.send.size.limit | 16m | The maximum data size sent to a shuffle server |
spark.rss.client.unregister.thread.pool.size | 10 | The maximum size of the thread pool used for unregistering |
spark.rss.client.unregister.request.timeout.sec | 10 | The maximum timeout, in seconds, for unregister requests to remote shuffle servers |
MapReduce Specialized Setting
Property Name | Default | Description |
---|---|---|
mapreduce.rss.client.max.buffer.size | 3k | The maximum buffer size on the map side |
mapreduce.rss.client.batch.trigger.num | 50 | The maximum number of buffers in a batch for sending data on the map side |
Remote Spill (Experimental)
In cloud environments, VMs may have very limited disk space and disk performance. This experimental feature allows reduce tasks to spill data to remote storage (e.g., HDFS).
Property Name | Default | Description |
---|---|---|
mapreduce.rss.reduce.remote.spill.enable | false | Whether to use remote spill |
mapreduce.rss.reduce.remote.spill.attempt.inc | 1 | The increase in reduce attempts, since spilling to HDFS is more likely to fail than spilling to local disk |
mapreduce.rss.reduce.remote.spill.replication | 1 | The replication factor for data spilled to HDFS |
mapreduce.rss.reduce.remote.spill.retries | 5 | The number of retries when spilling data to HDFS |
Notice: this feature requires the MEMORY_LOCALFILE_HDFS mode.
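A pre-submit check for the storage mode requirement could be sketched as below. It assumes the mode named in the notice corresponds to the MEMORY_LOCALFILE_HDFS storage type listed under Common Setting, and that the job configuration is available as a plain dict of strings; the function name is hypothetical.

```python
REQUIRED_STORAGE_TYPE = "MEMORY_LOCALFILE_HDFS"  # assumed to match the notice


def remote_spill_problems(conf):
    """Return a list of problems with the remote spill settings in `conf`."""
    problems = []
    enabled = conf.get("mapreduce.rss.reduce.remote.spill.enable", "false")
    if enabled.lower() != "true":
        # Remote spill is off (the default), so there is nothing to check.
        return problems
    storage_type = conf.get("mapreduce.rss.storage.type")
    if storage_type != REQUIRED_STORAGE_TYPE:
        problems.append(
            f"remote spill needs mapreduce.rss.storage.type="
            f"{REQUIRED_STORAGE_TYPE}, got {storage_type!r}"
        )
    return problems
```

An empty result means the settings look consistent; otherwise each entry describes one mismatch that should be fixed before submitting the job.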