PythonWorkerFactory¶
PythonWorkerFactory is a factory of Python workers to execute PythonFunctions.

Note
There could be many PythonWorkerFactory instances on a single executor (one for every pair of pythonExec and envVars).
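The per-pair note above can be illustrated with a small sketch. This is not Spark's actual code: the case class, the `factories` map and `factoryFor` are hypothetical names standing in for how an executor could cache one factory per (pythonExec, envVars) pair.

```scala
import scala.collection.mutable

// Hypothetical sketch of caching one factory per (pythonExec, envVars) pair.
// Names are illustrative, not Spark's actual fields.
final case class PythonWorkerFactorySketch(
    pythonExec: String,
    envVars: Map[String, String])

val factories =
  mutable.HashMap.empty[(String, Map[String, String]), PythonWorkerFactorySketch]

// Returns the cached factory for the pair, creating it on first request.
def factoryFor(pythonExec: String, envVars: Map[String, String]): PythonWorkerFactorySketch =
  factories.getOrElseUpdate((pythonExec, envVars), PythonWorkerFactorySketch(pythonExec, envVars))

val f1 = factoryFor("python3", Map("A" -> "1")) // creates a new factory
val f2 = factoryFor("python3", Map("A" -> "1")) // same pair, same instance
val f3 = factoryFor("python3", Map("A" -> "2")) // different envVars, new factory
```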
Creating Instance¶
PythonWorkerFactory takes the following to be created:
- Python Executable
- Environment Variables
PythonWorkerFactory is created when:
SparkEnv is requested to createPythonWorker (for BasePythonRunner to compute a partition).
Python Executable¶
PythonWorkerFactory is given a Python executable (pythonExec) when created.
The Python executable is the pythonExec of the first PythonFunction (of all the Python UDFs to be executed by BasePythonRunner).
Note
It is assumed that all PythonFunctions (of a BasePythonRunner) should have the same Python executable, version and env vars. That is why it is safe to use the first PythonFunction.
useDaemon¶
PythonWorkerFactory initializes useDaemon internal flag when created.
useDaemon is enabled when all of the following hold:
- spark.python.use.daemon is enabled
- The operating system is not MS Windows (based on the os.name JVM system property), as the daemon works on UNIX-based systems only (it uses signals for child management)
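The two conditions above can be sketched as a single boolean expression. This is an assumed simplification (the configuration value and OS name are passed in as plain arguments rather than read from a SparkConf):

```scala
// Sketch of the useDaemon decision: the daemon is used only when
// spark.python.use.daemon is enabled and the JVM is not on Windows
// (as reported by the os.name system property).
def useDaemon(pythonUseDaemon: Boolean, osName: String): Boolean =
  pythonUseDaemon && !osName.startsWith("Windows")
```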
The useDaemon flag is used when PythonWorkerFactory is requested to create and stop Python workers.
Daemon Process¶
```scala
daemon: Process = null
```
daemon is a Process (Java) to control Python worker processes.
daemon is uninitialized (null) right after PythonWorkerFactory is created and right after stopDaemon.
daemon is initialized and immediately started when startDaemon (and listens at daemonPort).
daemon is alive until stopDaemon.
Any communication with the daemon happens through daemonPort.
Port¶
```scala
daemonPort: Int = 0
```
daemonPort is the communication channel (port) of the daemon Python process (that is known only after startDaemon).
daemonPort (alongside the daemonHost) is used to open a socket stream and launch workers.
Python Workers¶
```scala
daemonWorkers: mutable.WeakHashMap[Socket, Int]
```
PythonWorkerFactory creates the daemonWorkers internal registry of socket streams and worker PIDs when created.
A new pair is added in createSocket (when createThroughDaemon).
daemonWorkers is used when:
- create (with useDaemon flag enabled and non-empty idleWorkers)
- stopWorker
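A mutable.WeakHashMap keyed by socket means a registry entry can be reclaimed once the socket key is no longer strongly referenced elsewhere. A minimal illustration of that registry shape (the PID value is made up; the socket is an unconnected placeholder):

```scala
import java.net.Socket
import scala.collection.mutable

// Illustration of a daemonWorkers-style registry: socket stream -> worker PID.
// Entries are held weakly by key, so they can vanish once the socket
// is no longer referenced anywhere else.
val daemonWorkersSketch = mutable.WeakHashMap.empty[Socket, Int]

val socket = new Socket()            // unconnected socket, used only as a key
daemonWorkersSketch(socket) = 12345  // hypothetical worker PID
```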
Python Modules¶
Daemon¶
PythonWorkerFactory initializes daemonModule internal property for the Python Daemon Module when created.
daemonModule is the value of spark.python.daemon.module configuration property.
The Python Daemon Module is used when PythonWorkerFactory is requested to create and start a daemon module.
Worker¶
PythonWorkerFactory uses spark.python.worker.module configuration property to specify the Python Worker Module.
The Python Worker Module is used when PythonWorkerFactory is requested to create and start a worker.
Creating Python Worker¶
```scala
create(): (Socket, Option[Int])
```
create branches off based on the useDaemon flag:
- When enabled, create first checks the idleWorkers queue and returns an idle worker if available. Otherwise, create createThroughDaemon
- When disabled, create createSimpleWorker
create is used when:
SparkEnv is requested to createPythonWorker
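The branching above can be sketched as follows. The types are simplified (a String stands in for a worker socket) and the two helper functions are stubs with illustrative names, not Spark's implementations:

```scala
import scala.collection.mutable

// Hedged sketch of create()'s branching on useDaemon and idleWorkers.
val idleWorkers = mutable.Queue.empty[String]

def createThroughDaemonSketch(): String = "daemon-worker"
def createSimpleWorkerSketch(): String  = "simple-worker"

def createSketch(useDaemon: Boolean): String =
  if (useDaemon) {
    // Prefer an idle worker; fall back to asking the daemon for a new one.
    if (idleWorkers.nonEmpty) idleWorkers.dequeue()
    else createThroughDaemonSketch()
  } else {
    // No daemon: spawn a simple worker directly.
    createSimpleWorkerSketch()
  }
```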
Creating Daemon Worker¶
```scala
createThroughDaemon(): (Socket, Option[Int])
```
createThroughDaemon requests startDaemon followed by createSocket.
In case of a SocketException, createThroughDaemon prints out the following WARN message to the logs:
```text
Failed to open socket to Python daemon: [exception]
Assuming that daemon unexpectedly quit, attempting to restart
```
And then, createThroughDaemon stopDaemon, startDaemon and createSocket.
createSocket¶
```scala
createSocket(): (Socket, Option[Int])
```
createSocket creates a new stream socket and connects it to the daemonPort at the daemonHost.
createSocket reads the PID (of the python worker behind the stream socket) and requests the authHelper to authToServer.
In the end, createSocket returns the socket and the PID (after registering them in the daemonWorkers registry).
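The connect-then-read-PID handshake can be demonstrated with plain java.net sockets. This is a self-contained sketch, not Spark's code: a local ServerSocket plays the daemon's role, and the authToServer step is omitted.

```scala
import java.io.{DataInputStream, DataOutputStream}
import java.net.{InetAddress, ServerSocket, Socket}

// A stand-in "daemon": accepts one connection and writes a 4-byte PID.
val server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress)
val serverThread = new Thread(() => {
  val conn = server.accept()
  new DataOutputStream(conn.getOutputStream).writeInt(4242) // pretend PID
  conn.close()
})
serverThread.start()

// The createSocket side: connect to the daemon's port and read the PID.
val socket = new Socket(InetAddress.getLoopbackAddress, server.getLocalPort)
val pid = new DataInputStream(socket.getInputStream).readInt()

socket.close(); server.close(); serverThread.join()
```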
Starting Python Daemon Process¶
```scala
startDaemon(): Unit
```
Does nothing with daemon initialized
startDaemon does nothing when daemon is already initialized (non-null), which indicates that the daemon is up and running.
startDaemon creates the command (using the given pythonExec and the daemon module):
```text
[pythonExec] -m [daemonModule]
```
startDaemon adds the given envVars and the following (extra) environment variables to the environment of future python processes:
| Environment Variable | Value |
|---|---|
| PYTHONPATH | pythonPath |
| PYTHON_WORKER_FACTORY_SECRET | authHelper |
| SPARK_PREFER_IPV6 | True if the underlying JVM prefers IPv6 addresses (based on the java.net.preferIPv6Addresses JVM property) |
| PYTHONUNBUFFERED | YES |
startDaemon starts a new process (that is known as the daemon).
startDaemon connects to the Python process to read the daemonPort.
In the end, startDaemon redirectStreamsToStderr.
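Assembling the daemon command and extra environment variables can be sketched with java.lang.ProcessBuilder. This is an assumed simplification: only two of the variables from the table are shown, the executable and module names are hard-coded for illustration (pyspark.daemon is the default of spark.python.daemon.module), and the process is deliberately not started.

```scala
import scala.jdk.CollectionConverters._

// Sketch: build "[pythonExec] -m [daemonModule]" and add extra env vars,
// without actually launching a Python process.
val pythonExec = "python3"          // assumed executable
val daemonModule = "pyspark.daemon" // default of spark.python.daemon.module

val pb = new ProcessBuilder(Seq(pythonExec, "-m", daemonModule).asJava)

val env = pb.environment()
env.put("PYTHONUNBUFFERED", "YES")
env.put("SPARK_PREFER_IPV6",
  if (java.lang.Boolean.getBoolean("java.net.preferIPv6Addresses")) "True" else "False")
```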
Creating Simple Non-Daemon Worker¶
```scala
createSimpleWorker(): Socket
```
createSimpleWorker...FIXME
createSimpleWorker is used when PythonWorkerFactory is requested to create a Python worker (with useDaemon flag disabled).
Logging¶
Enable ALL logging level for org.apache.spark.api.python.PythonWorkerFactory logger to see what happens inside.
Add the following lines to conf/log4j2.properties:
```text
logger.PythonWorkerFactory.name = org.apache.spark.api.python.PythonWorkerFactory
logger.PythonWorkerFactory.level = all
```
Refer to Logging.