PythonWorkerFactory

PythonWorkerFactory is a factory of Python workers to execute PythonFunctions.

Note

There could be many PythonWorkerFactorys on a single executor (one for every pair of the pythonExec and the envVars).
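The pairing can be pictured as a cache keyed by (pythonExec, envVars). The following is a minimal sketch only: Factory, factoryFor and size are hypothetical names used for illustration, not Spark's API.

```scala
import scala.collection.mutable

object FactoryCacheSketch {
  // Hypothetical stand-in for PythonWorkerFactory (illustration only)
  final class Factory(val pythonExec: String, val envVars: Map[String, String])

  // One factory per (pythonExec, envVars) pair
  private val factories =
    mutable.HashMap.empty[(String, Map[String, String]), Factory]

  // Returns the factory registered for the pair, creating it on first use
  def factoryFor(pythonExec: String, envVars: Map[String, String]): Factory =
    synchronized {
      factories.getOrElseUpdate((pythonExec, envVars), new Factory(pythonExec, envVars))
    }

  // Number of distinct factories created so far
  def size: Int = synchronized(factories.size)
}
```

Two requests with the same executable and the same environment variables share a factory, while a change in either gives a new one.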

Creating Instance

PythonWorkerFactory takes the following to be created:

  • Python executable (pythonExec)
  • Environment variables (envVars)

PythonWorkerFactory is created when:

Python Executable

PythonWorkerFactory is given a Python executable (pythonExec) when created.

The Python executable is the pythonExec of the first PythonFunction (of all the Python UDFs that a BasePythonRunner is to execute).

Note

It is assumed that all PythonFunctions (of a BasePythonRunner) have the same Python executable, version and environment variables. That is why it is safe to use the first PythonFunction.

useDaemon

PythonWorkerFactory initializes useDaemon internal flag when created.

useDaemon is enabled when the following all hold:

  • spark.python.use.daemon is enabled
  • The operating system is not MS Windows (based on the os.name JVM property). The daemon works on UNIX-based systems only because it uses signals for child management.
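
The two conditions above can be sketched as a single boolean expression. This is an illustration only: the parameter names are made up, and checking os.name with startsWith("Windows") is an assumption about how the test could be written.

```scala
object UseDaemonSketch {
  // useDaemonEnabled stands for the value of spark.python.use.daemon
  // osName stands for the os.name JVM property
  // Assumption: MS Windows is recognized by an os.name that starts with "Windows"
  def useDaemon(useDaemonEnabled: Boolean, osName: String): Boolean =
    useDaemonEnabled && !osName.startsWith("Windows")

  def main(args: Array[String]): Unit =
    println(useDaemon(useDaemonEnabled = true, System.getProperty("os.name")))
}
```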

useDaemon flag is used when PythonWorkerFactory is requested for the following:

Daemon Process

daemon: Process = null

daemon is a Process (Java) to control Python worker processes.

daemon is uninitialized (null) right after PythonWorkerFactory is created and right after stopDaemon.

daemon is initialized and immediately started when startDaemon is executed (and listens at daemonPort).

daemon is alive until stopDaemon.

Any communication with the daemon happens through daemonPort.

Port

daemonPort: Int = 0

daemonPort is the communication channel (port) of the daemon Python process (that is known only after startDaemon).

daemonPort (alongside the daemonHost) is used to open a socket stream and launch workers.

Python Workers

daemonWorkers: mutable.WeakHashMap[Socket, Int]

PythonWorkerFactory creates the daemonWorkers internal registry of sockets and the PIDs of their Python workers when created.

A new pair is added in createSocket (as part of createThroughDaemon).
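
A minimal sketch of such a registry follows. The helper names register and pidOf are hypothetical. The point of a weak map is that an entry can be dropped once the socket key is no longer referenced anywhere else.

```scala
import java.net.Socket
import scala.collection.mutable

object DaemonWorkersSketch {
  // Socket -> PID of the Python worker behind it; keys are weakly referenced
  val daemonWorkers = new mutable.WeakHashMap[Socket, Int]()

  // Hypothetical helper: remembers the PID for a socket
  def register(socket: Socket, pid: Int): Unit = synchronized {
    daemonWorkers.update(socket, pid)
  }

  // Hypothetical helper: looks up the PID registered for a socket (if any)
  def pidOf(socket: Socket): Option[Int] = synchronized {
    daemonWorkers.get(socket)
  }
}
```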

daemonWorkers is used when:

Python Modules

Daemon

PythonWorkerFactory initializes daemonModule internal property for the Python Daemon Module when created.

daemonModule is the value of spark.python.daemon.module configuration property.

The Python Daemon Module is used when PythonWorkerFactory is requested to create and start a daemon module.

Worker

PythonWorkerFactory uses spark.python.worker.module configuration property to specify the Python Worker Module.

The Python Worker Module is used when PythonWorkerFactory is requested to create and start a worker.
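
As an illustration, the two module names could be resolved from a plain key-value configuration as below. The fallback values (pyspark.daemon and pyspark.worker) are assumptions and are not stated in the text above. The Map-based conf is a simplification of Spark's configuration object.

```scala
object PythonModulesSketch {
  // Assumed fallback module names (not confirmed by the text above)
  val DefaultDaemonModule = "pyspark.daemon"
  val DefaultWorkerModule = "pyspark.worker"

  // Resolves the Python Daemon Module from a simplified configuration
  def daemonModule(conf: Map[String, String]): String =
    conf.getOrElse("spark.python.daemon.module", DefaultDaemonModule)

  // Resolves the Python Worker Module from a simplified configuration
  def workerModule(conf: Map[String, String]): String =
    conf.getOrElse("spark.python.worker.module", DefaultWorkerModule)
}
```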

Creating Python Worker

create(): (Socket, Option[Int])

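create branches off based on the useDaemon flag:

  • With useDaemon enabled, create uses createThroughDaemon
  • Otherwise, create uses createSimpleWorker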

create is used when:

Creating Daemon Worker

createThroughDaemon(): (Socket, Option[Int])

createThroughDaemon executes startDaemon followed by createSocket.

In case of a SocketException, createThroughDaemon prints out the following WARN message to the logs:

Failed to open socket to Python daemon: [exception]
Assuming that daemon unexpectedly quit, attempting to restart

createThroughDaemon then executes stopDaemon, startDaemon and createSocket (in that order).
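
The restart-once control flow can be sketched as follows. DaemonOps is a hypothetical trait that stands in for startDaemon, stopDaemon and createSocket, and the WARN message is printed to standard error rather than through a logger.

```scala
import java.net.SocketException

object CreateThroughDaemonSketch {
  // Hypothetical hooks standing in for startDaemon, stopDaemon and createSocket
  trait DaemonOps[A] {
    def startDaemon(): Unit
    def stopDaemon(): Unit
    def createSocket(): A
  }

  def createThroughDaemon[A](ops: DaemonOps[A]): A = {
    ops.startDaemon()
    try {
      ops.createSocket()
    } catch {
      case e: SocketException =>
        // The daemon may have died: restart it and retry exactly once
        System.err.println(s"Failed to open socket to Python daemon: $e")
        System.err.println("Assuming that daemon unexpectedly quit, attempting to restart")
        ops.stopDaemon()
        ops.startDaemon()
        ops.createSocket()
    }
  }
}
```

A second SocketException (from the retry) is not caught and propagates to the caller.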

createSocket

createSocket(): (Socket, Option[Int])

createSocket creates a new stream socket and connects it to the daemonPort at the daemonHost.

createSocket reads the PID (of the Python worker behind the stream socket) and requests the authHelper to authToServer.

In the end, createSocket returns the socket and the PID (after registering them in the daemonWorkers registry).
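
A minimal sketch of the connect-and-read-PID part follows. It assumes the daemon writes the worker PID as a 4-byte big-endian integer and that a negative value signals a failed launch. Authentication (authToServer) and the daemonWorkers registration are left out.

```scala
import java.io.DataInputStream
import java.net.{InetAddress, Socket}

object CreateSocketSketch {
  def createSocket(daemonHost: InetAddress, daemonPort: Int): (Socket, Option[Int]) = {
    val socket = new Socket(daemonHost, daemonPort)
    // Assumption: the PID arrives first, as a 4-byte big-endian int
    val pid = new DataInputStream(socket.getInputStream).readInt()
    if (pid < 0) {
      // Assumption: a negative value means the worker could not be launched
      socket.close()
      throw new IllegalStateException(s"Python daemon failed to launch worker with code $pid")
    }
    // authToServer and the daemonWorkers registration are omitted in this sketch
    (socket, Some(pid))
  }
}
```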

Starting Python Daemon Process

startDaemon(): Unit

Does nothing with daemon initialized

startDaemon does nothing when daemon is initialized (non-null), which indicates that the daemon is already up and running.

startDaemon creates the command (using the given pythonExec and the daemon module):

[pythonExec] -m [daemonModule]
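
The command can be assembled with the JDK's ProcessBuilder as sketched below. The object and method names are illustrative only.

```scala
import java.util.Arrays

object DaemonCommandSketch {
  // [pythonExec] -m [daemonModule]
  def daemonCommand(pythonExec: String, daemonModule: String): java.util.List[String] =
    Arrays.asList(pythonExec, "-m", daemonModule)

  // A process builder for the command (the process is not started here)
  def processBuilder(pythonExec: String, daemonModule: String): ProcessBuilder =
    new ProcessBuilder(daemonCommand(pythonExec, daemonModule))
}
```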

startDaemon adds the given envVars and the following (extra) environment variables to the environment of future Python processes:

| Environment Variable | Value |
|---|---|
| PYTHONPATH | pythonPath |
| PYTHON_WORKER_FACTORY_SECRET | The secret of the authHelper |
| SPARK_PREFER_IPV6 | True if the underlying JVM prefers IPv6 addresses (based on java.net.preferIPv6Addresses JVM property) |
| PYTHONUNBUFFERED | YES |
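
Setting up the environment of the daemon process can be sketched with ProcessBuilder.environment. The configure method and its parameters are hypothetical, with secret standing for the value that comes from the authHelper and preferIPv6 for the JVM's IPv6 preference.

```scala
object DaemonEnvSketch {
  def configure(
      pb: ProcessBuilder,
      envVars: Map[String, String],
      pythonPath: String,
      secret: String,
      preferIPv6: Boolean): ProcessBuilder = {
    val env = pb.environment()
    // User-provided variables first, then the extra ones
    envVars.foreach { case (k, v) => env.put(k, v) }
    env.put("PYTHONPATH", pythonPath)
    env.put("PYTHON_WORKER_FACTORY_SECRET", secret)
    // Only set when the JVM prefers IPv6 addresses
    if (preferIPv6) env.put("SPARK_PREFER_IPV6", "True")
    // Unbuffered output so the JVM sees the daemon's output immediately
    env.put("PYTHONUNBUFFERED", "YES")
    pb
  }
}
```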

startDaemon starts a new process (that is known as the daemon).

startDaemon connects to the Python process to read the daemonPort.
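
Reading the port can be sketched as below. The sketch assumes that the daemon reports its port as a 4-byte big-endian integer on its standard output and that the value is checked against the valid port range. Neither detail is stated in the text above.

```scala
import java.io.{DataInputStream, InputStream}

object DaemonPortSketch {
  // Assumption: the daemon writes its port as a 4-byte big-endian int
  def readDaemonPort(daemonStdout: InputStream): Int = {
    val port = new DataInputStream(daemonStdout).readInt()
    // Assumption: anything outside 1..65535 is treated as an error
    require(port >= 1 && port <= 65535, s"Bad port number: $port")
    port
  }
}
```

With a real process, daemonStdout would be the input stream of the started Process (Process.getInputStream).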

In the end, startDaemon executes redirectStreamsToStderr.

Creating Simple Non-Daemon Worker

createSimpleWorker(): Socket

createSimpleWorker...FIXME

createSimpleWorker is used when PythonWorkerFactory is requested to create a Python worker (with useDaemon flag disabled).

Logging

Enable ALL logging level for org.apache.spark.api.python.PythonWorkerFactory logger to see what happens inside.

Add the following line to conf/log4j2.properties:

logger.PythonWorkerFactory.name = org.apache.spark.api.python.PythonWorkerFactory
logger.PythonWorkerFactory.level = all

Refer to Logging.