Run spaCy jobs on Apache Spark
in data-science · Thu 16 May 2019
spaCy is a state-of-the-art NLP library for Python. It provides many of the tools required for NLP problems, e.g. Unicode-aware tokenizers, named entity recognition (NER), and deep learning models for tagging, NER, and related operations.
Spark ships only traditional NLP tools such as standard tokenizers and tf-idf. When working on NLP problems we usually need accurate POS tagging and chunking, and here Spark's libraries aren't anywhere close to spaCy. In those cases, we need to rely on spaCy.
A quick brief on the PySpark architecture: the image below shows that the workers spawn Python processes to run the PySpark jobs, which are written in Python and use the necessary Python ML libraries such as sklearn, numpy, spaCy, etc.
The Spark worker doesn't control much beyond starting the Python worker processes and deciding whether they need to be restarted for every job or can be reused.
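For example, the restart-or-reuse behavior is controlled by a single standard Spark property; a minimal sketch, with everything except the property itself being illustrative:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("worker-reuse-demo").getOrCreate()

# When "true" (the default), the forked Python workers are kept around and
# reused across tasks; when "false", a fresh Python process is started per task.
spark.conf.set("spark.python.worker.reuse", "true")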
This post covers:
How to skip spaCy model serialization.
How spaCy's inbuilt multiprocessing feature may bite you.
How to manage the worker processes.
Usually, when we submit a Spark job, the Spark driver compiles and optimizes the pipeline. The final plan of the pipeline is split across the executors based on the DAG of data flow defined in the pipeline. The executors do the actual work: the driver program sends out the relevant code to execute on the executor side, and it does so by serializing the relevant parts of the pipeline.
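As a tiny illustration of that shipping step (a sketch, not from the original post): even a plain Python function used in a transformation gets pickled on the driver and sent to the executors that run it.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("serialization-demo").getOrCreate()

def double(x):
    # This function, and anything it closes over, is serialized by the driver
    # and shipped to the Python workers that execute the map.
    return x * 2

print(spark.sparkContext.parallelize([1, 2, 3]).map(double).collect())  # [2, 4, 6]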
One thing to ensure is that our program is serializable (source code, classes, and objects). Otherwise, Spark fails to execute the pipeline.
The way we ensure this is by avoiding any need to serialize spaCy's inbuilt trained binary models. How do we do that?
PySpark uses PickleSerializer to serialize Python objects, but spaCy models aren't serializable with PickleSerializer, which triggers the issue when we load the spaCy model first and then refer to it from the worker code.
In the line below, the spaCy object is loaded on the driver side itself, as soon as the Python module is imported. That demands pickling of the spaCy object, which eventually fails when shipping it to the worker side.
SPACY_MODEL = spacy.load("en_core_web_lg")
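You can reproduce the serialization problem outside Spark with a quick pickle check; this is only a sketch, with pickle.dumps standing in for what PySpark's PickleSerializer does to the objects referenced by a task:

import pickle

import spacy

nlp = spacy.load("en_core_web_lg")
try:
    # Roughly what PySpark attempts when the model object has to be shipped
    # to an executor along with the task code.
    pickle.dumps(nlp)
    print("the model pickled fine")
except Exception as exc:
    print("could not pickle the spaCy model:", exc)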
Instead, we wrap the spaCy model in a lazy function, which ensures the model won't get loaded until it is really required, which is when the executor runs this code against a partitioned dataset.
# The model is not loaded at import time; only the worker code will invoke this
# routine and get the spaCy object. This means a new spaCy model is loaded on
# every executor.
SPACY_MODEL = None

def get_spacy_model():
    global SPACY_MODEL
    if not SPACY_MODEL:
        _model = spacy.load("en_core_web_lg")
        SPACY_MODEL = _model
    return SPACY_MODEL
On the driver side we don't load the spaCy model at all; instead we ensure it is loaded lazily on the executor side. The model can't be serialized on the driver, shipped to the workers, and loaded back, so we have to make sure it only gets loaded into working memory at runtime, on the executors. The full example:
import os

import spacy
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType, ArrayType

SPACY_MODEL = None

def get_spacy_model():
    global SPACY_MODEL
    if not SPACY_MODEL:
        _model = spacy.load("en_core_web_lg")
        # FIX https://github.com/explosion/spaCy/issues/922
        _model.vocab.add_flag(
            lambda s: s.lower() in spacy.lang.en.stop_words.STOP_WORDS,
            spacy.attrs.IS_STOP)
        SPACY_MODEL = _model
    return SPACY_MODEL

os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local[*] \
    --executor-cores 1 \
    --executor-memory 5g \
    --driver-memory 5g \
    pyspark-shell"
os.environ["SPARK_HOME"] = "/home/ubuntu/spark-2.4.0-bin-hadoop2.7"
os.environ["PYSPARK_PYTHON"] = "/home/ubuntu/ENV3/bin/python"

spark = SparkSession \
    .builder \
    .appName("test-spacy-on-spark") \
    .getOrCreate()

# Enable the arrow-based data transfer between the Python workers and the JVM,
# reuse the Python workers, and give each of them 2g of memory.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.python.worker.reuse", "true")
spark.conf.set("spark.python.worker.memory", "2g")

#
# Simple pandas UDF which uses the spaCy model to tokenize and lemmatize the documents.
#
@pandas_udf(returnType=ArrayType(StringType()), functionType=PandasUDFType.SCALAR)
def tokenize_and_clean(documents):
    spacy_model = get_spacy_model()
    docs = spacy_model.pipe(documents)
    tokens = [[tok.lemma_ for tok in doc if not tok.is_stop and tok.text]
              for doc in docs]
    tokens_series = pd.Series(tokens)
    return tokens_series

data = spark.read.option("header", True).csv("/mnt/input/dataset.csv")
data.printSchema()
data = data.repartition(5)
data1 = data.withColumn("tokens", tokenize_and_clean("abstracts"))
data1.select("tokens").show()
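To try the UDF without the CSV above, you can run it against a tiny in-memory DataFrame; the sample sentences and the column name below are only illustrative, and it reuses the spark session and the tokenize_and_clean UDF from the example.

sample = spark.createDataFrame(
    [("Apache Spark ships the Python code to every executor.",),
     ("The spaCy model is loaded lazily inside each Python worker.",)],
    ["abstracts"],
)
sample.withColumn("tokens", tokenize_and_clean("abstracts")).show(truncate=False)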
spaCy includes a feature to speed up pipeline processing by making use of the multiple cores available on the machine. If you are not careful with this, the Spark executors have no control over the Python daemons forking processes internally, which leads to over-utilization of resources and low throughput.
The code below lets spaCy dispatch the documents across the available cores to run its pipeline operations.
nlp = spacy.load("en_core_web_lg")
docs = nlp.pipe(raw_docs)
If you enable this, your Spark configuration shouldn't claim all the worker cores. Instead, give each worker/executor only 1 core and leave the remaining cores to the Python workers, which is a good option here.
./sbin/start-master.sh
./sbin/start-slave.sh -c 1 -m 5g spark://<master-hostname>:7077
Important: here we configure the Spark worker to use only one CPU, which means Spark can launch one executor with 1 CPU. With a spaCy workload the main computation happens on the Python side, and spaCy brings in multiprocessing from outside the Spark framework.
On an 8-core machine with the above standalone cluster configuration, the process tree looks like this:
$ pstree -aup | grep pyspark
| | |-python,32602 -m pyspark.daemon
| | | |-python,32608 -m pyspark.daemon
| | | |-python,32609 -m pyspark.daemon
| | | |-python,32614 -m pyspark.daemon
| | | `-python,32616 -m pyspark.daemon
| | | |-python,32601 -m pyspark.daemon
| | | |-python,32607 -m pyspark.daemon
| | | |-python,32612 -m pyspark.daemon
| | | `-python,32615 -m pyspark.daemon
| |-grep,1487 --color=auto pyspark
PID 32602 is the main Python job, which interacts with the Spark executor to fetch data.
The other PIDs are the spaCy workers it launched; the default behavior is one worker per CPU core.
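If you want to see which Python worker handled each batch, one way is to log the process id from inside the UDF; this is an illustrative variant of the tokenize_and_clean UDF above (it reuses get_spacy_model), not part of the original pipeline:

import os

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import ArrayType, StringType

@pandas_udf(returnType=ArrayType(StringType()), functionType=PandasUDFType.SCALAR)
def tokenize_and_clean_debug(documents):
    # The print output lands in the executor's stderr log, so the pid can be
    # matched against the pstree output above.
    print("python worker pid:", os.getpid())
    spacy_model = get_spacy_model()
    return pd.Series([[tok.lemma_ for tok in doc if not tok.is_stop]
                      for doc in spacy_model.pipe(documents)])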
Suppose we ran the Apache Spark worker with 8 cores and allocated 1 CPU for each executor: it would fork 8 x 8 = 64 Python processes to do the task instead of 8, which degrades pipeline performance.
For the Yarn or Kubernetes cluster managers this problem won't happen, because both restrict the system view to what the application was allocated, similar to VMs, with the help of control groups (cgroups) and namespaces. The Spark executor and the Python workers won't see the machine's entire CPU / RAM; they only get the allocation specified by the container spec in both the Yarn and Kubernetes environments.
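As a rough sketch of what that allocation looks like from the Spark side, the standard per-executor properties below become container resource requests on Yarn and Kubernetes; the values and the app name are only placeholders, and the session would be submitted with the corresponding --master URL:

from pyspark.sql import SparkSession

# Minimal sketch: per-executor resource requests that the cluster manager
# turns into container limits. All values are illustrative.
spark = (
    SparkSession.builder
    .appName("spacy-on-cluster")
    .config("spark.executor.instances", "8")
    .config("spark.executor.cores", "1")
    .config("spark.executor.memory", "5g")
    .config("spark.python.worker.memory", "2g")
    .getOrCreate()
)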
Ensure you write your Spark pipeline with serializable objects, or load the non-serializable ones lazily.
Be careful when using external libraries like spaCy, which may bring their own multiprocessing; combined with the Spark executor configuration, this can overload the system.
Use a cluster manager other than the standalone one to get more control over allocating resources to the executors.