How I created a data pipeline using Kafka and Spark

sateesh.py
3 min read · Jan 8, 2024


Extract process:

To create a data pipeline, the first thing I need is a source of data. For the data feeding into Kafka, I used mock data from this endpoint: https://radata.vercel.app/random/data

curl https://radata.vercel.app/random/data
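
Each record in the response carries the fields I parse later in Spark (First Name, Last Name, Salary, Currency, Email, Gender, IP Address, Address, Phone Number). The API returns random values on every call, so the snippet below is only an illustrative shape, not real output:

{
  "First Name": "Jane",
  "Last Name": "Doe",
  "Salary": 75000,
  "Currency": "USD",
  "Email": "jane.doe@example.com",
  "Gender": "Female",
  "IP Address": "192.168.1.10",
  "Address": "123 Example Street",
  "Phone Number": "555-0100"
}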

This API gives me random person data. After that, I created an Apache Airflow DAG that runs every 5 minutes and publishes the data as Kafka messages on a specific topic.

DAG example:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import requests
import json
from kafka import KafkaProducer


def stream():
    # Fetch a batch of random person records from the mock API
    a = requests.get('https://radata.vercel.app/random/data')
    bootstrap_servers = 'localhost:9092'
    topic = 'user_created'  # Adjust the topic name as needed

    # Create Kafka producer configuration
    producer_config = {
        'bootstrap_servers': bootstrap_servers,
        'max_block_ms': 5000
    }
    # Create Kafka producer instance
    producer = KafkaProducer(**producer_config)

    print(f"Number of records: {len(a.json())}")
    # Publish each record as a JSON-encoded message on the topic
    for data in a.json():
        producer.send(topic, json.dumps(data).encode('utf-8'))
    # Make sure buffered messages are delivered before the task exits
    producer.flush()


default_args = {
    'owner': 'Satya',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 6),
    'retries': 1,
    'retry_delay': timedelta(minutes=30),
}

dag = DAG(
    'produce_api_data_kafka',
    default_args=default_args,
    description='A DAG to fetch and send data to Kafka every 5 minutes',
    schedule_interval=timedelta(minutes=5),
)

fetch_and_print_task = PythonOperator(
    task_id='fetch_and_send_to_kafka',
    python_callable=stream,
    dag=dag,
)

fetch_and_print_task
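
A quick way to confirm the DAG is actually producing messages is to tail the topic with the console consumer that ships with Kafka (the script path depends on your installation):

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic user_created --from-beginning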

Transform and load:

I used Spark (PySpark) for the transformation. I parsed the streaming messages into the structure I needed, then loaded the result into a PostgreSQL table.

from pyspark.sql import SparkSession
import logging
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
import os

logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)

# Set the threshold of the logger to DEBUG
log = logging.getLogger("FAWS")
log.setLevel(logging.DEBUG)

spark = SparkSession.builder.master("local[1]") \
    .appName("AWS data-engineering") \
    .config("spark.jars", "spark-sql-kafka-0-10_2.12-3.0.1.jar") \
    .config("spark.driver.extraClassPath", "spark-sql-kafka-0-10_2.12-3.0.1.jar") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
log.info(f"Spark-version ==> {spark.version}")

# Schema matching the fields returned by the mock API
json_schema = StructType([
    StructField("First Name", StringType(), True),
    StructField("Last Name", StringType(), True),
    StructField("Salary", IntegerType(), True),
    StructField("Currency", StringType(), True),
    StructField("Email", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("IP Address", StringType(), True),
    StructField("Address", StringType(), True),
    StructField("Phone Number", StringType(), True),
])

# Read from Kafka using Spark Structured Streaming
spark_df = spark.readStream \
    .format('kafka') \
    .option('kafka.bootstrap.servers', 'localhost:9092') \
    .option('subscribe', 'user_created') \
    .option('startingOffsets', 'earliest') \
    .load()

# Convert the 'value' column from Kafka to a string and then parse the JSON
parsed_df = spark_df \
    .selectExpr("CAST(value AS STRING) as json_str") \
    .select(from_json("json_str", json_schema).alias("data")) \
    .select("data.*")

postgres_properties = {
    "user": "{user_id}",
    "password": "{pwd}",
    "driver": "org.postgresql.Driver",
    "url": "{host_url/jdbc url}"
}


def write_to_postgres(batch_df, batch_id):
    log.info(f"Appending batch {batch_id} to PostgreSQL")
    log.info(f"batch size is >>> {batch_df.count()}")
    # Note: mode='overwrite' replaces the table on every micro-batch;
    # switch to mode='append' if records should accumulate.
    batch_df.write.jdbc(postgres_properties["url"], 'your_table_name', mode='overwrite',
                        properties=postgres_properties)
    # Display the parsed DataFrame in the console
    log.info(f"Displaying batch {batch_id} in the console:")
    batch_df.show(5, truncate=False)


query = parsed_df.writeStream \
    .outputMode("append") \
    .foreachBatch(write_to_postgres) \
    .start()

query.awaitTermination()
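
To run the streaming job, I submit the script with spark-submit. A rough sketch (the script name is a placeholder; --packages is an alternative to the local jar configured via spark.jars above, and the PostgreSQL JDBC driver also has to be on the classpath):

spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,org.postgresql:postgresql:42.6.0 \
  stream_to_postgres.py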

Thanks
