Apache Spark cheat sheet for Scala and PySpark
in data-science · Mon 15 April 2019
This page contains a collection of Spark pipeline transformation methods that can be applied to different problems. Use it as a quick reference for how to perform particular operations on a Spark DataFrame, in both Scala and PySpark.
Note
These code snippets are tested on Spark 2.4.x and mostly work on Spark 2.3.x as well, but older versions are untested.
val vocabDist = spark.read
.format("json")
.option("mergeSchema", "true")
.load("/mnt/all_models/run-26-nov-2018-clean-vocab-50k-4m/model/topic-description"
The same pattern is applicable to all the file formats Spark supports (parquet, csv, text, etc.).
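For example, the same builder pattern works for parquet or csv sources; the paths below are only placeholders.
// Same read pattern for other formats; the paths here are placeholders.
val eventsPq = spark.read
  .format("parquet")
  .load("/mnt/example/events-parquet")
val eventsCsv = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/mnt/example/events-csv")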
Here we are merging all the partitions into a single file and writing it to disk. All the data ends up in a single partition (and, for collect-style operations, on the driver), so be careful with the size of the data set you are dealing with; otherwise that node may run out of memory. Use the coalesce method to adjust the number of partitions of a DataFrame or RDD to your needs.
vocabDist.coalesce(1)
  .write
  .json("/mnt/test.json")
// Count the terms in topic 0 matching a simple length filter.
vocabDist
.filter($"topic" === 0)
.select("term")
.filter(x => x.toString.stripMargin.length == 3)
.count()
// Find the minimal value in the data frame.
vocabDist
.filter("topic == 0")
.select("term")
.map(x => x.toString.length)
.agg(min("value"))
.show()
Use a case class if you want to map over multiple columns with a complex data structure.
case class Person(name: String, address: Array[String])
// Use the case class to get more control over the types when
// filtering the DataFrames.
val newDf = personDf
  .select("name", "address")
  .as[Person]
  .map(p => (p.name, p.address(0)))
  .toDF("name", "primary_address")
Or, using the Row class:
import org.apache.spark.sql.Row
// Array columns come back as Seq inside a Row.
val newDf = personDf.map {
  case Row(name: String, address: Seq[_]) => (name, address.head.toString)
}.toDF("name", "primary_address")
Spark provides easy access to nested data structures such as JSON, and you can filter them using the existing built-in functions, or your own UDFs for more flexibility (a sketch of the UDF route follows the example below).
// Select only those documents which have at least one feature.
doc.selectExpr("doc_id", "size(features.values) as feature_size")
.where("feature_size > 0")
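As a sketch of the udf route mentioned above, the same non-empty check could be written as below; the column names follow the example and are assumptions about your schema.
// Hedged sketch: a custom udf doing the same non-empty check as size(...) above.
import org.apache.spark.sql.functions.udf
val hasFeatures = udf((values: Seq[Double]) => values != null && values.nonEmpty)
doc
  .select($"doc_id", $"features.values".alias("feature_values"))
  .filter(hasFeatures($"feature_values"))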
Using standard RDD operations via the PySpark DataFrame API isn't straightforward; to get them we need to invoke .rdd to convert the DataFrame into an RDD. For example, here we convert a sparse vector to a dense array and sum it column-wise.
# Assuming ml vectors; use pyspark.mllib.linalg for mllib vectors.
from pyspark.ml.linalg import SparseVector

vocab_freq = countVector\
    .select("features")\
    .rdd\
    .map(lambda x: SparseVector(x[0].size, x[0].indices, x[0].values).toArray())\
    .reduce(lambda a, b: a + b)
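For comparison, a rough Scala equivalent of the same column-wise sum, assuming the features column holds org.apache.spark.ml.linalg vectors:
// Rough Scala sketch, assuming "features" is an ml vector column.
import org.apache.spark.ml.linalg.Vector
val vocabFreq = countVector
  .select("features")
  .rdd
  .map(_.getAs[Vector]("features").toArray)
  .reduce((a, b) => a.zip(b).map { case (x, y) => x + y })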
PySpark map on multiple columns
from pyspark.sql import Row

# Find document length.
doc_lengths = countVector\
    .selectExpr("doc_id", "features.indices as indices", "features.values as feature_freq")\
    .select("doc_id", "feature_freq")\
    .rdd.map(lambda x: Row(doc_id=x[0], doc_length=sum(x[1])))\
    .toDF()
// Compute the stripped-abstract length and keep only non-trivial documents.
val dataPqFiltered = dataPq
.selectExpr("length(abstract_html_strip) as doc_len", "*")
.filter("doc_len > 3")
import org.apache.spark.sql.functions._

val tokenFilter = udf((arr: Seq[String]) => arr.filter(_.matches("\\w+-\\w+|\\w+-|\\w+")))
val tokenCounter = udf((arr: Seq[String]) => arr.length)
val tokenSamples = dataPqCleaned
  .withColumn("tokenFiltered", tokenCounter(tokenFilter(dataPqCleaned("abstract_tokenized"))))

tokenSamples.select(sum($"tokenFiltered")).show()
Other aggregate functions such as avg and stddev are available as well; refer to org.apache.spark.sql.functions._ for the full list.
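For instance, the tokenFiltered column from the snippet above can be summarised like this:
import org.apache.spark.sql.functions.{avg, stddev}
tokenSamples.select(avg($"tokenFiltered"), stddev($"tokenFiltered")).show()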
Sometimes we only need to work with ASCII text, so it's better to clean out the other characters.
val tokenFilterFlat = udf((arr: Seq[String]) => arr.flatMap(
"\\w+-\\w+|\\w+-|\\w+".r.findAllIn(_)).filter(_.length > 3))
val tokenFilter = udf((arr: Seq[String]) => arr.filter(_.matches("\\w+-\\w+|\\w+-|\\w+")))
val tokenCounter = udf((arr: Seq[String]) => arr.length)
val minLengthFilter = udf((arr: Seq[String]) => arr.filter(_.length > 3))
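These udfs can then be chained on the tokenized column; the frame and column names below follow the earlier example and may differ in your schema.
// Chain the cleaning udfs on the tokenized column (names are assumptions).
val cleanedTokens = dataPqCleaned
  .withColumn("asciiTokens", minLengthFilter(tokenFilterFlat($"abstract_tokenized")))
  .withColumn("tokenCount", tokenCounter($"asciiTokens"))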
When using Spark to read data from a SQL database and then run further pipeline processing on it, it is recommended to partition the read according to natural segments in the data, or at least on an integer column. That way Spark can fire multiple SQL queries against the server and operate on the results in parallel, with each query's result landing in its own Spark partition.
The commands below are in PySpark, but the APIs are the same for the Scala version; a Scala sketch follows the snippet.
jdbc_url = "jdbc://..."
src_conn_prop = {
    # e.g. user, password, driver
}
data_query = "(select * from reporting limit 100000) data"
report_ids = spark.read.jdbc(url = jdbc_url,
table = data_query,
lowerBound = 1,
column = "report_id",
upperBound = 603442,
numPartitions = 3,
properties = src_conn_prop)
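A rough Scala equivalent of the same partitioned read; the url and credentials are placeholders.
// Scala sketch of the partitioned jdbc read above; connection details are placeholders.
val connProps = new java.util.Properties()
connProps.setProperty("user", "...")
connProps.setProperty("password", "...")
val reportIds = spark.read.jdbc(
  url = "jdbc://...",
  table = "(select * from reporting limit 100000) data",
  columnName = "report_id",
  lowerBound = 1L,
  upperBound = 603442L,
  numPartitions = 3,
  connectionProperties = connProps)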
This is very helpful when working with PySpark and you want to pass deeply nested JSON data between the JVM and Python processes. Lately the Spark community relies on the Apache Arrow project to avoid multiple serialization/deserialization costs when sending data from Java memory to Python memory and vice versa.
To process the inner objects you can make use of the getItem method to filter out the required parts of the object and pass them over to Python memory via Arrow. For now Arrow doesn't support complex nested formats (it may support arbitrarily nested data in the future), so the general recommendation is to avoid nesting.
doc_features
.select($"features".getItem("values").alias("vocab_count"))
.select(size($"vocab_count").alias("unique_features"))
.groupBy("unique_features")
.count()
.show()
The type annotation .as[String] avoids an implicit conversion being assumed.
df.select("column").as[String].map(x => Seq(x.toString))
This is a chain of operations on a column of type Array[String]: collect the chunks and count the n-gram size distribution over all the distinct tokens.
dataset_sample
.select("chunks").as[Array[String]]
.collect
.flatten
.distinct
.map(x => x.split(" ").length)
.zipWithIndex
.groupBy(_._1)
.map { case (k, v) => (k, v.size) }
.toArray
.sortBy(_._1)
Most of the time we need a cloud storage provider like S3 or GCS to read and write the data for processing. Very few teams keep an in-house HDFS cluster to handle the data themselves; for the majority, cloud storage is easier to start with and you don't need to worry about size limitations.
Here is a quick snippet to connect to S3.
# Export these two envs before running `spark-shell`.
export AWS_SECRET_KEY=
export AWS_ACCESS_KEY=
spark-shell --packages org.apache.hadoop:hadoop-aws:2.7.7 --master <master-url>
import com.amazonaws.auth._
val envReader = new EnvironmentVariableCredentialsProvider()
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", envReader.getCredentials().getAWSAccessKeyId)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", envReader.getCredentials().getAWSSecretKey)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
Recent versions of awscli expect their configuration to be kept in the ~/.aws/credentials file, while older versions look at the ~/.aws/config path. Spark 2.4.x still looks at the ~/.aws/config location, since it ships with Hadoop 2.7.x jars by default.
// Configure Spark to read from S3. Ensure the aws config
// file is present at the ~/.aws/config path.
import com.amazonaws.auth.profile.ProfilesConfigFile
val profileReader = new ProfilesConfigFile().getCredentials("default")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", profileReader.getAWSAccessKeyId)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", profileReader.getAWSSecretKey)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
This may be required when working with a huge dataset that your machines can't hold entirely in memory for the given pipeline steps; in such cases the data is spilled over to disk and saved in a tmp directory.
Set the properties below to ensure you have enough space at the tmp location.
# vim ./conf/spark-defaults.conf
...
spark.local.dir /mnt/spark-tmp
spark.executor.extraJavaOptions -Djava.io.tmpdir=/mnt/spark-tmp
spark.driver.extraJavaOptions -Djava.io.tmpdir=/mnt/spark-tmp
...
When using Arrow to transport data between JVM and Python memory, Arrow may throw the error below if the types aren't compatible with the existing converters. Fixes may land in the Arrow project in the future; I'm keeping this here to show how PySpark gets data from the JVM and what can go wrong in that process.
Example 1:
arrs = [create_array(s, t) for s, t in series]
File "/home/ubuntu/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 251, in create_array
return pa.Array.from_pandas(s, mask=mask, type=t)
File "pyarrow/array.pxi", line 531, in pyarrow.lib.Array.from_pandas
File "pyarrow/array.pxi", line 171, in pyarrow.lib.array
File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
File "pyarrow/error.pxi", line 89, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: NumPyConverter doesn't implement <list<item: int32>> conversion.
Once you have downloaded the same version of the Spark binary across the machines, you can start the Spark master and slave processes to form a standalone Spark cluster. You can also run both of these services on the same machine.
cd spark-<version>
# Start the spark master process; provide the master configuration via a
# properties file or add it to the default config file under the conf folder.
./sbin/start-master.sh [--properties-file <file>]
# Start the slave service on each node where we want to run a slave and connect
# all the slaves to the master to form the cluster.
./sbin/start-slave.sh -c 2 -m 16g spark://master-host:7077
In standalone mode:
A worker can have multiple executors.
A worker is like the node manager in YARN.
We can set the worker's max cores and memory usage settings.
When launching a Spark application via spark-shell or spark-submit, define the executor memory and cores.
For example, if worker-1 has 10 cores and 20 GB of memory, the submission below yields 10 executors with 1 CPU and 2 GB of RAM each:
spark-submit --executor-cores 1 --executor-memory 2g --master <url>
Note
This page will be updated as and when I come across reusable snippets of code for Spark operations.
Added spark standalone commands.