Developing RPC Environment

FIXME

  • Create the exercise

  • It could be easier to make the exercise about registering a custom RpcEndpoint. An endpoint can receive the network events known to all endpoints, e.g. RemoteProcessConnected (a new node connected) or RemoteProcessDisconnected (a node disconnected), and that could be the only way to learn about the current runtime configuration of RpcEnv. Use SparkEnv.rpcEnv and rpcEnv.setupEndpoint(name, endpointCreator) to register an RPC endpoint.
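The endpoint idea above can be pictured with a toy model before touching Spark itself. The sketch below is not Spark's API: ToyRpcEnv, ToyEndpoint and ConnectionTracker are hypothetical names, and the two event classes only borrow the names mentioned above, with a plain String standing in for an address. It shows one endpoint registered under a name that tracks which nodes are currently connected.

```scala
// Toy model only: none of these types come from Spark.
// The event names are borrowed from the notes above; the String address is an assumption.
sealed trait NetworkEvent
final case class RemoteProcessConnected(address: String) extends NetworkEvent
final case class RemoteProcessDisconnected(address: String) extends NetworkEvent

// Hypothetical stand-in for an RPC endpoint that only cares about network events.
trait ToyEndpoint {
  def onEvent(event: NetworkEvent): Unit
}

// Hypothetical stand-in for an RPC environment: a registry of named endpoints.
final class ToyRpcEnv {
  private val endpoints =
    scala.collection.mutable.LinkedHashMap.empty[String, ToyEndpoint]

  // Register an endpoint under a unique name.
  def setupEndpoint(name: String, endpoint: ToyEndpoint): Unit = {
    require(!endpoints.contains(name), s"endpoint $name is already registered")
    endpoints(name) = endpoint
  }

  // Deliver a network event to every registered endpoint.
  def broadcast(event: NetworkEvent): Unit =
    endpoints.values.foreach(_.onEvent(event))
}

// An endpoint that keeps the set of currently connected nodes.
final class ConnectionTracker extends ToyEndpoint {
  private var connected = Set.empty[String]

  def onEvent(event: NetworkEvent): Unit = event match {
    case RemoteProcessConnected(address)    => connected += address
    case RemoteProcessDisconnected(address) => connected -= address
  }

  def nodes: Set[String] = connected
}
```

Pasted into a Scala REPL (with :paste), a tracker registered via setupEndpoint("tracker", tracker) and fed a few broadcast events reports the surviving addresses through tracker.nodes. A real exercise would replace the toy types with Spark's own RpcEnv and RpcEndpoint.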

Start simple: run spark-shell with spark.rpc set to a class name that does not exist and see where the initialization of SparkContext fails:

$ ./bin/spark-shell --conf spark.rpc=doesnotexist
...
15/10/21 12:06:11 INFO SparkContext: Running Spark version 1.6.0-SNAPSHOT
...
15/10/21 12:06:11 ERROR SparkContext: Error initializing SparkContext.
java.lang.ClassNotFoundException: doesnotexist
	at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at org.apache.spark.util.Utils$.classForName(Utils.scala:173)
	at org.apache.spark.rpc.RpcEnv$.getRpcEnvFactory(RpcEnv.scala:38)
	at org.apache.spark.rpc.RpcEnv$.create(RpcEnv.scala:49)
	at org.apache.spark.SparkEnv$.create(SparkEnv.scala:257)
	at org.apache.spark.SparkEnv$.createDriverEnv(SparkEnv.scala:198)
	at org.apache.spark.SparkContext.createSparkEnv(SparkContext.scala:272)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:441)
	at org.apache.spark.repl.Main$.createSparkContext(Main.scala:79)
	at $line3.$read$$iw$$iw.<init>(<console>:12)
	at $line3.$read$$iw.<init>(<console>:21)
	at $line3.$read.<init>(<console>:23)
	at $line3.$read$.<init>(<console>:27)
	at $line3.$read$.<clinit>(<console>)
	at $line3.$eval$.$print$lzycompute(<console>:7)
	at $line3.$eval$.$print(<console>:6)
	at $line3.$eval.$print(<console>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:784)
	at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1039)
	at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:636)
	at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq$1.apply(IMain.scala:635)
	at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
	at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
	at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:635)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:567)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:563)
	at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:802)
	at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:836)
	at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:694)
	at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:404)
	at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply$mcZ$sp(SparkILoop.scala:39)
	at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:38)
	at org.apache.spark.repl.SparkILoop$$anonfun$initializeSpark$1.apply(SparkILoop.scala:38)
	at scala.tools.nsc.interpreter.IMain.beQuietDuring(IMain.scala:213)
	at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:38)
	at org.apache.spark.repl.SparkILoop.loadFiles(SparkILoop.scala:94)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:922)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:911)
	at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:911)
	at scala.reflect.internal.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:97)
	at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:911)
	at org.apache.spark.repl.Main$.main(Main.scala:49)
	at org.apache.spark.repl.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:680)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
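
The stack trace shows RpcEnv.getRpcEnvFactory calling Utils.classForName with the value of spark.rpc. A minimal sketch of that kind of lookup follows, assuming the setting is first checked against a map of short aliases and otherwise treated as a fully qualified class name. The helper names resolveFactoryClassName and loadFactoryClass are hypothetical, and the alias map is an assumption about Spark 1.6, not a copy of its source.

```scala
// Assumed short aliases for the RpcEnv factories in Spark 1.6.
val rpcEnvNames = Map(
  "akka"  -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
  "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")

// Hypothetical helper: map an alias to a class name, or keep the setting as-is.
def resolveFactoryClassName(setting: String): String =
  rpcEnvNames.getOrElse(setting.toLowerCase, setting)

// Hypothetical helper: try to load the class and report a missing one as a Left
// instead of letting the exception escape.
def loadFactoryClass(setting: String): Either[String, Class[_]] =
  try Right(Class.forName(resolveFactoryClassName(setting)))
  catch {
    case e: ClassNotFoundException =>
      Left(s"ClassNotFoundException: ${e.getMessage}")
  }
```

With this sketch, loadFactoryClass("doesnotexist") yields a Left, which corresponds to the ClassNotFoundException at the top of the trace: the value is not a known alias, so it is used verbatim as a class name and cannot be found on the classpath.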