spaCy is a state-of-the-art NLP library for Python that provides many of the tools NLP problems require, e.g. NER, Unicode tokenizers, and deep learning models for tagging, NER, and related operations.
Spark provides only traditional NLP tools such as standard tokenizers and tf-idf. When working on NLP problems we mostly need accurate POS tagging and chunking, where the Spark libraries don't come close to spaCy. In those cases we need to rely on spaCy.
A quick brief on the PySpark architecture: the image below shows that the workers spawn Python processes to run the PySpark jobs, which are written in Python and can use any necessary Python ML library such as sklearn, numpy, or spaCy.
The Spark worker doesn't control much beyond starting the Python worker process and deciding whether the Python worker needs to be restarted for every job.
Main challenges when using spaCy models with PySpark
How to skip spaCy model serialization.
spaCy's built-in multiprocessing feature may bite you.
How to manage the Python worker processes.
Usually, when we submit a Spark job, the driver compiles it and optimizes the pipeline. The final plan of the pipeline is split across the executors based on the DAG of the data flow defined in the pipeline. The Spark executors do the actual work, and the driver program sends out the relevant code to execute on the executor side. This is done by serializing the relevant parts of the program.
One thing to ensure is that our program is serializable (source code, classes, and objects). Otherwise Spark fails to execute the pipeline.
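A rough way to spot objects that will not survive the trip to the executors is to try pickling them locally first. The sketch below uses only the standard `pickle` module, which is an approximation of what PySpark does internally, and `is_picklable` is a hypothetical helper name, not part of any library. A thread lock stands in for a resource that cannot be serialized.

```python
import pickle
import threading


def is_picklable(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


# Plain data structures serialize without trouble.
print(is_picklable({"text": "hello", "tokens": ["hello"]}))

# A lock is a typical example of an object that cannot be pickled.
print(is_picklable(threading.Lock()))
```

Running a check like this on the objects your UDF refers to can point at the culprit before the job is submitted.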
How to skip spaCy model serialization
We ensure this by avoiding the serialization of spaCy's built-in trained binary models. How do we do that?
PySpark uses PickleSerializer to serialize Python objects, but spaCy models aren't serializable with PickleSerializer, which triggers the issue when we load the spaCy model first and then refer to it in worker code.
This code will fail
Here we load the spaCy object on the driver side, at the moment the Python modules are loaded, which forces the spaCy object to be pickled. That pickling eventually fails when the object is shipped to the worker side.
SPACY_MODEL = spacy.load("en_core_web_lg")
Here we wrap the spaCy model in a lazy function, which ensures the model isn't loaded until it is really required, which is when the executor runs this code on its partition of the dataset.
import spacy

# The model is not loaded at import time. Only the worker code invokes this
# routine to get the spaCy object, which means every executor loads its own
# copy of the model.
SPACY_MODEL = None


def get_spacy_model():
    global SPACY_MODEL
    if SPACY_MODEL is None:
        _model = spacy.load("en_core_web_lg")
        SPACY_MODEL = _model
    return SPACY_MODEL
On the driver side we don't load the spaCy model; instead we ensure it is loaded lazily on the executor side.
The models can't be serialized on the driver side, shipped to the worker, and loaded back, so we need to ensure the models are loaded into worker memory only at runtime.
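The load-once behaviour of the lazy pattern can be checked without Spark or spaCy. The following is a minimal sketch in which `fake_load` is a hypothetical stand-in for `spacy.load("en_core_web_lg")` and `make_lazy_loader` is an illustrative helper, not an API of either library. Each call to `process_partition` plays the role of one task running in the same Python worker.

```python
def make_lazy_loader(load_fn):
    """Wrap load_fn so it runs at most once per process, on first use."""
    cache = {}

    def get():
        if "value" not in cache:
            cache["value"] = load_fn()
        return cache["value"]

    return get


load_calls = []


def fake_load():
    # Hypothetical stand-in for the expensive spacy.load(...) call.
    load_calls.append(1)
    return {"name": "stand-in-model"}


get_model = make_lazy_loader(fake_load)


def process_partition(rows):
    # The model is fetched inside the task, never at import time.
    model = get_model()
    return [f"{model['name']}:{row}" for row in rows]


results = [process_partition(part) for part in (["a", "b"], ["c"])]
print(len(load_calls))  # the loader ran once, although two partitions were processed
```

This only pays off when the Python worker is reused between tasks; if a fresh worker is started for every task, the model is loaded again each time.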
Full version of using spaCy on Spark
import os

import pandas as pd
import spacy
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType, ArrayType

SPACY_MODEL = None


def get_spacy_model():
    # Load the model lazily, once per Python worker process.
    global SPACY_MODEL
    if SPACY_MODEL is None:
        _model = spacy.load("en_core_web_lg")
        # FIX https://github.com/explosion/spaCy/issues/922
        _model.vocab.add_flag(
            lambda s: s.lower() in spacy.lang.en.stop_words.STOP_WORDS,
            spacy.attrs.IS_STOP
        )
        SPACY_MODEL = _model
    return SPACY_MODEL


os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--master local[*] "
    "--executor-cores 1 "
    "--executor-memory 5g "
    "--driver-memory 5g "
    "pyspark-shell"
)
os.environ["SPARK_HOME"] = "/home/ubuntu/spark-2.4.0-bin-hadoop2.7"
os.environ["PYSPARK_PYTHON"] = "/home/ubuntu/ENV3/bin/python"

spark = SparkSession \
    .builder \
    .appName("test-spacy-on-spark") \
    .getOrCreate()

# Enable Arrow-based UDF calls and data transfer between Python and the JVM.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.python.worker.reuse", "true")
spark.conf.set("spark.python.worker.memory", "2g")


# Simple UDF which uses the spaCy model to tokenize and clean the documents.
@pandas_udf(returnType=ArrayType(StringType()), functionType=PandasUDFType.SCALAR)
def tokenize_and_clean(documents):
    spacy_model = get_spacy_model()
    docs = spacy_model.pipe(documents)
    tokens = [[tok.lemma_ for tok in doc if not tok.is_stop and tok.text]
              for doc in docs]
    tokens_series = pd.Series(tokens)
    return tokens_series


data = spark.read.option("header", True).csv("/mnt/input/dataset.csv")
data.printSchema()  # printSchema() prints by itself and returns None

data = data.repartition(5)
data1 = data.withColumn("tokens", tokenize_and_clean("abstracts"))
data1.select("tokens").show()  # show() prints by itself and returns None
spaCy multiprocessing capabilities
This feature is included in spaCy to speed up pipeline processing by making use of the multiple cores available on the machine. If you are not careful with this configuration, the Spark executors cannot control how the Python daemon forks processes internally, which leads to over-utilization of resources and low throughput.
The code below makes spaCy dispatch the documents across the available cores to finish the spaCy pipeline operations.
nlp = spacy.load("en_core_web_lg")
docs = nlp.pipe(raw_docs)
If you enable this, your Spark configuration shouldn't claim all the worker cores. Instead, let each worker/executor use only 1 core and leave the remaining cores to the Python workers, which is a good option here.
./sbin/start-slave.sh -c 1 -m 5g spark://<master-hostname>:7077
Here we set the Spark worker to use only one CPU, which means Spark can launch one executor with 1 CPU. With a spaCy workload the main computation happens on the Python side, and spaCy brings its multiprocessing outside the Spark framework.
Check the Python processes run by each Spark worker
On an 8-core machine, with the above standalone cluster configuration:
$ pstree -aup | grep pyspark
| | |-python,32602 -m pyspark.daemon
| | | |-python,32608 -m pyspark.daemon
| | | |-python,32609 -m pyspark.daemon
| | | |-python,32614 -m pyspark.daemon
| | | `-python,32616 -m pyspark.daemon
| | | |-python,32601 -m pyspark.daemon
| | | |-python,32607 -m pyspark.daemon
| | | |-python,32612 -m pyspark.daemon
| | | `-python,32615 -m pyspark.daemon
| |-grep,1487 --color=auto pyspark
PID 32602 -> the master Python process, which interacts with the Spark executor to fetch data.
The other PIDs are the spaCy workers that were launched; the default behavior is one worker per CPU core.
Suppose we run the Apache Spark worker with 8 cores and allocate 1 CPU to each executor. It will then fork 8 x 8 = 64 Python processes to do the work instead of 8, which degrades pipeline performance.
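The arithmetic above can be written down as a small back-of-the-envelope helper. This is only a sketch: the function names are hypothetical, and the model simply assumes every executor's Python worker forks the same number of processes.

```python
def total_python_processes(executors_per_node, processes_per_python_worker):
    """Each executor's Python worker forks its own set of processes."""
    return executors_per_node * processes_per_python_worker


def oversubscription(total_processes, cores):
    """How many runnable processes compete for each physical core."""
    return total_processes / cores


total = total_python_processes(executors_per_node=8, processes_per_python_worker=8)
print(total, oversubscription(total, cores=8))  # 64 8.0

# With a single one-core executor the process count matches the core count.
balanced = total_python_processes(executors_per_node=1, processes_per_python_worker=8)
print(balanced, oversubscription(balanced, cores=8))  # 8 1.0
```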
With the YARN or Kubernetes cluster manager this problem won't happen, as both restrict the system view to what the application is allowed to use, similar to VMs, with the help of the Control Group (cgroups) and namespace features. So the Spark executor or the Python worker won't see the entire CPU / RAM for utilization; they get what is allocated in the container spec on both YARN and Kubernetes.
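To check what a Python worker can actually use, one option is to look at the CPU affinity of the process rather than the machine-wide core count. The sketch below relies on `os.sched_getaffinity`, which exists on Linux only, and falls back to `os.cpu_count()` elsewhere; `usable_cpu_count` is a hypothetical helper name. Whether a container limit shows up here depends on how the runtime enforces it: CPU pinning is reflected in the affinity mask, while a pure CPU-time quota may not be.

```python
import os


def usable_cpu_count():
    """CPUs this process may run on, falling back to the machine-wide count."""
    if hasattr(os, "sched_getaffinity"):
        # Linux: reflects cpuset / taskset style pinning of the current process.
        return len(os.sched_getaffinity(0))
    return os.cpu_count() or 1


print("machine-wide cores:", os.cpu_count())
print("cores usable by this process:", usable_cpu_count())
```

A value like this could be used to cap the number of processes a library is allowed to start inside each executor.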
Ensure you write the Spark pipeline with serializable objects, or use lazy evaluation.
Be careful with external libraries like spaCy that bring their own multiprocessing feature, which, combined with the Spark executor configuration, can overload the system.
Use a cluster manager other than the standalone one to get more control over allocating resources to the executors.