Run spacy jobs on Apache Spark

Spacy is a state-of-the-art NLP library in Python, which provides a lot of tools that required in NLP problems, eg; NER, unicode tokenizers, Deep learning models for Tagging, NER and related operations.

Spark provides only traditional NLP tools like standard tokenizers, tf-idf,etc, we mostly need accurate POS tagging and chunking features when working with NLP problems, which spark libraries aren’t close to spacy. Those cases we need to relay on spacy.

Pyspark architecture

A quick brief about the pyspark architecture, Bellow image shows that the workers spawn python process to run the pyspark jobs; which are written in python and all necessary python ML libraries like sklearn, numpy, spacy etc.

Pyspark Architecture

The spark worker doesn’t control much other than starting the python worker process and control whether python worker need to be restarted on every job or not.

Main challenges when using spacy models with pyspark

  1. How to skip spacy model serialization

  2. Spacy’s inbuilt multi-processing feature may bite you.

  3. How we can manage the worker process management.

Usually when we submit spark jobs to the spark driver compiles it and optimize the pipeline. The final plan of the pipeline is split across the executors based on the DAG of data flow defined on the pipeline. Here the spark executors does the actual work, where the driver program sends out the relevant codes to executes at executor side. This is being done by serializing the relevant parts of the pipeline.

One thing to ensure is our program is serializable ( Source code, classes and objects ). Otherwise spark fails to execute the pipeline.

How to skip spacy model serialization

How we ensure this is by avoiding the scenario of serializing the spacy’s inbuilt trained binary models. How we do that ?

Pyspark uses PickleSerializer to serialize the python objects, but spacy models aren’t serialisable using PickleSerializer which is trigger the issue when we load the spacy model first and then refer in worker code.

This code will fail

Here we simply loading the spacy object at driver side itself, when the python modules are getting loaded, which demands the pickling for the spacy objects. Which eventually fails when ship it to the worker side.

SPACY_MODEL = spacy.load("en_core_web_lg")

Working Version

Here we are wrapping the spacy model under a lazy function, which will ensure the model won’t get loaded until it’s really required — which is actually required when the executor runs this code with partitioned dataset.

# Here we are not loading the model at the loading time, only the worker code
# will invoke this routine and gets the spacy object. Which means we are loading
# new spacy models on every executors.
SPACY_MODEL = None
def get_spacy_model():
    global SPACY_MODEL
    if not SPACY_MODEL:
       _model = spacy.load("en_core_web_lg")
    SPACY_MODEL = _model
    return SPACY_MODEL

At driver side we won’t load the spacy model, instead ensure they are loaded lazily at executor side.

Here the models can’t be serialised at the driver side and ship it to worker and load it back, So we need to ensure only at the runtime the models are really gets loaded into the worker memory.

Full version of using spacy on spark

ubuntu@ip-10-50-168-159:~$
import os
import spacy
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType, ArrayType


SPACY_MODEL = None
def get_spacy_model():
    global SPACY_MODEL
    if not SPACY_MODEL:
       _model = spacy.load("en_core_web_lg")
       # FIX https://github.com/explosion/spaCy/issues/922
       _model.vocab.add_flag(
           lambda s: s.lower() in spacy.lang.en.stop_words.STOP_WORDS,
           spacy.attrs.IS_STOP
       )

    SPACY_MODEL = _model
    return SPACY_MODEL


os.environ["PYSPARK_SUBMIT_ARGS"]  = "--master local[*] \
        --executor-cores 1 \
        --executor-memory 5g \
        --driver-memory 5g \
        pyspark-shell"

os.environ["SPARK_HOME"] = "/home/ubuntu/spark-2.4.0-bin-hadoop2.7"
os.environ["PYSPARK_PYTHON"] = "/home/ubuntu/ENV3/bin/python"

spark = SparkSession \
        .builder \
        .appName("test-spacy-on-spark") \
        .getOrCreate()

# Enable the arrow based udf calls or data transfer between python and jvm.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.python.worker.reuse", "true")
spark.conf.set("spark.python.worker.memory", "2g")
#
# Simple UDF function which uses the spacy model to evaluate your create
#
@pandas_udf(returnType=ArrayType(StringType()), functionType=PandasUDFType.SCALAR)
def tokenize_and_clean(documents):
    spacy_model = get_spacy_model()
    docs = spacy_model.pipe(documents)
    tokens = [[tok.lemma_ for tok in doc if not tok.is_stop and tok.text]
              for doc in docs]
    tokens_series = pd.Series(tokens)
    return tokens_series


data = spark.read.option("header", True).csv("/mnt/input/dataset.csv")

#import pdb; pdb.set_trace()
print(data.printSchema())
data = data.repartition(5)
data1 = data.withColumn("tokens", tokenize_and_clean("abstracts"))


print(data1.select("tokens").show())

Spacy multi-processing capabilities

This feature included with spacy to speed up the pipeline processing and making use of multiple core available on the machine. If you are not careful with this configuration then spark executors won’t control the python daemon behavior of forking processes internally, which leads into over utilization of resource, and low throughput.

Bellow code ensures the spacy will dispatch the different documents into available cores to finish the spacy pipeline operations.

nlp = spacy.load("en_core_web_lg")

docs = nlp.pipe(raw_docs)

If you are enabling this, then your spark configuration shouldn’t control the worker cores, instead each worker/executor uses only 1 core and leave the remaining cores for python workers, which is an good option here.

master

./sbin/start-master

slave 1

./sbin/start-slave.sh -c 1 -m 5g spark://<master-hostname>:7077
Important
Here we are setting spark worker to use only one CPU, this means spark can launch one executor with 1 CPU, as with spacy workload main computation happening at python side, and spacy brings the multiprocessing outside the spark framework.

Check the python processes ran by each spark worker

On a 8 core machine, above standalone cluster configuration,

$ pstree -aup | grep pyspark
      |       |   |-python,32602 -m pyspark.daemon
      |       |   |   |-python,32608 -m pyspark.daemon
      |       |   |   |-python,32609 -m pyspark.daemon
      |       |   |   |-python,32614 -m pyspark.daemon
      |       |   |   `-python,32616 -m pyspark.daemon
      |       |   |   |-python,32601 -m pyspark.daemon
      |       |   |   |-python,32607 -m pyspark.daemon
      |       |   |   |-python,32612 -m pyspark.daemon
      |       |   |   `-python,32615 -m pyspark.daemon
      |   |-grep,1487 --color=auto pyspark

PID 32602 -> The master python job which interact with the spark executor to fetch data
Other PIDs are the spacy workers launched, default behavior is one worker per cpu core.

Suppose we ran the apache spark worker with 8 core, and allocated 1 CPU for each executor, then it will fork 8x8 = 64 python processes to do the task, in place of 8 process. Which will degrade pipeline performance.

For Yarn or Kubernetes cluster manager this problem won’t happen as both will restrict the system view to application restricted — similar to VMs; with the help of Control Group (cgroup) and namespace features. So the spark executor or the python worker won’t see entire CPU / RAM for utilization, they gets it by the allocation specified based on the container spec on both Yarn and Kubernetes environment.

Takeaway

  1. Ensure you are writing spark pipeline with serialisable objects, or do lazy evaluation.

  2. Be careful when using the external libraries like spacy, which may bring its own multiprocessing feature, which will result in overloading the system with spark executor configuration.

  3. Use the different cluster manager other than standalone one to get more control over allocating resources to the executors.

Go Top