BasePythonRunner¶
BasePythonRunner is an abstraction of Python Runners.
BasePythonRunner is executed as part of Spark tasks (that run on executors).
Scala Definition
BasePythonRunner is a type constructor in Scala (generic class in Java) with the following definition:
abstract class BasePythonRunner[IN, OUT](...) {
  // ...
}
BasePythonRunner uses IN and OUT as the names of the types of the input and output values, respectively.
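For illustration, a concrete runner fixes both type parameters. The following is a self-contained sketch of the pattern (illustrative names, not the Spark sources); PythonRunner, for example, exchanges serialized byte arrays in both directions, so both IN and OUT end up as Array[Byte].

// Self-contained sketch (illustrative names, not Spark code): a concrete runner
// binds the IN and OUT type parameters of a generic base class.
abstract class RunnerSketch[IN, OUT] {
  def compute(input: Iterator[IN]): Iterator[OUT]
}

// Bytes in, bytes out: the shape PythonRunner uses (Array[Byte] for both).
class BytesRunnerSketch extends RunnerSketch[Array[Byte], Array[Byte]] {
  override def compute(input: Iterator[Array[Byte]]): Iterator[Array[Byte]] = input
}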
Contract¶
newReaderIterator¶
newReaderIterator(
  stream: DataInputStream,
  writerThread: WriterThread,
  startTime: Long,
  env: SparkEnv,
  worker: Socket,
  pid: Option[Int],
  releasedOrClosed: AtomicBoolean,
  context: TaskContext): Iterator[OUT]
See:
Used when:
- BasePythonRunner is requested to compute
newWriterThread¶
newWriterThread(
  env: SparkEnv,
  worker: Socket,
  inputIterator: Iterator[IN],
  partitionIndex: Int,
  context: TaskContext): WriterThread
See:
Used when:
- BasePythonRunner is requested to compute
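Together, the two contract methods act as factory hooks that compute relies on: the writer thread feeds the task input to the Python worker, while the reader iterator decodes the worker's output stream back into OUT values. The following is a self-contained sketch of that template-method shape (simplified signatures and illustrative names, not Spark's actual ones).

import java.io.DataInputStream

// Self-contained sketch: subclasses supply the writer thread and the reader
// iterator; the base class only wires them together (simplified signatures).
abstract class RunnerContractSketch[IN, OUT] {
  // Sends the task input to the external worker process.
  protected def newWriterThread(input: Iterator[IN]): Thread
  // Decodes the worker's output stream into OUT values.
  protected def newReaderIterator(fromWorker: DataInputStream): Iterator[OUT]

  // The compute-like shape: start feeding the worker, then read results lazily.
  final def compute(input: Iterator[IN], fromWorker: DataInputStream): Iterator[OUT] = {
    val writer = newWriterThread(input)
    writer.start()
    newReaderIterator(fromWorker)
  }
}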
Implementations¶
- ApplyInPandasWithStatePythonRunner
- ArrowPythonRunner
- CoGroupedArrowPythonRunner
- PythonRunner
- PythonUDFRunner
Creating Instance¶
BasePythonRunner takes the following to be created:
- ChainedPythonFunctions
- Eval Type
- Argument Offsets
BasePythonRunner requires that the number of ChainedPythonFunctions and the number of Argument Offsets be the same.
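The check itself is presumably a plain constructor-time require; a minimal sketch with stand-in types (the actual error message in BasePythonRunner may differ):

// Minimal sketch of the constructor-time length check (stand-in types).
class CreationSketch(funcs: Seq[AnyRef], argOffsets: Array[Array[Int]]) {
  require(funcs.length == argOffsets.length,
    "argOffsets should have the same length as funcs")
}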
Abstract Class
BasePythonRunner is an abstract class and cannot be created directly. It is created indirectly as one of the concrete BasePythonRunners.
accumulator¶
accumulator: PythonAccumulatorV2
When created, BasePythonRunner initializes an internal registry of a PythonAccumulatorV2 to be the accumulator of the head PythonFunction among the given ChainedPythonFunctions.
The PythonAccumulatorV2 is used when ReaderIterator is requested to handleEndOfDataSection (to update metrics).
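A ChainedPythonFunctions wraps a sequence of PythonFunctions, each carrying its own accumulator, so the initialization amounts to picking the accumulator of the first function of the first chain. A self-contained sketch with stand-in types:

// Stand-in types only: FunctionSketch plays the role of PythonFunction and
// ChainSketch the role of ChainedPythonFunctions.
case class FunctionSketch(accumulator: AnyRef)
case class ChainSketch(funcs: Seq[FunctionSketch])

// The accumulator of the head function of the head chain.
def headAccumulator(chains: Seq[ChainSketch]): AnyRef =
  chains.head.funcs.head.accumulator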
Computing Result¶
compute(
  inputIterator: Iterator[IN],
  partitionIndex: Int,
  context: TaskContext): Iterator[OUT]
Runs on Executors
compute runs on Spark executors.
compute uses the given TaskContext to look up the following local properties (if they were specified via ResourceProfile):
- resource.executor.cores
- resource.pyspark.memory
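A hedged sketch of such a lookup (TaskContext.getLocalProperty returns null for an unset property, hence the Option wrapping; the keys follow the list above):

import org.apache.spark.TaskContext

// Read the ResourceProfile-related local properties off the TaskContext.
def resourceLocalProps(context: TaskContext): (Option[String], Option[String]) =
  (Option(context.getLocalProperty("resource.executor.cores")),
   Option(context.getLocalProperty("resource.pyspark.memory")))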
compute requests the DiskBlockManager for the local directories and creates a comma-separated list of them (localdir).
Unless spark.executorEnv.OMP_NUM_THREADS is explicitly specified (in the SparkConf), compute sets the OMP_NUM_THREADS environment variable (in the envVars) to the value of resource.executor.cores (if defined).
compute sets the following in the envVars:
- SPARK_LOCAL_DIRS as the local directories of the local DiskBlockManager (localdir)
compute can optionally define environment variables:
- SPARK_REUSE_WORKER as 1 when the spark.python.worker.reuse configuration property is enabled
- SPARK_SIMPLIFIED_TRACEBACK as 1 when simplifiedTraceback is enabled
- others
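A self-contained sketch of this environment preparation (parameter and variable names are illustrative, not BasePythonRunner's actual fields):

import scala.collection.mutable

// Illustrative sketch: assemble the Python worker environment following the rules above.
def workerEnv(
    localdir: String,
    execCores: Option[String],
    ompNumThreadsAlreadySet: Boolean,
    reuseWorker: Boolean,
    simplifiedTraceback: Boolean): mutable.Map[String, String] = {
  val envVars = mutable.Map.empty[String, String]
  envVars("SPARK_LOCAL_DIRS") = localdir
  // Derive OMP_NUM_THREADS from resource.executor.cores only when it was not
  // explicitly configured via spark.executorEnv.OMP_NUM_THREADS.
  if (!ompNumThreadsAlreadySet) {
    execCores.foreach(cores => envVars("OMP_NUM_THREADS") = cores)
  }
  if (reuseWorker) envVars("SPARK_REUSE_WORKER") = "1"
  if (simplifiedTraceback) envVars("SPARK_SIMPLIFIED_TRACEBACK") = "1"
  envVars
}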
compute requests SparkEnv to createPythonWorker (for the pythonExec and the envVars).
compute creates a new WriterThread (to feed the worker process input from the given inputIterator) and starts it.
compute creates and starts a WriterMonitorThread.
compute creates a MonitorThread.
compute creates a buffered DataInputStream to read from the worker (socket) output, using the bufferSize.
In the end, compute creates a new ReaderIterator to read lines from the Python worker's stdout (from the buffered DataInputStream).
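The end result is a producer/consumer wiring: a background thread feeds the worker while the caller lazily consumes results through a buffered stream. The following self-contained toy uses a plain in-process pipe in place of the Python worker socket, just to illustrate the shape (not Spark code):

import java.io.{BufferedInputStream, DataInputStream, DataOutputStream, PipedInputStream, PipedOutputStream}

object ComputeWiringSketch extends App {
  val bufferSize = 8192
  val fromWriter = new PipedInputStream(bufferSize)
  val toReader = new PipedOutputStream(fromWriter)

  // "Writer thread": pushes the task input towards the worker side.
  val writer = new Thread(() => {
    val out = new DataOutputStream(toReader)
    (1 to 3).foreach(out.writeInt)
    out.close()
  })
  writer.start()
  writer.join() // toy simplification: wait until all input has been written

  // "Reader iterator" side: decode results from a buffered DataInputStream.
  val in = new DataInputStream(new BufferedInputStream(fromWriter, bufferSize))
  println(Seq.fill(3)(in.readInt())) // List(1, 2, 3)
}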
compute is used when:
- PythonRDD is requested to compute a partition
- AggregateInPandasExec, ArrowEvalPythonExec, BatchEvalPythonExec, FlatMapCoGroupsInPandasExec, FlatMapGroupsInPandasExec, MapInPandasExec, WindowInPandasExec physical operators are executed
- PandasGroupUtils is requested to executePython
- PythonForeachWriter is requested for the outputIterator