Simple Data Pipeline ETL with PySpark and AWS

sateesh.py · 3 min read · Jan 8, 2024


https://github.com/SateehTeppala/Data-pipeline-ETL-with-PySpark-and-AWS

Objective:

Build an end-to-end data pipeline that extracts data from different sources, transforms it using PySpark, and loads it into an Amazon S3 bucket.
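
To make the end-to-end flow concrete before walking through the steps, here is a minimal driver sketch that wires together the functions defined below. This is my own sketch, not the repo's s1.py: the paths, URL, and bucket details are placeholders, and the actual script may wire things differently.

# hypothetical driver (sketch): extract -> combine -> transform -> load
from pyspark.sql import SparkSession

def main():
    spark = SparkSession.builder.appName("etl-pipeline").getOrCreate()

    # extract from the three sources (functions from step 1)
    json_df = read_data_from_json(spark, "{PATH_TO_JSON}")
    jdbc_df = _get_data_jdbc(spark)
    api_df = _get_api_data(spark, "{API_URL}")

    # combine and transform (steps 1 and 2)
    combined_df = append_dataframes([json_df, jdbc_df, api_df])
    transformed_df = do_transformations(combined_df)

    # load (step 3): upload_to_s3 expects the data as a CSV string
    csv_string = transformed_df.toPandas().to_csv(index=False)
    upload_to_s3(csv_string)

if __name__ == "__main__":
    main()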

Steps:

1. Collect data from various sources

I am gathering data from diverse sources, such as CSV files, JSON documents, REST APIs, and a PostgreSQL database.
# imports used by the standalone snippets below
import logging
import requests
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# module-level logger used throughout the snippets
log = logging.getLogger(__name__)

# reading data from a JSON file
def read_data_from_json(spark: SparkSession, path):
    df = spark.read.json(path)
    log.info("reading data from JSON ==> ")
    return df

# reading data from Postgres over JDBC
def _get_data_jdbc(spark: SparkSession):
    log.info("Getting data from Postgres( JDBC ) ")
    url = "jdbc:postgresql://{HOST_NAME}:{PORT}/{DB_NAME}"
    properties = {
        "user": "{USER_NAME}",
        "password": "{PASSWORD}",
        "driver": "org.postgresql.Driver"
    }
    table_name = "femp"
    df = spark.read.jdbc(url, table_name, properties=properties)
    log.info(f"printing schema of dataframe === >\n{df.dtypes}")
    log.info(f"count of df ==> {df.count()}")
    log.info(f"showing first 5 records === >\n{df.limit(5).toPandas().head().to_string(index=False)}")
    return df

# reading data from a REST API
def _get_api_data(spark: SparkSession, api_url):
    log.info("Getting data from RestAPI ")
    # fetch data from the API using requests
    response = requests.get(api_url)
    api_data = response.json()
    # expected schema of the API payload (for reference; could be passed as the second argument to createDataFrame)
    schema = StructType([
        StructField("First Name", StringType(), True),
        StructField("Last Name", StringType(), True),
        StructField("Email", StringType(), True),
        StructField("Gender", StringType(), True),
        StructField("Salary", IntegerType(), True),
    ])
    # convert each JSON record into a Row
    api_rows = [pyspark.sql.Row(**row) for row in api_data]
    log.info(f"showing first row from api response ==> \n {api_rows[0]}")
    # create a PySpark DataFrame from the API data
    df = spark.createDataFrame(api_rows)
    # log the schema, count, and a preview of the DataFrame
    log.info(f"printing schema of dataframe === >\n{df.dtypes}")
    log.info(f"count of df ==> {df.count()}")
    log.info(f"showing first 5 records === >\n{df.limit(5).toPandas().head().to_string(index=False)}")
    return df
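
CSV files are handled the same way. Here is a minimal sketch of the CSV counterpart; the function name and the header/inferSchema options are my assumptions, not taken from the repo.

# reading data from a CSV file (hypothetical counterpart to read_data_from_json)
def read_data_from_csv(spark: SparkSession, path):
    # header/inferSchema are assumptions; adjust them to match the actual files
    df = spark.read.csv(path, header=True, inferSchema=True)
    log.info("reading data from CSV ==> ")
    return df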

After collecting data from all the sources, I combine everything into a single DataFrame and then perform the transformations.

# appending a list of DataFrames into one
import functools
from pyspark.sql import DataFrame

def append_dataframes(dataframes):
    # every DataFrame must match the schema of the first one before they can be unioned
    schema = dataframes[0].schema
    if not all(df.schema == schema for df in dataframes):
        raise ValueError("All DataFrames must have the same schema as the first DataFrame.")
    result_df = functools.reduce(DataFrame.union, dataframes)
    log.info(f"Count of all appended dataframes ===>> {result_df.count()}")
    return result_df
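
One design note: DataFrame.union matches columns by position, which is why the schema check above matters. If the sources could ever return the same columns in a different order, unionByName is a name-based alternative (just a sketch, not what the repo uses):

# alternative sketch: union by column name instead of position
result_df = functools.reduce(DataFrame.unionByName, dataframes)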

2. Transformations with PySpark

These are the transformations I'm performing:

  • Changing the data type of columns
  • Filling NULL values with the mean/average
  • Filtering rows on the gender column
# Transformations
from pyspark.sql.functions import translate, mean

def do_transformations(df: DataFrame):
    log.info(":::::: Transformations started ::::::")
    # removing the $ symbol from the salary column
    df = df.withColumn('salary', translate('salary', '$', ''))
    # changing the data type of salary to integer
    df = df.withColumn("salary", df["salary"].cast(IntegerType()))
    # replacing null values in the salary column with the mean/average
    mean_val = df.select(mean(df.salary)).collect()
    log.info(f'mean value of salary ==> {mean_val[0][0]}')
    mean_salary = mean_val[0][0]
    # now using mean_salary to fill the nulls in the salary column
    df = df.na.fill(mean_salary, subset=['salary'])
    # considering only Male and Female values in the gender column
    df = df.filter(df["gender"].isin(["Male", "Female"]))
    log.info(":::::: Transformations finished ::::::")
    return df

3. Storing it in an AWS S3 bucket

Finally, I load the processed data into the S3 bucket as a CSV file.

import boto3

def upload_to_s3(csv_string):
    log.info(":::::: uploading to S3 started ::::::")
    # placeholders; in practice, read credentials from the environment or an IAM role instead of hard-coding them
    aws_access_key_id = '{AWS_ACCESS_KEY_ID}'
    aws_secret_access_key = '{AWS_SECRET_ACCESS_KEY}'
    bucket_name = '{bucket_name}'
    # Initialize S3 client
    s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
    # upload the CSV string as a single object
    s3.put_object(Body=csv_string, Bucket=bucket_name, Key="csv/a1.csv")
    log.info(":::::: uploading to S3 finished ::::::")

4. Orchestration using Apache Airflow

Running the pipeline by hand at a particular time is tedious and error-prone, so to eliminate that manual effort I am scheduling it with Airflow.

DAG

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'spark_submit_dag',
    default_args=default_args,
    description='DAG to run spark-submit every 5 minutes',
    schedule_interval=timedelta(minutes=5),
)

# Replace the paths with the actual paths to your virtualenv, JDBC jar, and spark-submit script (s1.py)
spark_submit_command = "/usr/local/spark/bin/spark-submit --conf spark.executorEnv.PYTHONPATH=/Users/user_name/Documents/GitHub/code_base/venv/lib/python3.9/site-packages:./venv \
    --conf spark.yarn.appMasterEnv.PYTHONPATH=/Users/user_name/Documents/GitHub/code_base/venv/lib/python3.9/site-packages:./venv \
    --conf spark.pyspark.driver.python=/Users/user/Documents/GitHub/code_base/venv/bin/python3 \
    --conf spark.pyspark.python=/Users/user/Documents/GitHub/code_base/venv/bin/python3 \
    --jars /Users/user/Documents/GitHub/code_base/postgresql-42.7.1.jar \
    /Users/user/Documents/GitHub/code_base/s1.py"

run_spark_submit = BashOperator(
    task_id='run_spark_submit',
    bash_command=spark_submit_command,
    dag=dag,
)

if __name__ == "__main__":
    dag.cli()

Thank you
