Since Spark (PySpark) programs are just Python, they can be written in many different ways.
However, what I need to do is usually much the same, and remembering several ways to write the same thing only makes it harder, so I am summarizing my personal, frequently used Spark code here in a one-purpose, one-snippet format.
Basic Spark work
- Read data from storage or a DB and create a DataFrame
- Process the DataFrame
  - Processing with queries
  - Processing with functions
- Export the processed DataFrame to storage or a DB
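As a quick orientation, a minimal sketch of this flow might look like the following (the paths, column names, and query are placeholders, not from a real project):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()

# 1. Read data from storage into a DataFrame
df = spark.read.format("csv").option("header", True).load("gs://xxxx/input")

# 2. Process it (here: with a query, via a temporary view)
df.createOrReplaceTempView("src")
result = spark.sql("SELECT col1, col2 FROM src WHERE col1 IS NOT NULL")

# 3. Write the processed DataFrame back to storage
result.write.format("json").mode("overwrite").save("gs://xxxx/output")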
Data loading
- Create a DataFrame by reading data from storage with spark.read
- File formats are mainly CSV and JSON
Basic
Path
- Multiple paths can be passed as a list
- Wildcards (glob patterns) can be used in paths
- Wildcards can also be used for subdirectories (see the sketch below)
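For illustration, the path variations above might look like this (bucket, directory, and file names are placeholders):

# Single path
df = spark.read.format("csv").load("gs://xxxx/data/file.csv")

# Multiple paths passed as a list
df = spark.read.format("csv").load(["gs://xxxx/data1", "gs://xxxx/data2"])

# Wildcard (glob) in the path
df = spark.read.format("csv").load("gs://xxxx/data/2020-*.csv")

# Wildcard over subdirectories
df = spark.read.format("csv").load("gs://xxxx/data/*/*.csv")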
Parameters
- Parameters are set with option() and options()
- path, format, and schema cannot be set with option()/options()
- The format is specified with format()
CSV/JSON
- schema: a string specifying the column names and data types
- Type names correspond to the pyspark.sql.types.xxxxType classes, with "xxxx" written in lower case (see the sketch below)
- https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.types.DataType
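As a small illustration of the type-name convention (the column names are hypothetical):

# Examples of the mapping from pyspark.sql.types classes to schema type names:
#   IntegerType   -> integer (or int)
#   StringType    -> string
#   DateType      -> date
#   TimestampType -> timestamp
df = spark.read.format("csv") \
    .schema("id integer, name string, birthday date, updated_at timestamp") \
    .load("gs://xxxx/xxxx")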
CSV
- header: True/False, whether the file has a header row. Default is False
- dateFormat: a string specifying the format used to parse dates
  - Allows reading formats such as "yyyy/MM/dd"
  - https://docs.oracle.com/javase/jp/6/api/java/text/SimpleDateFormat.html
- timestampFormat: a string specifying the format used to parse timestamps
df = spark.read.format("csv") \
    .schema("col1 int, col2 string, col3 date") \
    .option("timestampFormat", "yyyy/MM/dd HH:mm:ss") \
    .option("header", True) \
    .load("gs://xxxx/xxxx/*/*")

df = spark.read.format("json") \
    .load(["gs://xxxx/xxxx", "gs://yyyy/yyy"])
AWS S3・Google Cloud Storage
- For S3, use "s3a://", not "s3://"
df = spark.read.format("csv").load("gs://xxxx/xxxx")
df = spark.read.format("json").load("s3a://xxxx/xxxx")
JDBC Driver
- A DataFrame can be created from an RDB
- There are two ways to read: a whole table or the result of a query
_options = {
    "url": "jdbc:mysql://<server_name>:3306/<db_name>",
    "driver": "com.mysql.jdbc.Driver",
    "user": "xxxx",
    "password": "xxxx"
}

# Read a table
options = _options.copy()
options["dbtable"] = "<table_name>"
df = spark.read.format("jdbc") \
    .options(**options) \
    .load()

# Read the result of a query
options = _options.copy()
options["query"] = "SELECT xxxx FROM xxxx"
df = spark.read.format("jdbc") \
    .options(**options) \
    .load()
Text
- A DataFrame can be created from a text file
- There are two ways to read: one record per line, or the whole file as one record
# Read each line as one record (single column named "value")
df = spark.read.text("gs://xxxx")

# Read the whole file as one record (single column named "value")
df = spark.read.text("gs://xxxx", wholetext=True)
Manual DataFrame composition
- A DataFrame can be created by writing the data directly in the program
- Column names and types, i.e. the schema, must be specified separately
json = sc.parallelize((
    {"id": 123, "name": "abc"},
    {"id": 123, "name": "abc"}
))
df_json = spark.createDataFrame(json, "id integer, name string")

csv = sc.parallelize([
    (123, "abc"),
    (123, "abc")
])
df_csv = spark.createDataFrame(csv, "id integer, name string")
Data output
- Output a DataFrame to a file or a DB using DataFrame.write
File
- The output path is a directory
- You cannot specify a file name and have the output written to a single file (see the sketch below)
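As a rough illustration (bucket and directory names are placeholders), the save() path becomes a directory containing one part file per partition:

df.write.format("csv").option("header", True).save("gs://xxxx/output")
# Resulting layout is roughly:
#   gs://xxxx/output/_SUCCESS
#   gs://xxxx/output/part-00000-<uuid>.csv
#   gs://xxxx/output/part-00001-<uuid>.csv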
Parameters
- Parameters are set with option() and options()
- path, format, and mode cannot be set with option()/options()
- dateFormat: a string specifying the output format for dates
- timestampFormat: a string specifying the output format for timestamps
- compression: compression codec, e.g. "gzip" (combined in the sketch below)
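A small sketch combining these write options (the path and format values are illustrative):

df.write.format("csv") \
    .option("header", True) \
    .option("dateFormat", "yyyy/MM/dd") \
    .option("timestampFormat", "yyyy/MM/dd HH:mm:ss") \
    .option("compression", "gzip") \
    .mode("overwrite") \
    .save("gs://xxxx/xxxx")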
Mode
- append: append to the existing output
- overwrite: overwrite the existing output; if the target directory already contains directories and files, they are deleted before writing
CSV
- header: True/False, whether to write a header row. Default is False
df.write.format("json") \
    .mode("overwrite") \
    .save("gs://xxxx/xxxx")
Partitioning
- Output can be split into separate directories by column value
- Note that the columns used for partitioning are removed from the output data
- Note that "overwrite" in mode() rewrites everything under the base path, not just the leaf directories
# Output goes to "gs://xxxx/xxxx/col1=yyy/col2=zzz/*"
# With "overwrite", all files and folders under "gs://xxxx/xxxx/*" are deleted before output
df.write.format("json") \
    .mode("overwrite") \
    .partitionBy("col1", "col2") \
    .save("gs://xxxx/xxxx")
JDBC Driver
- A DataFrame can be output to an RDB
- If the table does not exist, it is created
options = {
    "url": "jdbc:mysql://<server_name>:3306/<db_name>",
    "driver": "com.mysql.jdbc.Driver",
    "dbtable": "<table_name>",
    "user": "xxxx",
    "password": "xxxx"
}

df.write.format("jdbc") \
    .mode("append") \
    .options(**options) \
    .save()
Query
DataFrames can be created with SQL statements (quite useful).
First, create a view from a DataFrame.
You can then use that view in SQL statements.
df.createOrReplaceTempView("<view_name>")

df = spark.sql("""
    SELECT *
    FROM <view_name>
    LIMIT 10
""")
Most processing can be completed with queries.
- Complex processing is easier to write as a query
- The view is a copy of the original DataFrame, not a reference
- The view name and the DataFrame variable name can be the same (they are separate things; see the sketch below)
List of functions that can be used in a query
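For example, a small sketch of querying a view that shares its name with the DataFrame variable (the column names and the aggregation are illustrative):

# The view name can be the same as the DataFrame variable name
df.createOrReplaceTempView("df")

# A typical aggregation written as a query instead of DataFrame functions
summary = spark.sql("""
    SELECT col1, COUNT(*) AS cnt
    FROM df
    GROUP BY col1
""")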
How to write without queries
Column addition
df.withColumn("new_col", df["col1"])
df.withColumn("new_col", df["col2"] * 2)

# Constants cannot be set as-is.
# Use lit() to set a constant value.
from pyspark.sql.functions import lit
df.withColumn("new_col", lit("TEST"))
Filter
df.filter("col1 = 123")
df.filter("col1 = 123 AND col2 = 111 ...")
Column deletion
# Keep only col1 and col2
df.select("col1", "col2")
df.select(df["col1"], df["col2"])

# Delete col1 and col2 (multiple columns are passed by name;
# only a single Column object can be passed at a time)
df.drop("col1", "col2")
df.drop(df["col1"])
Type conversion
df.select(df["id"].cast("string"))
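Where the column should keep its name but change its type, cast() can also be combined with withColumn(), as in this small sketch:

# Overwrite the existing "id" column with its string representation
df = df.withColumn("id", df["id"].cast("string"))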
Data retrieval
How to use DataFrame data in a program
df.collect()
- Triggers execution and returns all results as a list of Row objects
- Use Row["<col_name>"] to extract a value from a Row
- Convert a Row to a dict with Row.asDict()
df.toPandas()
- Triggers execution and returns all results as a pandas DataFrame
# List of [col1, col2] pairs
colArray = list(map(lambda x: [x["col1"], x["col2"]], df.collect()))
# List of dicts
dictArray = list(map(lambda x: x.asDict(), df.collect()))
# pandas DataFrame
pd = df.toPandas()
For debugging
# Show the contents
df.show(<number_of_rows>)

# Show the schema
df.printSchema()

# Row count
df.count()
Other
Making it an application
How to turn a program created in Jupyter or Zeppelin into an application that can be run as a job
In Jupyter and Zeppelin, "spark" and "sc" are created in advance, so add code that creates "spark" and "sc" yourself.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('app_name').getOrCreate()
sc = spark.sparkContext
Fast check for whether a DataFrame is empty
- df.count() is slow because it processes all the data; df.head(1) is faster when you only need to know whether at least one record exists
if len(df.head(1)):
    print("df has a record.")
What is an RDD?
- An RDD is essentially an array of single-row records
- Each record is an array of the values of its cells; when data is read as text, the values are all strings
- An RDD has no header information; it consists of data only
- RDDs are immutable (they cannot be modified); transformations create a new RDD (see the sketch below)
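A small sketch of these points, assuming a CSV-like text file and that "sc" already exists as in the notebook setting above (the path and delimiter are placeholders):

# Each element of the RDD is one line of the file, as a string
lines = sc.textFile("gs://xxxx/data.csv")

# Transformations do not modify "lines"; they return a new RDD.
# Here each record becomes an array of cell values (still all strings).
cells = lines.map(lambda line: line.split(","))

# There is no header handling; a header row, if present, is just data
first = cells.first()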
Expand JSON data stored in a column into a table (DataFrame)
Approach
- Create a DataFrame from the JSON stored in the column
Procedure
- df.rdd returns an RDD (an array of Row)
- Extract the column data from each Row with Row["<json_column>"]
- Since the extracted data is JSON, read it with spark.read.json() to create a DataFrame
- The JSON read parameters of spark.read, such as schema, can be used here in the same way
df_to = spark.read.json(
    df_from.rdd.map(lambda x: x[<json_column>])
)
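Putting the procedure together, a sketch with a hypothetical source DataFrame (the "payload" column name and the schema are assumptions for illustration):

# df_from is assumed to have a string column "payload" holding JSON such as '{"id": 1, "name": "abc"}'
json_rdd = df_from.rdd.map(lambda x: x["payload"])

# The usual spark.read JSON parameters, such as schema, can be passed here as well
df_to = spark.read.json(json_rdd, schema="id integer, name string")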
Reference
DataFrame
DataFrame.read
DataFrame.write
DataType
SQL Functions
JDBC Driver・classPath
Reference Articles
- https://stackoverflow.com/questions/32707620/how-to-check-if-spark-dataframe-is-empty
- https://stackoverflow.com/questions/32788322/how-to-add-a-constant-column-in-a-spark-dataframe