BasePythonRunner

BasePythonRunner is an abstraction of Python Runners.

Contract

newReaderIterator

newReaderIterator(
  stream: DataInputStream,
  writerThread: WriterThread,
  startTime: Long,
  env: SparkEnv,
  worker: Socket,
  releasedOrClosed: AtomicBoolean,
  context: TaskContext): Iterator[OUT]

Used when BasePythonRunner is requested to compute

newWriterThread

newWriterThread(
  env: SparkEnv,
  worker: Socket,
  inputIterator: Iterator[IN],
  partitionIndex: Int,
  context: TaskContext): WriterThread

Used when BasePythonRunner is requested to compute

Implementations

Scala Definition

BasePythonRunner is a type constructor in Scala (generic class in Java) with the following definition:

abstract class BasePythonRunner[IN, OUT](...) {
    // ...
}

BasePythonRunner uses IN and OUT as the names of the type parameters for the input and output values, respectively.
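For example, PythonRunner (the runner behind PythonRDD) binds both type parameters to Array[Byte], since it exchanges serialized (pickled) records with the Python worker. A simplified sketch of that relationship (constructor arguments elided, as in the definition above):

```scala
// A concrete runner fixes IN and OUT; PythonRunner works on
// pickled data, so both are Array[Byte].
class PythonRunner(...)
  extends BasePythonRunner[Array[Byte], Array[Byte]](...)
```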

Creating Instance

BasePythonRunner takes the following to be created:

  • ChainedPythonFunctions
  • Eval Type
  • Argument Offsets

BasePythonRunner requires that the number of ChainedPythonFunctions and the number of Argument Offsets be the same.
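That sanity check can be sketched as a simple `require` in the constructor (the exact message is an assumption):

```scala
// One set of argument offsets is expected per chained Python function.
require(
  funcs.length == argOffsets.length,
  "argOffsets should have the same length as funcs")
```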

Abstract Class

BasePythonRunner is an abstract class and cannot be created directly. It is created indirectly through one of the concrete BasePythonRunners.

Computing Result

compute(
  inputIterator: Iterator[IN],
  partitionIndex: Int,
  context: TaskContext): Iterator[OUT]

compute makes sure that the spark.executorEnv.OMP_NUM_THREADS configuration option is set, defaulting to the value of the spark.executor.cores property.
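A hedged sketch of that defaulting logic (the `conf` and `envVars` names are assumptions, not the actual Spark identifiers):

```scala
// If OMP_NUM_THREADS is not set explicitly, cap the number of OpenMP
// threads (used by numpy and similar libraries in the Python worker)
// at the number of cores assigned to the executor.
if (conf.getOption("spark.executorEnv.OMP_NUM_THREADS").isEmpty) {
  conf.getOption("spark.executor.cores").foreach { cores =>
    envVars.put("OMP_NUM_THREADS", cores)
  }
}
```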

compute defines the following environment variables:

  • SPARK_LOCAL_DIRS to be the local directories of the local DiskBlockManager
  • SPARK_BUFFER_SIZE to be the value of spark.buffer.size configuration property (default: 65536)

compute can optionally define environment variables:

  • SPARK_REUSE_WORKER to be 1 based on spark.python.worker.reuse configuration property (default: true)
  • PYSPARK_EXECUTOR_MEMORY_MB to be the value of spark.executor.pyspark.memory configuration property if defined
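Taken together, the environment for the Python worker could be assembled along these lines (a sketch; `envVars`, `conf`, and `localdir` are assumed names):

```scala
// Mandatory entries
envVars.put("SPARK_LOCAL_DIRS", localdir)
envVars.put("SPARK_BUFFER_SIZE", conf.get("spark.buffer.size", "65536"))

// Optional entries
if (conf.getBoolean("spark.python.worker.reuse", defaultValue = true)) {
  envVars.put("SPARK_REUSE_WORKER", "1")
}
conf.getOption("spark.executor.pyspark.memory").foreach { mem =>
  envVars.put("PYSPARK_EXECUTOR_MEMORY_MB", mem)
}
```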

compute requests the executor's SparkEnv to createPythonWorker (for a pythonExec and the environment variables) that requests PythonWorkerFactory to create a Python worker (and give a java.net.Socket).

FIXME

Describe pythonExec.

compute creates and starts a newWriterThread with the Python worker and the input arguments.

compute creates and starts a MonitorThread to watch the Python worker.

compute creates a new reader iterator to read lines from the Python worker's stdout.

compute is used when:

  • PythonRDD is requested to compute
  • AggregateInPandasExec, ArrowEvalPythonExec, BatchEvalPythonExec, FlatMapCoGroupsInPandasExec, FlatMapGroupsInPandasExec, MapInPandasExec, WindowInPandasExec physical operators are executed
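As an illustration, PythonRDD's compute can be sketched as delegating directly to the runner (simplified; the constructor details are assumptions):

```scala
// Simplified sketch of an RDD handing its partition to a Python runner.
override def compute(
    split: Partition,
    context: TaskContext): Iterator[Array[Byte]] = {
  // A BasePythonRunner[Array[Byte], Array[Byte]] for the user's function
  val runner = PythonRunner(func)
  runner.compute(firstParent.iterator(split, context), split.index, context)
}
```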

Last update: 2021-02-25