BasePythonRunner

BasePythonRunner is an abstraction of Python Runners.

BasePythonRunner is executed as part of Spark tasks (that run on executors).

Scala Definition

BasePythonRunner is a type constructor in Scala (generic class in Java) with the following definition:

abstract class BasePythonRunner[IN, OUT](...) {
    // ...
}

BasePythonRunner uses IN and OUT as the names of the type parameters for the input and output values, respectively.
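
For illustration, a hedged sketch of how a subclass fixes the type parameters (the class name below is hypothetical; PythonRunner in Spark Core, for instance, uses Array[Byte] for both IN and OUT):

package org.apache.spark.api.python

// Hypothetical subclass that fixes IN and OUT to Array[Byte]
// (serialized input rows in, serialized results out).
// Declared abstract so the contract methods can be omitted in this sketch.
// BasePythonRunner is Spark-internal (private[spark]), hence the package above.
abstract class BytesPythonRunner(
    funcs: Seq[ChainedPythonFunctions],
    evalType: Int,
    argOffsets: Array[Array[Int]])
  extends BasePythonRunner[Array[Byte], Array[Byte]](funcs, evalType, argOffsets)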

Contract

newReaderIterator

newReaderIterator(
  stream: DataInputStream,
  writerThread: WriterThread,
  startTime: Long,
  env: SparkEnv,
  worker: Socket,
  pid: Option[Int],
  releasedOrClosed: AtomicBoolean,
  context: TaskContext): Iterator[OUT]

Used when:

  • BasePythonRunner is requested to compute
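
A hedged sketch of what an implementation could look like, assuming the inner ReaderIterator contract of Spark 3.x where read() is the only method a subclass has to provide (with OUT fixed to Array[Byte] as in the sketch above):

override protected def newReaderIterator(
    stream: DataInputStream,
    writerThread: WriterThread,
    startTime: Long,
    env: SparkEnv,
    worker: Socket,
    pid: Option[Int],
    releasedOrClosed: AtomicBoolean,
    context: TaskContext): Iterator[Array[Byte]] =
  new ReaderIterator(
      stream, writerThread, startTime, env, worker, pid, releasedOrClosed, context) {
    // A real implementation deserializes the next OUT value from the stream
    // (and handles the end-of-data and exception markers sent by the Python worker).
    override protected def read(): Array[Byte] = ???
  }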

newWriterThread

newWriterThread(
  env: SparkEnv,
  worker: Socket,
  inputIterator: Iterator[IN],
  partitionIndex: Int,
  context: TaskContext): WriterThread

Used when:

  • BasePythonRunner is requested to compute
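
A hedged sketch, assuming the inner WriterThread contract of Spark 3.x where a subclass provides writeCommand (to send the serialized Python function) and writeIteratorToStream (to send the input values):

override protected def newWriterThread(
    env: SparkEnv,
    worker: Socket,
    inputIterator: Iterator[Array[Byte]],
    partitionIndex: Int,
    context: TaskContext): WriterThread =
  new WriterThread(env, worker, inputIterator, partitionIndex, context) {
    // Writes the (pickled) Python function(s) to the worker's stdin.
    override protected def writeCommand(dataOut: DataOutputStream): Unit = ???
    // Streams the IN values (here Array[Byte]) to the worker's stdin.
    override protected def writeIteratorToStream(dataOut: DataOutputStream): Unit = ???
  }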

Implementations

Creating Instance

BasePythonRunner takes the following to be created:

  • ChainedPythonFunctions
  • Eval Type
  • Argument Offsets

BasePythonRunner requires that the number of ChainedPythonFunctions and the number of Argument Offsets be the same.
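
That boils down to a plain require on the constructor arguments (a sketch; the exact error message may differ):

require(
  funcs.length == argOffsets.length,
  "argOffsets should have the same length as funcs")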

Abstract Class

BasePythonRunner is an abstract class and cannot be created directly. Only the concrete BasePythonRunners can be.

accumulator

accumulator: PythonAccumulatorV2

When created, BasePythonRunner initializes this PythonAccumulatorV2 registry to be the accumulator of the head PythonFunction of the first of the given ChainedPythonFunctions.

The PythonAccumulatorV2 is used when ReaderIterator is requested to handleEndOfDataSection (to update metrics).
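
In code this is just the accumulator of the first (head) PythonFunction of the first ChainedPythonFunctions (a hedged sketch, with funcs being the ChainedPythonFunctions given at creation time):

// The head ChainedPythonFunctions, then its head PythonFunction's accumulator.
private val accumulator: PythonAccumulatorV2 = funcs.head.funcs.head.accumulator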

Computing Result

compute(
  inputIterator: Iterator[IN],
  partitionIndex: Int,
  context: TaskContext): Iterator[OUT]

Runs on Executors

compute runs on Spark executors.

compute uses the given TaskContext to look up the following local properties (if they were specified via ResourceProfile):

  • resource.executor.cores
  • resource.pyspark.memory
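
A sketch of the lookup (both properties come back as null unless a custom ResourceProfile set them):

// Local properties are plain strings on the TaskContext.
val execCores = Option(context.getLocalProperty("resource.executor.cores"))
val pysparkMemory = Option(context.getLocalProperty("resource.pyspark.memory"))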

compute requests the DiskBlockManager for the local directories and creates a comma-separated list of them (localdir).

Unless spark.executorEnv.OMP_NUM_THREADS is explicitly specified (in the SparkConf), compute sets OMP_NUM_THREADS (in the envVars) to the value of resource.executor.cores (if defined).

compute sets the following in the envVars:

  • SPARK_LOCAL_DIRS as the local directories of the local DiskBlockManager (localdir)
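
Roughly, assuming envVars is the (mutable) environment of the head PythonFunction, conf is the executor-side SparkConf, and execCores is the resource.executor.cores value looked up above:

// Cap OpenMP threads (e.g. for numpy) at the executor cores, unless set explicitly.
if (!conf.contains("spark.executorEnv.OMP_NUM_THREADS")) {
  execCores.foreach(cores => envVars.put("OMP_NUM_THREADS", cores))
}
// Point the Python worker at the executor's local directories.
envVars.put("SPARK_LOCAL_DIRS", localdir)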

compute can optionally define environment variables:

  • SPARK_REUSE_WORKER as 1 when spark.python.worker.reuse configuration property is enabled
  • SPARK_SIMPLIFIED_TRACEBACK as 1 when simplifiedTraceback is enabled
  • others
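
For instance (a sketch; reuseWorker and simplifiedTraceback stand for flags derived from the configuration):

if (reuseWorker) {
  envVars.put("SPARK_REUSE_WORKER", "1")
}
if (simplifiedTraceback) {
  envVars.put("SPARK_SIMPLIFIED_TRACEBACK", "1")
}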

compute requests SparkEnv to createPythonWorker (for the pythonExec and the envVars).

compute creates a new WriterThread (to feed the worker process input from the given inputIterator) and starts it.

compute creates and starts a WriterMonitorThread.

compute creates a MonitorThread.

compute creates a buffered DataInputStream (using the bufferSize) to read from the worker's (socket) output.

In the end, compute creates a new ReaderIterator to read lines from the Python worker's stdout (from the buffered DataInputStream).
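
Put together, the tail end of compute looks roughly as follows (a condensed, hedged sketch; worker, pid and releasedOrClosed come from the createPythonWorker step, and the monitor threads' constructor arguments are approximations that vary across Spark versions):

// Feed the Python worker's stdin from the input iterator.
val writerThread = newWriterThread(env, worker, inputIterator, partitionIndex, context)
writerThread.start()

// Watchdog threads for task interruption and worker termination.
new WriterMonitorThread(SparkEnv.get, worker, writerThread, context).start()
new MonitorThread(SparkEnv.get, worker, context).start()

// Buffered view over the worker's stdout, then the OUT iterator on top of it.
val stream = new DataInputStream(
  new BufferedInputStream(worker.getInputStream, bufferSize))
newReaderIterator(
  stream, writerThread, startTime, env, worker, pid, releasedOrClosed, context)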


compute is used when: