PythonWorkerFactory¶

`PythonWorkerFactory` is a factory of Python workers to execute PythonFunctions.

Note

There can be many `PythonWorkerFactory`s on a single executor (one for every pair of pythonExec and envVars).
Creating Instance¶

`PythonWorkerFactory` takes the following to be created:

- Python Executable
- Environment Variables

`PythonWorkerFactory` is created when:

- `SparkEnv` is requested to createPythonWorker (for `BasePythonRunner` to compute a partition)
Python Executable¶

`PythonWorkerFactory` is given a Python executable (`pythonExec`) when created.

The Python executable is the pythonExec of the first PythonFunction (of all the Python UDFs to be executed by BasePythonRunner).

Note

All PythonFunctions (of a BasePythonRunner) are assumed to use the same Python executable, version and environment variables, which is why it is safe to use the first PythonFunction.
useDaemon¶

`PythonWorkerFactory` initializes the `useDaemon` internal flag when created.

`useDaemon` is enabled when all of the following hold:

- spark.python.use.daemon configuration property is enabled
- The operating system is not MS Windows (based on the os.name JVM property), since the daemon works on UNIX-based systems only (it uses signals for child management)

The `useDaemon` flag is used when `PythonWorkerFactory` is requested to create and stop Python workers.
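The two conditions above can be sketched as a single predicate. This is a hedged illustration, not Spark's actual code: the function name and parameters (`use_daemon_conf` standing for spark.python.use.daemon, `os_name` for the os.name JVM property) are assumptions for the demo.

```python
def resolve_use_daemon(use_daemon_conf: bool, os_name: str) -> bool:
    """Sketch of the useDaemon decision (names are illustrative).

    use_daemon_conf stands for spark.python.use.daemon;
    os_name mirrors the os.name JVM property.
    """
    # The daemon relies on UNIX signals for child management,
    # so it is never used on MS Windows.
    is_windows = os_name.lower().startswith("windows")
    return use_daemon_conf and not is_windows
```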
Daemon Process¶

`daemon: Process = null`

`daemon` is a Process (Java) to control Python worker processes.

`daemon` is uninitialized (null) right after `PythonWorkerFactory` is created and right after stopDaemon.

`daemon` is initialized and immediately started in startDaemon (and listens at daemonPort).

`daemon` stays alive until stopDaemon.

Any communication with the `daemon` happens through daemonPort.
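The lifecycle described above (null until started, start is idempotent, stop resets the field) can be modeled with a tiny sketch. All names here are illustrative stand-ins, not Spark's API:

```python
class DaemonHolder:
    """Toy model of the daemon field's lifecycle (illustrative only)."""

    def __init__(self):
        self.daemon = None        # uninitialized right after creation
        self.daemon_port = 0

    def start_daemon(self, launch):
        if self.daemon is not None:
            return                # already up and running: do nothing
        # launch() stands in for spawning the daemon process and
        # learning the port it listens at.
        self.daemon, self.daemon_port = launch()

    def stop_daemon(self):
        if self.daemon is not None:
            self.daemon = None    # back to the uninitialized state
            self.daemon_port = 0
```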
Port¶

`daemonPort: Int = 0`

`daemonPort` is the communication channel (port) of the daemon Python process (and is known only after startDaemon).

`daemonPort` (alongside daemonHost) is used to open a socket stream and launch workers.
Python Workers¶

`daemonWorkers: mutable.WeakHashMap[Socket, Int]`

`PythonWorkerFactory` creates the `daemonWorkers` internal registry of socket streams and worker PIDs when created.

A new pair is added in createSocket (when createThroughDaemon).

`daemonWorkers` is used when:

- create (with the useDaemon flag enabled and non-empty idleWorkers)
- stopWorker
Python Modules¶

Daemon¶

`PythonWorkerFactory` initializes the `daemonModule` internal property for the Python Daemon Module when created.

`daemonModule` is the value of the spark.python.daemon.module configuration property.

The Python Daemon Module is used when `PythonWorkerFactory` is requested to create and start a daemon module.

Worker¶

`PythonWorkerFactory` uses the spark.python.worker.module configuration property to specify the Python Worker Module.

The Python Worker Module is used when `PythonWorkerFactory` is requested to create and start a worker.
Creating Python Worker¶

`create(): (Socket, Option[Int])`

`create` branches off based on the useDaemon flag:

- When enabled, `create` first checks the idleWorkers queue and returns an idle worker if one is available. Otherwise, `create` createThroughDaemon
- When disabled, `create` createSimpleWorker

`create` is used when:

- `SparkEnv` is requested to createPythonWorker
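The branching above can be sketched as follows. This is a toy model under stated assumptions: `Worker`, the placeholder return values, and the function shape are all illustrative, not Spark's actual API.

```python
from typing import List, Optional, Tuple

# (socket-like handle, optional PID) -- a stand-in for (Socket, Option[Int])
Worker = Tuple[str, Optional[int]]

def create(use_daemon: bool, idle_workers: List[Worker]) -> Worker:
    if use_daemon:
        if idle_workers:
            # Reuse an idle daemon worker if one is available.
            return idle_workers.pop()
        # Otherwise go through the daemon (createThroughDaemon).
        return ("daemon-socket", 4242)
    # useDaemon disabled: launch a simple worker (createSimpleWorker).
    return ("simple-socket", None)
```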
Creating Daemon Worker¶

`createThroughDaemon(): (Socket, Option[Int])`

`createThroughDaemon` startDaemon followed by createSocket.

In case of a SocketException, `createThroughDaemon` prints out the following WARN message to the logs:

Failed to open socket to Python daemon: [exception]
Assuming that daemon unexpectedly quit, attempting to restart

`createThroughDaemon` then stopDaemon, startDaemon and createSocket again.
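The recovery path amounts to one retry after restarting the daemon. A hedged sketch of that control flow, where `DaemonError` stands in for java.net.SocketException and the `start`/`stop`/`connect` callables are illustrative stand-ins for startDaemon, stopDaemon and createSocket:

```python
class DaemonError(Exception):
    """Stand-in for java.net.SocketException (illustrative only)."""

def create_through_daemon(start, stop, connect):
    start()
    try:
        return connect()
    except DaemonError:
        # Mirrors the WARN message: assume the daemon quit and restart it.
        print("Failed to open socket to Python daemon; "
              "assuming it quit, attempting to restart")
        stop()
        start()
        return connect()
```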
createSocket¶

`createSocket(): (Socket, Option[Int])`

`createSocket` creates a new stream socket and connects it to the daemonPort at the daemonHost.

`createSocket` reads the PID (of the Python worker behind the stream socket) and requests the authHelper to authToServer.

In the end, `createSocket` returns the socket and the PID (after registering them in the daemonWorkers registry).
Starting Python Daemon Process¶

`startDaemon(): Unit`

`startDaemon` does nothing when daemon is already initialized (non-null), which indicates that the daemon is already up and running.

Otherwise, `startDaemon` creates the command (using the given pythonExec and the daemon module):

[pythonExec] -m [daemonModule]

`startDaemon` adds the given envVars and the following (extra) environment variables to the environment of the future Python process:

| Environment Variable | Value |
|---|---|
| PYTHONPATH | pythonPath |
| PYTHON_WORKER_FACTORY_SECRET | authHelper |
| SPARK_PREFER_IPV6 | True if the underlying JVM prefers IPv6 addresses (based on the java.net.preferIPv6Addresses JVM property) |
| PYTHONUNBUFFERED | YES |

`startDaemon` starts a new process (that is known as the daemon).

`startDaemon` connects to the Python process to read the daemonPort.

In the end, `startDaemon` redirectStreamsToStderr.
Creating Simple Non-Daemon Worker¶

`createSimpleWorker(): Socket`

`createSimpleWorker`...FIXME

`createSimpleWorker` is used when `PythonWorkerFactory` is requested to create a Python worker (with the useDaemon flag disabled).
Logging¶

Enable ALL logging level for the org.apache.spark.api.python.PythonWorkerFactory logger to see what happens inside.

Add the following lines to conf/log4j2.properties:

logger.PythonWorkerFactory.name = org.apache.spark.api.python.PythonWorkerFactory
logger.PythonWorkerFactory.level = all

Refer to Logging.