Frequently used Spark code notes

Since Spark (PySpark) programs are written in Python, they can be written quite freely.

However, what I need to do is mostly the same every time, and knowing several different ways to write the same thing only makes it harder to remember, so here I summarize my personal frequently used Spark code in a one-purpose, one-snippet format.

Basic Spark work

  1. Read data from storage or DB and create DataFrame
  2. DataFrame processing

    • Processing by query
    • Processing by function
  3. Export processed DataFrame to storage or DB
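
As a rough illustration, a minimal sketch of this flow might look like the following (the paths, view name, and column name are hypothetical; each step is covered in detail in the sections below).

# Read
df = spark.read.format("csv") \
    .option("header", True) \
    .load("gs://xxxx/input")

# Process with a query
df.createOrReplaceTempView("input_view")
result = spark.sql("SELECT col1, COUNT(*) AS cnt FROM input_view GROUP BY col1")

# Write
result.write.format("json") \
    .mode("overwrite") \
    .save("gs://xxxx/output")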

Data loading

  • Create DataFrame using spark.read to read data from storage
  • File formats are mainly CSV and JSON

Basic

Path

  • Multiple paths can be passed as a list
  • Wildcards (glob patterns) can be used in the path
  • Wildcards can also be used for subdirectories

Parameters

df = spark.read.format("csv").\
    schema("col1 int, col2 string, col3 date").\
    option("timestampFormat", "yyyy/MM/dd HH:mm:ss").\
    option("header", true).\
    load("gs://xxxx/xxxx/*/*")

df = spark.read.format("json").\
    load(["gs://xxxx/xxxx", "gs://yyyy/yyy"])

AWS S3 / Google Cloud Storage

  • For S3, the scheme is "s3a://", not "s3://".
df = spark.read.format("csv").load("gs://xxxx/xxxx")
df = spark.read.format("json").load("s3a://xxxx/xxxx")

JDBC Driver

  • Can create DataFrame from RDB
  • There are two ways to read: an entire table, or the result of a query
_options = {
    "url": "jdbc:mysql://<server_name>:3306/<db_name>",
    "driver": "com.mysql.jdbc.Driver",
    "user": "xxxx",
    "password": "xxxx"
}

# Read a table
options = _options.copy()
options["dbtable"] = "<table_name>"
df = spark.read.format("jdbc") \
    .options(**options) \
    .load()

# Read a query result
options = _options.copy()
options["query"] = "SELECT xxxx FROM xxxx"
df = spark.read.format("jdbc") \
    .options(**options) \
    .load()

Text

  • DataFrame can be created from a text file.
  • There are two modes: read one record per line, or read one whole file as one record.
# Read each line as one row, with a single column (column name is "value")
df = spark.read.text("gs://xxxx")

# Read the whole file as one row, with a single column (column name is "value")
df = spark.read.text("gs://xxxx", wholetext=True)

Manual DataFrame Composition

  • DataFrame can be created by writing data directly into the program.
  • Column names and types, i.e. a schema, must be specified separately
json = sc.parallelize((
    {"id":123, "name":"abc"},
    {"id":123, "name":"abc"}
))

df_json = spark.createDataFrame(json, "id integer, name string")

csv = sc.parallelize([
    ("123", "abc"),
    ("123", "abc")
])

df_csv = spark.createDataFrame(csv, "id integer, name string")

Data output

  • Output DataFrame to file or DB using DataFrame.write

File

  • The output path represents a folder
  • You cannot specify a file name or write to a single file
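
If you really need everything in (roughly) a single file, one common workaround, not covered in the original notes, is to collapse the DataFrame to one partition before writing; Spark still writes a part-xxxx file inside the output folder, and this can be slow or memory-heavy for large data.

# Sketch: write a single part file (output path is hypothetical)
df.coalesce(1).write.format("csv") \
    .option("header", True) \
    .mode("overwrite") \
    .save("gs://xxxx/single_output")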

Parameters

  • Parameters are set with option() and options()
  • path, format, and mode cannot be set with option() or options()
  • dateFormat: string specifying the date output format
  • timestampFormat: string specifying the timestamp output format
  • compression: compression codec, e.g. "gzip"
  • mode

    • append: append
    • overwrite: overwrite

      • If the output directory already contains files or subdirectories, they are deleted first.
  • CSV

    • header: True/False, with or without a header row. Default is False
df.write.format("json")\
    .mode("overwrite")\
    .save("gs://xxxx/xxxx")

Partitioning

  • Output can be split into separate directories by column value
  • Note that the columns used for partitioning are removed from the output data.
  • Note that mode("overwrite") rewrites everything under the base path, not just the leaf directories.
# Output goes to "gs://xxxx/xxxx/col1=yyy/col2=zzz/*"
# With "overwrite", all files and folders under "gs://xxxx/xxxx/*" are deleted before output
df.write.format("json")\
    .mode("overwrite")\
    .partitionBy("col1", "col2")\
    .save("gs://xxxx/xxxx")

JDBC Driver

  • Can output DataFrame to RDB
  • If the table does not exist, it will create it.
options = {
    "url":"jdbc:mysql://<server_name>:3306/<db_name>",
    "driver":"com.mysql.jdbc.Driver",
    "dbtable":"<table_name>",
    "user":"xxxx",
    "password":"xxxx"
}

df.write.format("jdbc").\
    mode("append").\
    options(**options).\
    save()

Query

DataFrames can be created with SQL statements (quite useful).

First, create a view from a DataFrame.

You can then use that view in SQL statements.

df.createOrReplaceTempView("<view_name>")
df = spark.sql("""
    SELECT
        *
    FROM <view_name>
    LIMIT 10
    """)

  • Most processing can be completed with a query.

    • Complex processing is easier to write as a query.
  • The view is a copy of the original DataFrame, not a reference to it
  • The view name and the DataFrame variable name can be the same or different
  • See the Spark SQL documentation for the list of functions that can be used in a query
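
For example, aggregation and built-in SQL functions can be used directly in a query; a sketch with made-up view and column names:

df.createOrReplaceTempView("sales")
df_summary = spark.sql("""
    SELECT
        category,
        date_format(created_at, 'yyyy-MM') AS month,
        COUNT(*) AS cnt,
        SUM(amount) AS total
    FROM sales
    GROUP BY category, date_format(created_at, 'yyyy-MM')
    """)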

How to write without queries

Column addition

df.withColumn("new_col", df["col1"])
df.withColumn("new_col", df["col2"]*2)

# Constant values cannot be set directly.
# Set constants via lit().
from pyspark.sql.functions import lit
df.withColumn("new_col", lit("TEST"))

Filter

df.filter("col1=123")
df.filter("col1=123 AND col2=111 ...")

Deleting columns

# Keep only col1 and col2
df.select("col1", "col2")
df.select(df["col1"], df["col2"])

# Delete col1 and col2
df.drop("col1", "col2")
df.drop(df["col1"], df["col2"])

Type conversion

df.select(df["id"].cast("string"))
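
To change a column's type while keeping all other columns, cast can be combined with withColumn, for example:

# Replace the "id" column with a string-typed version
df = df.withColumn("id", df["id"].cast("string"))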

Data acquisition

How to use DataFrame data in a program

  • df.collect()

    • Evaluates everything and returns all results as an array of Row objects
  • Use Row["<col_name>"] to extract a value from a Row
  • Use Row.asDict() to convert a Row to a dict
  • df.toPandas()

    • Evaluates everything and returns all results as a pandas DataFrame
colArray = list(map(lambda x: [x["col1"], x["col2"]], df.collect()))
dictArray = list(map(lambda x: x.asDict(), df.collect()))
pd = df.toPandas()

For debugging

# Display contents
df.show(<number_of_rows>)

# Schema Display
df.printSchema()

# Number of rows
df.count()

Other

Turning it into an application

How to turn a program created in Jupyter or Zeppelin into an application that can be executed as a job.

In Jupyter and Zeppelin, "spark" and "sc" are created in advance, so for a standalone application you need to add code that creates "spark" and "sc" yourself.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('app_name').getOrCreate()
sc = spark.sparkContext
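
Once "spark" and "sc" are created inside the script itself, it can typically be submitted as a job with spark-submit (for example, spark-submit your_app.py); the exact options depend on the cluster.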

Fast check for whether the row count is 0

  • df.count() takes time because it processes all the data. If you only need to know whether at least one row exists, df.head(1) is faster
if len(df.head(1)):
    print("df has a record.")

What is RDD?

  • An RDD is essentially an array of single-row records
  • A single record is an array of the values of each cell; the values are all strings
  • RDDs have no header information; they consist of data only
  • RDDs are immutable (cannot be changed); transformations create a new RDD
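
A minimal sketch of the immutability point (the data here is made up):

rdd = sc.parallelize(["1,abc", "2,def"])

# Transformations do not modify rdd; they return a new RDD
rdd_split = rdd.map(lambda line: line.split(","))

print(rdd.collect())        # ['1,abc', '2,def'] (unchanged)
print(rdd_split.collect())  # [['1', 'abc'], ['2', 'def']]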

Expand JSON data stored in a column into a table (DataFrame)

Approach

  • Create a DataFrame from the JSON stored in a column

Procedure

  • df.rdd returns the RDD (an array of Rows)
  • Extract the JSON column data from each Row with Row[<column_name>]
  • Since the extracted data is JSON, read it with spark.read.json() to create a DataFrame

    • The JSON read parameters of spark.read, such as schema, can be used here in the same way.
df_to = spark.read.json(
    df_from.rdd.map(lambda x: x[<json_column>])
)
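
As a self-contained illustration with made-up data (the column name "payload" is hypothetical):

df_from = spark.createDataFrame(
    [('{"id": 1, "name": "abc"}',), ('{"id": 2, "name": "def"}',)],
    "payload string"
)

# Each "payload" cell holds a JSON string; spark.read.json can read an RDD of such strings
df_to = spark.read.json(df_from.rdd.map(lambda x: x["payload"]))
df_to.printSchema()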
