Apache Spark cheat sheet for Scala and PySpark

This page contains a bunch of Spark pipeline transformation methods that we can use for different problems. Use it as a quick reference for how to do a particular operation on a Spark DataFrame, in Scala or PySpark.

Note
These code snippets are tested on Spark 2.4.x; most of them should also work on Spark 2.3.x, but older versions are untested.

Read the partitioned json files from disk

val vocabDist = spark.read
    .format("json")
    .option("mergeSchema", "true")
    .load("/mnt/all_models/run-26-nov-2018-clean-vocab-50k-4m/model/topic-description")

The same reader pattern is applicable to all supported file formats.
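
For example, a sketch of reading a parquet dataset the same way (the path and the dataPq name here are only placeholders):

// A sketch, assuming a SparkSession named `spark`; the path is a placeholder.
val dataPq = spark.read
    .format("parquet")
    .load("/mnt/data/docs-parquet")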

Save partitioned files into a single file.

Here we merge all the partitions into one and dump the result to disk as a single file. All of the data ends up in a single partition handled by one task, so be careful with the size of the data set you are dealing with; otherwise that node may run out of memory.

Use the coalesce method to adjust the number of partitions of a DataFrame or RDD based on your needs.

vocabDist.coalesce(1)
    .write
    .json("/mnt/test.json")

Filter rows which meet particular criteria

vocabDist
    .filter($"topic" === 0)
    .select("term")
    .filter(x => x.toString.stripMargin.length == 3)
    .count()

// Find the minimal term length in the data frame.
// The map step yields a single column named "value", hence min("value").
vocabDist
    .filter("topic == 0")
    .select("term")
    .map(x => x.toString.length)
    .agg(min("value"))
    .show()

Map with case class

Use a case class if you want to map over multiple columns with a complex data structure.

case class Person(name: String, address: Array[String])

// Use the case class to get more control over the types when
// transforming the DataFrame (personDf is assumed to have name and address columns).
val primaryAddressDf = personDf
    .select("name", "address")
    .as[Person]
    .map(p => (p.name, p.address(0)))
    .toDF("name", "primary_address")

Use selectExpr to access inner attributes

selectExpr gives easy access to nested data structures like JSON, and lets you filter them using any of the existing SQL functions, or your own UDF for more flexibility.

// Select only those documents which have some features in them.
doc.selectExpr("doc_id", "size(features.values) as feature_size")
    .where("feature_size > 0")

Filtering a DataFrame column of type Seq[String]

val dataPqFiltered = dataPq
    .selectExpr("length(abstract_html_strip) as doc_len", "*")
    .filter("doc_len > 3")

Filter a column with custom regex and udf

val tokenFilter = udf((arr: Seq[String]) => arr.filter(_.matches("\\w+-\\w+|\\w+-|\\w+")))
val tokenCounter = udf((arr: Seq[String]) => arr.length)


val tokenSamples = dataPqCleaned
    .withColumn("tokenFiltered", tokenCounter(tokenFilter(dataPqCleaned("abstract_tokenized"))))

Sum the elements of a column

tokenSamples.select(sum($"tokenFiltered")).show()

Other aggregate functions such as avg, stddev, etc. are available in org.apache.spark.sql.functions._
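
As a quick sketch of a few other aggregates over the same tokenSamples frame (avg, stddev, and max all come from org.apache.spark.sql.functions):

// A sketch using the tokenSamples frame defined above.
import org.apache.spark.sql.functions.{avg, max, stddev}

tokenSamples
    .agg(avg("tokenFiltered"), stddev("tokenFiltered"), max("tokenFiltered"))
    .show()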

Remove unicode characters from tokens

Sometimes we only need to work with ASCII text, so it's better to clean out the other characters.

// Keep only the word-like pieces of each token and drop short results.
val tokenFilterFlat = udf((arr: Seq[String]) => arr.flatMap(
    "\\w+-\\w+|\\w+-|\\w+".r.findAllIn(_)).filter(_.length > 3))

// Keep tokens that are entirely word-like.
val tokenFilter = udf((arr: Seq[String]) => arr.filter(_.matches("\\w+-\\w+|\\w+-|\\w+")))
// Count the tokens in the array.
val tokenCounter = udf((arr: Seq[String]) => arr.length)
// Drop tokens shorter than four characters.
val minLengthFilter = udf((arr: Seq[String]) => arr.filter(_.length > 3))
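
A usage sketch for these UDFs, reusing the dataPqCleaned frame and the abstract_tokenized column from the earlier example (the new column and val names are only illustrative):

// A sketch: keep only the word-like tokens longer than three characters.
val dataPqAscii = dataPqCleaned
    .withColumn("abstract_ascii", tokenFilterFlat(dataPqCleaned("abstract_tokenized")))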

Connecting to JDBC with partitioning by an integer column

When using Spark to read data from a SQL database and then running further pipeline processing on it, it is recommended to partition the data along the natural segments in the data, or at least on an integer column. That way Spark can fire multiple SQL queries against the SQL server and operate on the results separately, with each query feeding one Spark partition.

The commands below are in PySpark, but the APIs are the same for the Scala version (see the sketch after the snippet).

jdbc_url = "jdbc://..."
src_conn_prop = {
    # connection properties such as user, password and driver go here
}

data_query = "(select * from reporting limit 100000) data"
report_ids = spark.read.jdbc(url=jdbc_url,
                             table=data_query,
                             column="report_id",
                             lowerBound=1,
                             upperBound=603442,
                             numPartitions=3,
                             properties=src_conn_prop)
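
For reference, a sketch of the same partitioned read in Scala (the URL, query, and connection properties are placeholders, as above):

// A Scala sketch of the same partitioned JDBC read.
import java.util.Properties

val srcConnProp = new Properties()  // user, password, driver, ... would be set here

val reportIds = spark.read.jdbc(
    url = "jdbc://...",
    table = "(select * from reporting limit 100000) data",
    columnName = "report_id",
    lowerBound = 1L,
    upperBound = 603442L,
    numPartitions = 3,
    connectionProperties = srcConnProp)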

Parse nested json data

This is very helpful when working with PySpark and you want to pass deeply nested JSON data between the JVM and Python processes. Lately the Spark community relies on the Apache Arrow project to avoid repeated serialization / deserialization costs when moving data from Java memory to Python memory and vice versa.

To process the inner objects, you can use the getItem method to pick out the required parts of the object and pass them over to Python memory via Arrow. In the future Arrow might support arbitrarily nested data, but right now it does not support complex nested formats. The general recommendation is to go without nesting.

doc_features
   .select($"features".getItem("values").alias("vocab_count"))
   .select(size($"vocab_count").alias("unique_features"))
   .groupBy("unique_features")
   .count()
   .show()

"string ⇒ array<string>" conversion

The type annotation .as[String] avoids relying on an assumed implicit conversion.

    df.select("column").as[String].map(x => Seq(x.toString))

A crazy string collection and groupby

This is a stream of operations on a column of type Array[String]: collect the tokens and count the n-gram distribution over all of them.

// Note: collect() pulls all the chunks to the driver, so this only
// works when the sampled data fits in driver memory.
dataset_sample
    .select("chunks").as[Array[String]]
    .collect
    .flatten
    .distinct
    .map(x => x.split(" ").length)
    .zipWithIndex
    .groupBy(_._1)
    .map { case (k, v) => (k, v.size) }
    .toArray
    .sortBy(_._1)

How to access AWS s3 on spark-shell or pyspark

Most of the time we need a cloud storage provider like S3 / GCS to read and write data for processing; very few teams keep an in-house HDFS to handle the data themselves. For the majority, I think cloud storage is easier to start with, and you don't need to worry about size limitations.

Here is a quick snippet to connect to S3.

Supply the AWS credentials via environment variables

# Export these two envs before running `spark-shell`.
export AWS_SECRET_KEY=
export AWS_ACCESS_KEY=

spark-shell --packages org.apache.hadoop:hadoop-aws:2.7.7 --master <master-url>

import com.amazonaws.auth._
val envReader = new EnvironmentVariableCredentialsProvider()
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", envReader.getCredentials().getAWSAccessKeyId)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", envReader.getCredentials().getAWSSecretKey)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

Supply the credentials via default aws ~/.aws/config file

Recent versions of the awscli expect the configuration to be kept in the ~/.aws/credentials file, but older versions look at the ~/.aws/config path. Spark 2.4.x still looks at the ~/.aws/config location, since it ships with Hadoop 2.7.x jars by default.

// Configure the spark to read from s3. Ensure the
// aws config file is set at ~/.aws/config path.
import com.amazonaws.auth.profile.ProfilesConfigFile

val profileReader = new ProfilesConfigFile().getCredentials("default")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", profileReader.getAWSAccessKeyId)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", profileReader.getAWSSecretKey)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

Set spark scratch space or tmp directory correctly

This is needed when working with a huge dataset that your machines can't hold entirely in memory for a given pipeline step; in those cases the data spills over to disk and is saved in the tmp directory.

Set the properties below to make sure you have enough space in the tmp location.

# vim ./conf/spark-defaults.conf

...
spark.local.dir                  /mnt/spark-tmp
spark.executor.extraJavaOptions  -Djava.io.tmpdir=/mnt/spark-tmp
spark.driver.extraJavaOptions    -Djava.io.tmpdir=/mnt/spark-tmp
...

PySpark doesn't support all data types

When using Arrow to transport data between JVM and Python memory, Arrow may throw the error below if the types aren't compatible with the existing converters. Fixes may come in future versions of the Arrow project. I'm keeping this here as a reminder of how PySpark gets data from the JVM and what can go wrong in that process.

Example 1:

    arrs = [create_array(s, t) for s, t in series]
  File "/home/ubuntu/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 251, in create_array
    return pa.Array.from_pandas(s, mask=mask, type=t)
  File "pyarrow/array.pxi", line 531, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 171, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: NumPyConverter doesn't implement <list<item: int32>> conversion.
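
One possible workaround, at the cost of losing the Arrow speed-up, is to fall back to the plain serialization path by disabling Arrow (a sketch; verify the trade-off on your workload):

# Assumption: disabling Arrow avoids the unsupported converter, but is slower.
spark.conf.set("spark.sql.execution.arrow.enabled", "false")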

Work with spark standalone cluster manager

In standalone mode,

  1. A worker can have multiple executors.

  2. A worker is like the node manager in YARN.

  3. We can set the worker's max cores and memory usage settings (see the spark-env.sh sketch after this list).

  4. When defining the Spark application via spark-shell or spark-submit, specify the executor memory and cores.

    e.g. worker-1 has 10 cores and 20gb memory.
    To submit a job that gets 10 executors with 1 cpu and 2gb ram each:

spark-submit --executor-cores 1 --executor-memory 2g --master <url>
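
A sketch of the worker-side limits mentioned in point 3, set in conf/spark-env.sh on each worker (the values are only an example matching the worker-1 description above):

# vim ./conf/spark-env.sh
SPARK_WORKER_CORES=10
SPARK_WORKER_MEMORY=20g
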
Note
This page will be updated as and when I come across reusable snippets of code for Spark operations.

Updates

  1. Initial posting.
