Spark

1. Introduction

Spark builds on the Hadoop ecosystem: it can use Hadoop's scalable, flexible, fault-tolerant and cost-effective storage and processing implementations (such as HDFS and YARN), while shipping its own cluster management solution. Intermediate results are not written to disk but kept in distributed memory, which makes Spark computations considerably faster.
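
For example, assuming a SparkSession named spark is already available (setup is covered in section 4) and using a placeholder file path, caching a DataFrame lets repeated actions reuse the in-memory data instead of re-reading the source:

df = spark.read.parquet("data/events.parquet")
df.cache()                          # keep the DataFrame in distributed memory after the first action
df.count()                          # first action: reads from storage and fills the cache
df.groupBy("type").count().show()   # later actions work on the cached, in-memory data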

2. Installation

java -version

# If java 8 is not installed
sudo apt install --no-install-recommends openjdk-8-jre-headless -y

# Download spark from https://spark.apache.org/downloads.html
wget http://apache.40b.nl/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
gunzip -c spark-2.4.0-bin-hadoop2.7.tgz | tar xvf -
rm spark-2.4.0-bin-hadoop2.7.tgz

# Install spark
sudo mv spark-2.4.0-bin-hadoop2.7/ /usr/local/spark

Set JAVA_HOME and add Spark to the PATH:

echo "# For spark!" >> ~/.bashrc
echo 'export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64' >> ~/.bashrc
echo 'export PATH=$PATH:/usr/local/spark/bin' >> ~/.bashrc
source ~/.bashrc

Verify the installation:

pyspark
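
Inside the pyspark shell the session object spark is already created, so any small computation confirms that the installation works, for example:

# should return 1000 without errors
spark.range(1000).count()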

3. Testing

The spark-testing-base project provides base classes and helpers for unit testing Spark code: https://github.com/holdenk/spark-testing-base
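
Independently of that library, a local SparkSession shared through a pytest fixture is often enough for unit tests; the fixture and test below are only an illustrative sketch:

import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark():
    # A small local session is enough for unit tests
    session = SparkSession.builder \
        .master("local[2]") \
        .appName("tests") \
        .getOrCreate()
    yield session
    session.stop()

def test_row_count(spark):
    df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "letter"])
    assert df.count() == 2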

4. Pyspark

4.1 Setup

Import types and functions:

from pyspark.sql import SparkSession
import pyspark.sql.functions as sf
import pyspark.sql.types as st

Start spark session:

spark = SparkSession \
    .builder \
    .appName("Spark Aplication") \
    .getOrCreate()
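
Additional settings can be passed on the builder before getOrCreate(); the master URL and shuffle setting below are only examples:

spark = SparkSession \
    .builder \
    .appName("Spark Application") \
    .master("local[*]") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()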

Read data:

# Supported formats include 'csv', 'jdbc', 'json', 'orc', 'parquet', 'text'
df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("nanValue", "NA") \
    .load("data/heroes.csv")

Output:

df.show(10, truncate=False)  # print the first 10 rows without truncating column values
df.collect()                 # return all rows to the driver as a list of Row objects
df.head(10)                  # return the first 10 rows as a list of Row objects
df.toPandas()                # convert to a pandas DataFrame (must fit in driver memory)
df.printSchema()             # print the column names and types
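
Writing follows the same pattern as reading; the output path below is only a placeholder:

df.write \
    .mode("overwrite") \
    .parquet("output/heroes.parquet")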

4.2 Create DataFrame

Specify data and schema:

data = [
    ("2015-05-14 03:53:00", "WARRANT ARREST"),
    ("2015-05-14 03:53:00", "TRAFFIC VIOLATION"),
    ("2015-05-14 03:33:00", "TRAFFIC VIOLATION")
]

test = spark.createDataFrame(data, ["Dates", "Description"])
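
The schema can also be given explicitly through pyspark.sql.types instead of being inferred; a sketch for the same data:

schema = st.StructType([
    st.StructField("Dates", st.StringType(), nullable=True),
    st.StructField("Description", st.StringType(), nullable=True),
])

test = spark.createDataFrame(data, schema)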

Add column:

# when() needs both a condition and a value; otherwise() covers the remaining rows
test = test.withColumn(
    'added_col',
    sf.when(sf.col('Dates') > "2015-05-14 03:33:00", True).otherwise(False)
)
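
The same column expressions work for filtering and aggregation, for example:

# count descriptions among the rows flagged above
test.filter(sf.col('added_col')) \
    .groupBy('Description') \
    .count() \
    .show()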

5. Docker Image

Pipfile:

[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
pyspark = "*"

[dev-packages]
pyspark = "*"

[requires]
python_version = "3.6"

Dockerfile:

FROM python:3.6.7-slim-stretch

WORKDIR /app

COPY Pipfile /app
COPY Pipfile.lock /app
RUN pip install pipenv

# Install the dependencies system-wide (no virtualenv inside the container)
RUN pipenv install --system

# openjdk-8-jre-headless needs /usr/share/man/man1 to exist on slim images, otherwise the install fails
RUN apt update && \
  mkdir -p /usr/share/man/man1 && \
  apt install -y openjdk-8-jre-headless

ENTRYPOINT ["pyspark"]
