# BasePythonRunner

`BasePythonRunner` is an abstraction of Python Runners.

`BasePythonRunner` is executed as part of Spark tasks (that run on executors).
**Scala Definition**

`BasePythonRunner` is a type constructor in Scala (a _generic class_ in Java) with the following definition:

```scala
abstract class BasePythonRunner[IN, OUT](...) {
  // ...
}
```

`BasePythonRunner` uses `IN` and `OUT` as the names of the types of the input and output values, respectively.
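To illustrate the type parameters, a concrete runner binds `IN` and `OUT` to whatever values it exchanges with the Python worker. Below is a simplified, hypothetical sketch (the constructor parameters of Spark's actual runners differ across versions); `PythonUDFRunner`, for instance, works with serialized rows in both directions:

```scala
// Hypothetical sketch (not Spark's exact code): a runner that sends serialized
// rows (Array[Byte]) to the Python worker and reads serialized rows back, so
// both type parameters are bound to Array[Byte].
abstract class BytesPythonRunner(
    funcs: Seq[ChainedPythonFunctions],
    evalType: Int,
    argOffsets: Array[Array[Int]])
  extends BasePythonRunner[Array[Byte], Array[Byte]](funcs, evalType, argOffsets)
```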
## Contract

### newReaderIterator

```scala
newReaderIterator(
  stream: DataInputStream,
  writerThread: WriterThread,
  startTime: Long,
  env: SparkEnv,
  worker: Socket,
  pid: Option[Int],
  releasedOrClosed: AtomicBoolean,
  context: TaskContext): Iterator[OUT]
```
Used when:

- `BasePythonRunner` is requested to compute
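The iterator returned by `newReaderIterator` pulls `OUT` values out of the `DataInputStream` connected to the Python worker. A standalone sketch of that general shape, assuming a hypothetical length-prefixed framing (Spark's actual `ReaderIterator` additionally handles the PySpark worker protocol, timing data, accumulator updates, and error cases):

```scala
import java.io.DataInputStream

// Standalone sketch (not Spark's ReaderIterator): an Iterator over raw payloads
// read from a DataInputStream, assuming every payload is prefixed with its
// length and a negative length marks the end of data.
def framedIterator(stream: DataInputStream): Iterator[Array[Byte]] =
  new Iterator[Array[Byte]] {
    private var nextLength = stream.readInt()
    def hasNext: Boolean = nextLength >= 0
    def next(): Array[Byte] = {
      val payload = new Array[Byte](nextLength)
      stream.readFully(payload)
      nextLength = stream.readInt()
      payload
    }
  }
```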
### newWriterThread

```scala
newWriterThread(
  env: SparkEnv,
  worker: Socket,
  inputIterator: Iterator[IN],
  partitionIndex: Int,
  context: TaskContext): WriterThread
```
Used when:

- `BasePythonRunner` is requested to compute
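Correspondingly, the thread returned by `newWriterThread` serializes `IN` values from the input iterator and writes them to the Python worker's input stream. A standalone sketch of that shape, again with a hypothetical length-prefixed framing:

```scala
import java.io.DataOutputStream

// Standalone sketch (not Spark's WriterThread): a daemon thread that writes
// length-prefixed payloads from an input iterator to an output stream and
// ends the stream with a negative length marker.
class SketchWriterThread(out: DataOutputStream, input: Iterator[Array[Byte]])
  extends Thread("stdin writer (sketch)") {
  setDaemon(true)

  override def run(): Unit = {
    input.foreach { payload =>
      out.writeInt(payload.length)
      out.write(payload)
    }
    out.writeInt(-1)
    out.flush()
  }
}
```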
## Implementations

- ApplyInPandasWithStatePythonRunner
- ArrowPythonRunner
- CoGroupedArrowPythonRunner
- PythonRunner
- PythonUDFRunner
## Creating Instance

`BasePythonRunner` takes the following to be created:

- ChainedPythonFunctions
- Eval Type
- Argument Offsets

`BasePythonRunner` requires that there are exactly as many Argument Offsets as ChainedPythonFunctions.
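That requirement is a simple invariant: one array of argument offsets per chained Python function. A minimal sketch of the check, with a stand-in for the Spark-internal `ChainedPythonFunctions` type:

```scala
// Stand-in for Spark's internal type (an assumption of this sketch).
case class ChainedPythonFunctions(funcs: Seq[AnyRef])

// One argument-offset array is expected per ChainedPythonFunctions.
def requireSameLength(
    funcs: Seq[ChainedPythonFunctions],
    argOffsets: Array[Array[Int]]): Unit =
  require(funcs.length == argOffsets.length,
    "argOffsets should have the same length as funcs")
```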
**Abstract Class**

`BasePythonRunner` is an abstract class and cannot be created directly. It is created indirectly as one of the concrete implementations.
## accumulator

```scala
accumulator: PythonAccumulatorV2
```

`BasePythonRunner` initializes the `accumulator` registry when created, using the `PythonAccumulatorV2` of the head `PythonFunction` among the given ChainedPythonFunctions.

The `PythonAccumulatorV2` is used when `ReaderIterator` is requested to handleEndOfDataSection (to update metrics).
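A minimal sketch of that initialization, assuming simplified stand-ins for the Spark-internal types (each `ChainedPythonFunctions` holds a sequence of chained `PythonFunction`s, and each `PythonFunction` carries its accumulator):

```scala
// Simplified stand-ins for Spark's internal types (assumptions of this sketch).
class PythonAccumulatorV2
case class PythonFunction(accumulator: PythonAccumulatorV2)
case class ChainedPythonFunctions(funcs: Seq[PythonFunction])

// The accumulator registry comes from the head PythonFunction of the head
// ChainedPythonFunctions, as described above.
def accumulatorOf(funcs: Seq[ChainedPythonFunctions]): PythonAccumulatorV2 =
  funcs.head.funcs.head.accumulator
```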
## Computing Result

```scala
compute(
  inputIterator: Iterator[IN],
  partitionIndex: Int,
  context: TaskContext): Iterator[OUT]
```

**Runs on Executors**

`compute` runs on Spark executors.
`compute` uses the given `TaskContext` to look up the following local properties (if they were specified via a ResourceProfile):

- `resource.executor.cores`
- `resource.pyspark.memory`
`compute` requests the `DiskBlockManager` for the local directories and creates a comma-separated list of them (`localdir`).

Unless `spark.executorEnv.OMP_NUM_THREADS` is explicitly specified (in the SparkConf), `compute` sets `OMP_NUM_THREADS` (in the `envVars`) to the value of `resource.executor.cores` (if defined).
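A standalone sketch of that step, with plain `Map`s standing in for the task's local properties and the worker environment (Spark's actual code works against the `TaskContext` and a mutable environment map):

```scala
// Standalone sketch of the OMP_NUM_THREADS handling described above.
// localProps stands in for the TaskContext local properties, envVars for the
// Python worker environment, and ompSetInSparkConf models whether
// spark.executorEnv.OMP_NUM_THREADS was explicitly specified.
def resolveOmpNumThreads(
    localProps: Map[String, String],
    envVars: Map[String, String],
    ompSetInSparkConf: Boolean): Map[String, String] = {
  val executorCores = localProps.get("resource.executor.cores")
  if (ompSetInSparkConf) envVars
  else executorCores.map(cores => envVars + ("OMP_NUM_THREADS" -> cores)).getOrElse(envVars)
}
```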
`compute` sets the following in the `envVars`:

- `SPARK_LOCAL_DIRS` as the local directories of the local `DiskBlockManager` (`localdir`)

`compute` can optionally define the following environment variables:

- `SPARK_REUSE_WORKER` as `1` when the `spark.python.worker.reuse` configuration property is enabled
- `SPARK_SIMPLIFIED_TRACEBACK` as `1` when `simplifiedTraceback` is enabled
- others
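For illustration, a minimal sketch that assembles such a worker environment from plain Scala values (only the environment variable names and configuration flags above are from the text; everything else is a stand-in):

```scala
// Sketch: building the Python worker environment described above.
// localDirs, reuseWorker and simplifiedTraceback are stand-in inputs.
def workerEnv(
    localDirs: Seq[String],
    reuseWorker: Boolean,
    simplifiedTraceback: Boolean): Map[String, String] = {
  val base = Map("SPARK_LOCAL_DIRS" -> localDirs.mkString(","))
  val withReuse = if (reuseWorker) base + ("SPARK_REUSE_WORKER" -> "1") else base
  if (simplifiedTraceback) withReuse + ("SPARK_SIMPLIFIED_TRACEBACK" -> "1") else withReuse
}
```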
`compute` requests the `SparkEnv` to createPythonWorker (for the pythonExec and the envVars).

`compute` creates a new WriterThread (to feed the worker process input from the given `inputIterator`) and starts it.

`compute` creates and starts a WriterMonitorThread.

`compute` creates a MonitorThread.
`compute` creates a buffered `DataInputStream` to read from the worker (socket) output, using the bufferSize.
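A sketch of that stream setup over the worker socket (`bufferSize` stands in for the runner's buffer-size setting):

```scala
import java.io.{BufferedInputStream, DataInputStream}
import java.net.Socket

// Sketch: a buffered DataInputStream over the Python worker's (socket) output,
// read with the given buffer size.
def workerOutputStream(worker: Socket, bufferSize: Int): DataInputStream =
  new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
```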
In the end, `compute` creates a new ReaderIterator to read lines from the Python worker's stdout (from the buffered `DataInputStream`).
`compute` is used when:

- `PythonRDD` is requested to compute a partition
- `AggregateInPandasExec`, `ArrowEvalPythonExec`, `BatchEvalPythonExec`, `FlatMapCoGroupsInPandasExec`, `FlatMapGroupsInPandasExec`, `MapInPandasExec`, `WindowInPandasExec` physical operators are executed
- `PandasGroupUtils` is requested to executePython
- `PythonForeachWriter` is requested for the outputIterator