 package org.apache.spark.executor

+import java.io.File
+import java.net.URL
+
 import scala.util.Properties

-import org.apache.spark.{JobArtifactSet, JobArtifactState, LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.util.Utils
+import org.apache.spark.{JobArtifactSet, JobArtifactState, LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
+import org.apache.spark.util.{MutableURLClassLoader, Utils}

 class ClassLoaderIsolationSuite extends SparkFunSuite with LocalSparkContext {

@@ -126,7 +130,7 @@ class ClassLoaderIsolationSuite extends SparkFunSuite with LocalSparkContext {
     )

     JobArtifactSet.withActiveJobArtifactState(artifactSetWithHelloV2.state.get) {
-      sc.parallelize(1 to 1).foreach { i =>
+      sc.parallelize(1 to 1).foreach { _ =>
         val cls = Utils.classForName("com.example.Hello$")
         val module = cls.getField("MODULE$").get(null)
         val result = cls.getMethod("test").invoke(module).asInstanceOf[Int]
@@ -136,4 +140,106 @@ class ClassLoaderIsolationSuite extends SparkFunSuite with LocalSparkContext {
       }
     }
   }
+
+  test("SPARK-51537 Executor isolation avoids reloading plugin jars") {
+    val tempDir = Utils.createTempDir()
+
+    val testCodeBody =
+      s"""
+         | public static boolean flag = false;
+         |""".stripMargin
+
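+    // TestFoo is the probe class; its static `flag` records whether the
+    // executor plugin's init() has already run in this JVM.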
+    val compiledTestCode = TestUtils.createCompiledClass(
+      "TestFoo",
+      tempDir,
+      "",
+      null,
+      Seq.empty,
+      Seq.empty,
+      testCodeBody)
+
+    // The plugin's init() sets the static flag in TestFoo when the plugin is
+    // loaded for the first time. If the plugin is reloaded, TestFoo.flag
+    // reverts to its default value, false.
+    val executorPluginCodeBody =
+      s"""
+         |@Override
+         |public void init(
+         |    org.apache.spark.api.plugin.PluginContext ctx,
+         |    java.util.Map<String, String> extraConf) {
+         |  TestFoo.flag = true;
+         |}
+         """.stripMargin
+
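+    // Compile the generated plugin sources against this JVM's classpath so
+    // the Spark plugin API classes are visible to the compiler.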
+    val thisClassPath =
+      sys.props("java.class.path").split(File.pathSeparator).map(p => new File(p).toURI.toURL)
+
+    val compiledExecutorPlugin = TestUtils.createCompiledClass(
+      "TestExecutorPlugin",
+      tempDir,
+      "",
+      null,
+      Seq(tempDir.toURI.toURL) ++ thisClassPath,
+      Seq("org.apache.spark.api.plugin.ExecutorPlugin"),
+      executorPluginCodeBody)
+
+    val sparkPluginCodeBody =
+      """
+        |@Override
+        |public org.apache.spark.api.plugin.ExecutorPlugin executorPlugin() {
+        |  return new TestExecutorPlugin();
+        |}
+        |
+        |@Override
+        |public org.apache.spark.api.plugin.DriverPlugin driverPlugin() { return null; }
+      """.stripMargin
+
+    val compiledSparkPlugin = TestUtils.createCompiledClass(
+      "TestSparkPlugin",
+      tempDir,
+      "",
+      null,
+      Seq(tempDir.toURI.toURL) ++ thisClassPath,
+      Seq("org.apache.spark.api.plugin.SparkPlugin"),
+      sparkPluginCodeBody)
+
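+    // Package the plugin classes and the probe class into a single jar, which
+    // executors fetch through the `spark.jars` setting below.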
+    val jarUrl = TestUtils.createJar(
+      Seq(compiledSparkPlugin, compiledExecutorPlugin, compiledTestCode),
+      new File(tempDir, "testplugin.jar"))
+
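+    // Install a fresh MutableURLClassLoader as the thread's context
+    // classloader so the plugin jar can be appended to it at runtime.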
+    def getClassLoader: MutableURLClassLoader = {
+      val loader = new MutableURLClassLoader(new Array[URL](0),
+        Thread.currentThread.getContextClassLoader)
+      Thread.currentThread.setContextClassLoader(loader)
+      loader
+    }
+    // SparkContext does not add plugin jars specified by the `spark.jars`
+    // configuration to the driver's classpath, which would cause a
+    // ClassNotFoundException when SparkContext initializes the plugins.
+    // Manually add the jar to the context ClassLoader to work around this.
+    val loader = getClassLoader
+    loader.addURL(jarUrl)
+
+    sc = new SparkContext(new SparkConf()
+      .setAppName("avoid-reloading-plugins")
+      .setMaster("local-cluster[1, 1, 1024]")
+      .set("spark.jars", jarUrl.toString)
+      .set("spark.plugins", "TestSparkPlugin"))
+
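+    // local-cluster runs the executor in a separate JVM, so the plugin is
+    // initialized by the executor's default classloader, not the driver's.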
+    val jobArtifactSet = new JobArtifactSet(
+      Some(JobArtifactState(uuid = "avoid-reloading-plugins", replClassDirUri = None)),
+      jars = Map.empty,
+      files = Map.empty,
+      archives = Map.empty
+    )
+
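+    // Running the job under a JobArtifactState makes the executor use an
+    // isolated session classloader. If that classloader reloaded the plugin
+    // jar, the probe below would observe a freshly-initialized TestFoo.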
+    JobArtifactSet.withActiveJobArtifactState(jobArtifactSet.state.get) {
+      sc.parallelize(1 to 1).foreach { _ =>
+        val cls1 = Utils.classForName("TestFoo")
+        val z = cls1.getField("flag").getBoolean(null)
+        // If the plugin has been reloaded, TestFoo.flag will be false.
+        if (!z) {
+          throw new RuntimeException("The spark plugin is reloaded")
+        }
+      }
+    }
+  }
 }