
Commit 9ed1f38

Merge branch 'master' into dev-fk

Florian Kelbert authored and committed · 2 parents 9b8ece0 + fcc6fd1 · commit 9ed1f38

File tree: 115 files changed (+2133, -659 lines)

R/pkg/vignettes/sparkr-vignettes.Rmd (+10, -10)

@@ -46,7 +46,7 @@ Sys.setenv("_JAVA_OPTIONS" = paste("-XX:-UsePerfData", old_java_opt, sep = " "))
 
 ## Overview
 
-SparkR is an R package that provides a light-weight frontend to use Apache Spark from R. With Spark `r packageVersion("SparkR")`, SparkR provides a distributed data frame implementation that supports data processing operations like selection, filtering, aggregation etc. and distributed machine learning using [MLlib](http://spark.apache.org/mllib/).
+SparkR is an R package that provides a light-weight frontend to use Apache Spark from R. With Spark `r packageVersion("SparkR")`, SparkR provides a distributed data frame implementation that supports data processing operations like selection, filtering, aggregation etc. and distributed machine learning using [MLlib](https://spark.apache.org/mllib/).
 
 ## Getting Started
 
@@ -132,7 +132,7 @@ sparkR.session.stop()
 
 Different from many other R packages, to use SparkR, you need an additional installation of Apache Spark. The Spark installation will be used to run a backend process that will compile and execute SparkR programs.
 
-After installing the SparkR package, you can call `sparkR.session` as explained in the previous section to start and it will check for the Spark installation. If you are working with SparkR from an interactive shell (eg. R, RStudio) then Spark is downloaded and cached automatically if it is not found. Alternatively, we provide an easy-to-use function `install.spark` for running this manually. If you don't have Spark installed on the computer, you may download it from [Apache Spark Website](http://spark.apache.org/downloads.html).
+After installing the SparkR package, you can call `sparkR.session` as explained in the previous section to start and it will check for the Spark installation. If you are working with SparkR from an interactive shell (eg. R, RStudio) then Spark is downloaded and cached automatically if it is not found. Alternatively, we provide an easy-to-use function `install.spark` for running this manually. If you don't have Spark installed on the computer, you may download it from [Apache Spark Website](https://spark.apache.org/downloads.html).
 
 ```{r, eval=FALSE}
 install.spark()
@@ -147,7 +147,7 @@ sparkR.session(sparkHome = "/HOME/spark")
 ### Spark Session {#SetupSparkSession}
 
 
-In addition to `sparkHome`, many other options can be specified in `sparkR.session`. For a complete list, see [Starting up: SparkSession](http://spark.apache.org/docs/latest/sparkr.html#starting-up-sparksession) and [SparkR API doc](http://spark.apache.org/docs/latest/api/R/sparkR.session.html).
+In addition to `sparkHome`, many other options can be specified in `sparkR.session`. For a complete list, see [Starting up: SparkSession](https://spark.apache.org/docs/latest/sparkr.html#starting-up-sparksession) and [SparkR API doc](https://spark.apache.org/docs/latest/api/R/sparkR.session.html).
 
 In particular, the following Spark driver properties can be set in `sparkConfig`.
 
@@ -169,15 +169,15 @@ sparkR.session(spark.sql.warehouse.dir = spark_warehouse_path)
 
 
 #### Cluster Mode
-SparkR can connect to remote Spark clusters. [Cluster Mode Overview](http://spark.apache.org/docs/latest/cluster-overview.html) is a good introduction to different Spark cluster modes.
+SparkR can connect to remote Spark clusters. [Cluster Mode Overview](https://spark.apache.org/docs/latest/cluster-overview.html) is a good introduction to different Spark cluster modes.
 
 When connecting SparkR to a remote Spark cluster, make sure that the Spark version and Hadoop version on the machine match the corresponding versions on the cluster. Current SparkR package is compatible with
 ```{r, echo=FALSE, tidy = TRUE}
 paste("Spark", packageVersion("SparkR"))
 ```
 It should be used both on the local computer and on the remote cluster.
 
-To connect, pass the URL of the master node to `sparkR.session`. A complete list can be seen in [Spark Master URLs](http://spark.apache.org/docs/latest/submitting-applications.html#master-urls).
+To connect, pass the URL of the master node to `sparkR.session`. A complete list can be seen in [Spark Master URLs](https://spark.apache.org/docs/latest/submitting-applications.html#master-urls).
 For example, to connect to a local standalone Spark master, we can call
 
 ```{r, eval=FALSE}
@@ -317,7 +317,7 @@ A common flow of grouping and aggregation is
 
 2. Feed the `GroupedData` object to `agg` or `summarize` functions, with some provided aggregation functions to compute a number within each group.
 
-A number of widely used functions are supported to aggregate data after grouping, including `avg`, `countDistinct`, `count`, `first`, `kurtosis`, `last`, `max`, `mean`, `min`, `sd`, `skewness`, `stddev_pop`, `stddev_samp`, `sumDistinct`, `sum`, `var_pop`, `var_samp`, `var`. See the [API doc for `mean`](http://spark.apache.org/docs/latest/api/R/mean.html) and other `agg_funcs` linked there.
+A number of widely used functions are supported to aggregate data after grouping, including `avg`, `countDistinct`, `count`, `first`, `kurtosis`, `last`, `max`, `mean`, `min`, `sd`, `skewness`, `stddev_pop`, `stddev_samp`, `sumDistinct`, `sum`, `var_pop`, `var_samp`, `var`. See the [API doc for aggregate functions](https://spark.apache.org/docs/latest/api/R/column_aggregate_functions.html) linked there.
 
 For example we can compute a histogram of the number of cylinders in the `mtcars` dataset as shown below.
 
@@ -935,7 +935,7 @@ perplexity
 
 #### Alternating Least Squares
 
-`spark.als` learns latent factors in [collaborative filtering](https://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering) via [alternating least squares](http://dl.acm.org/citation.cfm?id=1608614).
+`spark.als` learns latent factors in [collaborative filtering](https://en.wikipedia.org/wiki/Recommender_system#Collaborative_filtering) via [alternating least squares](https://dl.acm.org/citation.cfm?id=1608614).
 
 There are multiple options that can be configured in `spark.als`, including `rank`, `reg`, and `nonnegative`. For a complete list, refer to the help file.
 
@@ -1171,11 +1171,11 @@ env | map
 
 ## References
 
-* [Spark Cluster Mode Overview](http://spark.apache.org/docs/latest/cluster-overview.html)
+* [Spark Cluster Mode Overview](https://spark.apache.org/docs/latest/cluster-overview.html)
 
-* [Submitting Spark Applications](http://spark.apache.org/docs/latest/submitting-applications.html)
+* [Submitting Spark Applications](https://spark.apache.org/docs/latest/submitting-applications.html)
 
-* [Machine Learning Library Guide (MLlib)](http://spark.apache.org/docs/latest/ml-guide.html)
+* [Machine Learning Library Guide (MLlib)](https://spark.apache.org/docs/latest/ml-guide.html)
 
 * [SparkR: Scaling R Programs with Spark](https://people.csail.mit.edu/matei/papers/2016/sigmod_sparkr.pdf), Shivaram Venkataraman, Zongheng Yang, Davies Liu, Eric Liang, Hossein Falaki, Xiangrui Meng, Reynold Xin, Ali Ghodsi, Michael Franklin, Ion Stoica, and Matei Zaharia. SIGMOD 2016. June 2016.
SgxREADME.md (+2, -2)

@@ -58,11 +58,11 @@ ensure that sgx-lkl executes simple Java applications successfully.
 
 3. Run the enclave for the Worker node
 
-`sgx-spark# LD_LIBRARY_PATH=/opt/j2re-image/lib/amd64:/opt/j2re-image/lib/amd64/jli:/opt/j2re-image/lib/amd64/server:/lib:/usr/lib:/usr/local/lib SGXLKL_STRACELKL=1 SGXLKL_VERBOSE=1 SGXLKL_TRACE_SYSCALL=0 SGXLKL_TRACE_MMAP=0 SGXLKL_TAP=tap2 SGXLKL_HD=lkl/alpine-rootfs.img SGXLKL_KERNEL=0 SGXLKL_VERSION=1 SGXLKL_ESLEEP=16 SGXLKL_SSLEEP=16 SGXLKL_ESPINS=16 SGXLKL_SSPINS=16 SGXLKL_HOSTNAME=localhost SGXLKL_STHREADS=6 SGXLKL_ETHREADS=3 IS_ENCLAVE=true SGX_USE_SHMEM=true SGXLKL_SHMEM_FILE=/sgx-lkl-shmem SGX_ENABLED=true SGXLKL_SHMEM_SIZE=10240000 CONNECTIONS=1 PREFETCH=8 ../sgx-lkl/sgx-musl-lkl/obj/sgx-lkl-starter /opt/j2re-image/bin/java -XX:InitialCodeCacheSize=8m -XX:ReservedCodeCacheSize=8m -Xms16m -Xmx16m -XX:CompressedClassSpaceSize=8m -XX:MaxMetaspaceSize=32m -XX:+UseCompressedClassPointers -XX:+AssumeMP -Xint -Djava.library.path=/spark/lib/ -cp /home/scala-library/:/spark/conf/:/spark/assembly/target/scala-2.11/jars/\*:/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.3.1-SNAPSHOT.jar org.apache.spark.sgx.SgxMain`
+`sgx-spark# LD_LIBRARY_PATH=/opt/j2re-image/lib/amd64:/opt/j2re-image/lib/amd64/jli:/opt/j2re-image/lib/amd64/server:/lib:/usr/lib:/usr/local/lib SGXLKL_STRACELKL=1 SGXLKL_VERBOSE=1 SGXLKL_TRACE_SYSCALL=0 SGXLKL_TRACE_MMAP=0 SGXLKL_TAP=tap2 SGXLKL_HD=lkl/alpine-rootfs.img SGXLKL_KERNEL=0 SGXLKL_VERSION=1 SGXLKL_ESLEEP=16 SGXLKL_SSLEEP=16 SGXLKL_ESPINS=16 SGXLKL_SSPINS=16 SGXLKL_HOSTNAME=localhost SGXLKL_STHREADS=6 SGXLKL_ETHREADS=3 IS_ENCLAVE=true SGX_USE_SHMEM=true SGXLKL_SHMEM_FILE=/sgx-lkl-shmem SGX_ENABLED=true SGXLKL_SHMEM_SIZE=10240000 CONNECTIONS=1 PREFETCH=8 ../sgx-lkl/sgx-musl-lkl/obj/sgx-lkl-starter /opt/j2re-image/bin/java -XX:InitialCodeCacheSize=8m -XX:ReservedCodeCacheSize=8m -Xms16m -Xmx16m -XX:+UseMembar -XX:CompressedClassSpaceSize=8m -XX:MaxMetaspaceSize=32m -XX:+UseCompressedClassPointers -XX:+AssumeMP -Xint -Djava.library.path=/spark/lib/ -cp /home/scala-library/:/spark/conf/:/spark/assembly/target/scala-2.11/jars/\*:/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.3.1-SNAPSHOT.jar org.apache.spark.sgx.SgxMain`
 
 4. Run the enclave for the driver program:
 
-`sgx-spark# LD_LIBRARY_PATH=/opt/j2re-image/lib/amd64:/opt/j2re-image/lib/amd64/jli:/opt/j2re-image/lib/amd64/server:/lib:/usr/lib:/usr/local/lib SGXLKL_STRACELKL=1 SGXLKL_VERBOSE=1 SGXLKL_TRACE_SYSCALL=0 SGXLKL_TRACE_MMAP=0 SGXLKL_TAP=tap1 SGXLKL_HD=lkl/alpine-rootfs.img SGXLKL_KERNEL=0 SGXLKL_VERSION=1 SGXLKL_ESLEEP=16 SGXLKL_SSLEEP=16 SGXLKL_ESPINS=16 SGXLKL_SSPINS=16 SGXLKL_HOSTNAME=localhost SGXLKL_STHREADS=6 SGXLKL_ETHREADS=3 IS_ENCLAVE=true SGX_USE_SHMEM=true SGXLKL_SHMEM_FILE=/sgx-lkl-shmem-driver SGX_ENABLED=true SGXLKL_SHMEM_SIZE=10240000 CONNECTIONS=1 PREFETCH=8 ../sgx-lkl/sgx-musl-lkl/obj/sgx-lkl-starter /opt/j2re-image/bin/java -XX:InitialCodeCacheSize=8m -XX:ReservedCodeCacheSize=8m -Xms16m -Xmx16m -XX:CompressedClassSpaceSize=8m -XX:MaxMetaspaceSize=32m -XX:+UseCompressedClassPointers -XX:+AssumeMP -Xint -Djava.library.path=/spark/lib/ -cp /home/scala-library/:/spark/conf/:/spark/assembly/target/scala-2.11/jars/\*:/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.3.1-SNAPSHOT.jar org.apache.spark.sgx.SgxMain`
+`sgx-spark# LD_LIBRARY_PATH=/opt/j2re-image/lib/amd64:/opt/j2re-image/lib/amd64/jli:/opt/j2re-image/lib/amd64/server:/lib:/usr/lib:/usr/local/lib SGXLKL_STRACELKL=1 SGXLKL_VERBOSE=1 SGXLKL_TRACE_SYSCALL=0 SGXLKL_TRACE_MMAP=0 SGXLKL_TAP=tap1 SGXLKL_HD=lkl/alpine-rootfs.img SGXLKL_KERNEL=0 SGXLKL_VERSION=1 SGXLKL_ESLEEP=16 SGXLKL_SSLEEP=16 SGXLKL_ESPINS=16 SGXLKL_SSPINS=16 SGXLKL_HOSTNAME=localhost SGXLKL_STHREADS=6 SGXLKL_ETHREADS=3 IS_ENCLAVE=true SGX_USE_SHMEM=true SGXLKL_SHMEM_FILE=/sgx-lkl-shmem-driver SGX_ENABLED=true SGXLKL_SHMEM_SIZE=10240000 CONNECTIONS=1 PREFETCH=8 ../sgx-lkl/sgx-musl-lkl/obj/sgx-lkl-starter /opt/j2re-image/bin/java -XX:InitialCodeCacheSize=8m -XX:ReservedCodeCacheSize=8m -Xms16m -Xmx16m -XX:+UseMembar -XX:CompressedClassSpaceSize=8m -XX:MaxMetaspaceSize=32m -XX:+UseCompressedClassPointers -XX:+AssumeMP -Xint -Djava.library.path=/spark/lib/ -cp /home/scala-library/:/spark/conf/:/spark/assembly/target/scala-2.11/jars/\*:/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.3.1-SNAPSHOT.jar org.apache.spark.sgx.SgxMain`
 
 5. Finally, submit a Spark job:
 

common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_x86_32.java (+16)

@@ -60,6 +60,8 @@ public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, i
   }
 
   public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
+    // This is not compatible with original and another implementations.
+    // But remain it for backward compatibility for the components existing before 2.3.
     assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
     int lengthAligned = lengthInBytes - lengthInBytes % 4;
     int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
@@ -71,6 +73,20 @@ public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, i
     return fmix(h1, lengthInBytes);
   }
 
+  public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) {
+    // This is compatible with original and another implementations.
+    // Use this method for new components after Spark 2.3.
+    assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
+    int lengthAligned = lengthInBytes - lengthInBytes % 4;
+    int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
+    int k1 = 0;
+    for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) {
+      k1 ^= (Platform.getByte(base, offset + i) & 0xFF) << shift;
+    }
+    h1 ^= mixK1(k1);
+    return fmix(h1, lengthInBytes);
+  }
+
   private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
     assert (lengthInBytes % 4 == 0);
     int h1 = seed;
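
The two variants differ only in how they fold the trailing 1-3 bytes (the part after the last aligned 4-byte word) into the hash. The added `hashUnsafeBytes2` packs those bytes into a single little-endian word `k1` and mixes it in exactly once, which is what the reference Murmur3 algorithm and Scala's `MurmurHash3` do; the pre-existing `hashUnsafeBytes`, whose tail loop is not shown in this hunk, treats the trailing bytes differently and is kept only so that hashes produced by pre-2.3 components stay stable. The identical change is mirrored in the `common/unsafe` copy of the class below. A stand-alone Scala sketch of the new tail packing (illustrative only, not Spark code):

```scala
object TailPacking {
  // Pack the 1-3 bytes after the last aligned 4-byte word into one
  // little-endian int, mirroring the loop added in hashUnsafeBytes2.
  def packTail(bytes: Array[Byte]): Int = {
    val lengthAligned = bytes.length - bytes.length % 4
    var k1 = 0
    var shift = 0
    var i = lengthAligned
    while (i < bytes.length) {
      k1 ^= (bytes(i) & 0xFF) << shift // the & 0xFF mask avoids sign-extending negative bytes
      shift += 8
      i += 1
    }
    k1
  }

  def main(args: Array[String]): Unit = {
    // "tes" is all tail (length 3): 't' = 0x74, 'e' = 0x65, 's' = 0x73 -> 0x736574
    println(packTail("tes".getBytes("UTF-8")).toHexString) // prints 736574
  }
}
```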

common/unsafe/src/main/java/org/apache/spark/unsafe/hash/Murmur3_x86_32.java (+16)

@@ -60,6 +60,8 @@ public static int hashUnsafeWords(Object base, long offset, int lengthInBytes, i
   }
 
   public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, int seed) {
+    // This is not compatible with original and another implementations.
+    // But remain it for backward compatibility for the components existing before 2.3.
     assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
     int lengthAligned = lengthInBytes - lengthInBytes % 4;
     int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
@@ -71,6 +73,20 @@ public static int hashUnsafeBytes(Object base, long offset, int lengthInBytes, i
     return fmix(h1, lengthInBytes);
   }
 
+  public static int hashUnsafeBytes2(Object base, long offset, int lengthInBytes, int seed) {
+    // This is compatible with original and another implementations.
+    // Use this method for new components after Spark 2.3.
+    assert (lengthInBytes >= 0): "lengthInBytes cannot be negative";
+    int lengthAligned = lengthInBytes - lengthInBytes % 4;
+    int h1 = hashBytesByInt(base, offset, lengthAligned, seed);
+    int k1 = 0;
+    for (int i = lengthAligned, shift = 0; i < lengthInBytes; i++, shift += 8) {
+      k1 ^= (Platform.getByte(base, offset + i) & 0xFF) << shift;
+    }
+    h1 ^= mixK1(k1);
+    return fmix(h1, lengthInBytes);
+  }
+
   private static int hashBytesByInt(Object base, long offset, int lengthInBytes, int seed) {
     assert (lengthInBytes % 4 == 0);
     int h1 = seed;

common/unsafe/src/test/java/org/apache/spark/unsafe/hash/Murmur3_x86_32Suite.java (+19)

@@ -22,6 +22,8 @@
 import java.util.Random;
 import java.util.Set;
 
+import scala.util.hashing.MurmurHash3$;
+
 import org.apache.spark.unsafe.Platform;
 import org.junit.Assert;
 import org.junit.Test;
@@ -51,6 +53,23 @@ public void testKnownLongInputs() {
     Assert.assertEquals(-2106506049, hasher.hashLong(Long.MAX_VALUE));
   }
 
+  // SPARK-23381 Check whether the hash of the byte array is the same as another implementations
+  @Test
+  public void testKnownBytesInputs() {
+    byte[] test = "test".getBytes(StandardCharsets.UTF_8);
+    Assert.assertEquals(MurmurHash3$.MODULE$.bytesHash(test, 0),
+      Murmur3_x86_32.hashUnsafeBytes2(test, Platform.BYTE_ARRAY_OFFSET, test.length, 0));
+    byte[] test1 = "test1".getBytes(StandardCharsets.UTF_8);
+    Assert.assertEquals(MurmurHash3$.MODULE$.bytesHash(test1, 0),
+      Murmur3_x86_32.hashUnsafeBytes2(test1, Platform.BYTE_ARRAY_OFFSET, test1.length, 0));
+    byte[] te = "te".getBytes(StandardCharsets.UTF_8);
+    Assert.assertEquals(MurmurHash3$.MODULE$.bytesHash(te, 0),
+      Murmur3_x86_32.hashUnsafeBytes2(te, Platform.BYTE_ARRAY_OFFSET, te.length, 0));
+    byte[] tes = "tes".getBytes(StandardCharsets.UTF_8);
+    Assert.assertEquals(MurmurHash3$.MODULE$.bytesHash(tes, 0),
+      Murmur3_x86_32.hashUnsafeBytes2(tes, Platform.BYTE_ARRAY_OFFSET, tes.length, 0));
+  }
+
   @Test
   public void randomizedStressTest() {
     int size = 65536;
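
The new test pins `hashUnsafeBytes2` to Scala's own `MurmurHash3.bytesHash` for a few short strings whose lengths are not multiples of 4, i.e. exactly the inputs that exercise the tail path. The same comparison can be reproduced from Scala; a minimal sketch, assuming Spark's `common/unsafe` module is on the classpath (the object name below is otherwise illustrative):

```scala
import java.nio.charset.StandardCharsets

import scala.util.hashing.MurmurHash3

import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.hash.Murmur3_x86_32

object Murmur3CompatCheck {
  def main(args: Array[String]): Unit = {
    for (s <- Seq("te", "tes", "test", "test1")) {
      val bytes = s.getBytes(StandardCharsets.UTF_8)
      val reference = MurmurHash3.bytesHash(bytes, 0) // Scala's Murmur3 with seed 0
      val compat = Murmur3_x86_32.hashUnsafeBytes2(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, 0)
      val legacy = Murmur3_x86_32.hashUnsafeBytes(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length, 0)
      // The new variant agrees with the reference; the legacy one may not when
      // the length is not a multiple of 4.
      assert(reference == compat, s"mismatch for '$s'")
      println(s"$s: reference=$reference hashUnsafeBytes2=$compat hashUnsafeBytes=$legacy")
    }
  }
}
```
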
New Eclipse launch configuration file (+7)

@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<launchConfiguration type="org.eclipse.ant.AntBuilderLaunchConfigurationType">
+<booleanAttribute key="org.eclipse.ui.externaltools.ATTR_BUILDER_ENABLED" value="false"/>
+<stringAttribute key="org.eclipse.ui.externaltools.ATTR_DISABLED_BUILDER" value="org.eclipse.jdt.core.javabuilder"/>
+<mapAttribute key="org.eclipse.ui.externaltools.ATTR_TOOL_ARGUMENTS"/>
+<booleanAttribute key="org.eclipse.ui.externaltools.ATTR_TRIGGERS_CONFIGURED" value="true"/>
+</launchConfiguration>

core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java (+10, -1)

@@ -48,6 +48,9 @@
 import org.apache.spark.storage.*;
 import org.apache.spark.util.Utils;
 
+import org.apache.spark.sgx.iterator.SgxFakeIterator;
+import org.apache.spark.sgx.SgxSettings;
+
 /**
  * This class implements sort-based shuffle's hash-style shuffle fallback path. This write path
  * writes incoming records to separate files, one file per reduce partition, then concatenates these
@@ -120,8 +123,14 @@ final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
   }
 
   @Override
-  public void write(Iterator<Product2<K, V>> records) throws IOException {
+  public void write(Iterator<Product2<K, V>> records2) throws IOException {
     assert (partitionWriters == null);
+
+    Iterator<Product2<K, V>> records = records2;
+    if (SgxSettings.SGX_ENABLED() && (records2 instanceof SgxFakeIterator)) {
+      records = ((SgxFakeIterator<Product2<K, V>>) records2).access();
+    }
+
     if (!records.hasNext()) {
       partitionLengths = new long[numPartitions];
       shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
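
With SGX enabled, the shuffle writer may receive a lightweight placeholder instead of the real record iterator; the guard above swaps the placeholder for the real iterator via `access()` before anything is consumed. A rough, self-contained Scala sketch of that proxy-iterator idea — only the `SGX_ENABLED()`/`access()` shape is taken from the diff, every other name here is hypothetical and not part of the repository:

```scala
object FakeIteratorSketch {
  // Stand-in for SgxSettings.SGX_ENABLED(); here just an environment flag.
  val sgxEnabled: Boolean = sys.env.get("SGX_ENABLED").contains("true")

  // A placeholder that is cheap to hand across the enclave boundary and is
  // resolved to the real iterator via access() before it is consumed.
  final class FakeIterator[T](real: () => Iterator[T]) extends Iterator[T] {
    def access(): Iterator[T] = real()
    override def hasNext: Boolean = throw new IllegalStateException("resolve with access() first")
    override def next(): T = throw new IllegalStateException("resolve with access() first")
  }

  // Mirrors the guard added to BypassMergeSortShuffleWriter.write().
  def unwrap[T](records: Iterator[T]): Iterator[T] = records match {
    case fake: FakeIterator[T] if sgxEnabled => fake.access() // swap the proxy for the real data
    case other => other // plain iterators pass through unchanged
  }

  def main(args: Array[String]): Unit = {
    val records: Iterator[(String, Int)] = new FakeIterator(() => Iterator("a" -> 1, "b" -> 2))
    if (sgxEnabled) println(unwrap(records).toList) // List((a,1), (b,2))
  }
}
```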

core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala (+8, -7)

@@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient {
   /**
    * Request that the cluster manager kill the specified executors.
    *
-   * When asking the executor to be replaced, the executor loss is considered a failure, and
-   * killed tasks that are running on the executor will count towards the failure limits. If no
-   * replacement is being requested, then the tasks will not count towards the limit.
-   *
    * @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, default false
+   * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down
+   *                                 after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when they are killed, whether
+   *                      to count those failures toward task failure limits
    * @param force whether to force kill busy executors, default false
    * @return the ids of the executors acknowledged by the cluster manager to be removed.
    */
  def killExecutors(
      executorIds: Seq[String],
-      replace: Boolean = false,
+      adjustTargetNumExecutors: Boolean,
+      countFailures: Boolean,
      force: Boolean = false): Seq[String]
 
  /**
@@ -81,7 +81,8 @@ private[spark] trait ExecutorAllocationClient {
   * @return whether the request is acknowledged by the cluster manager.
   */
  def killExecutor(executorId: String): Boolean = {
-    val killedExecutors = killExecutors(Seq(executorId))
+    val killedExecutors = killExecutors(Seq(executorId), adjustTargetNumExecutors = true,
+      countFailures = false)
    killedExecutors.nonEmpty && killedExecutors(0).equals(executorId)
  }
 }
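
Callers of `killExecutors` now state explicitly whether the executor target should shrink and whether tasks killed along the way count toward failure limits, instead of the former `replace` flag. A minimal sketch of a call under the new signature (the helper below is hypothetical; since `ExecutorAllocationClient` is `private[spark]`, real callers live inside the `org.apache.spark` package):

```scala
package org.apache.spark

// Hypothetical helper: decommission idle executors without replacing them and
// without penalising any tasks that happen to be killed with them.
object KillIdleExecutors {
  def apply(client: ExecutorAllocationClient, ids: Seq[String]): Seq[String] =
    client.killExecutors(
      ids,
      adjustTargetNumExecutors = true, // also lower the requested executor count
      countFailures = false,           // killed tasks do not count toward failure limits
      force = false)                   // leave executors that are still running tasks alone
}
```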

0 commit comments
