1.1 Create a table to compare OLTP vs Data Warehouse in SQL (using comments/notes).

Step 1: Import libraries and create engine

import pandas as pd
from sqlalchemy import create_engine, text

# Create an in-memory SQLite database
engine = create_engine('sqlite://', echo=False)

Step 2: Create the table

create_table_query = """
CREATE TABLE OLTP_vs_DW (
    Feature TEXT,
    OLTP_Description TEXT,
    DW_Description TEXT
);
"""

# Use a connection to execute the DDL
with engine.begin() as conn:
    conn.execute(text(create_table_query))
Step 3: Insert data

insert_data_query = """
INSERT INTO OLTP_vs_DW (Feature, OLTP_Description, DW_Description)
VALUES
    ('Purpose', 'Handles day-to-day transactions', 'Supports analytics and decision-making'),
    ('Data Orientation', 'Transaction-oriented', 'Subject-oriented / Analysis-oriented'),
    ('Data Structure', 'Normalized tables to reduce redundancy', 'Denormalized tables to support fast queries'),
    ('Data Volume', 'High volume of detailed records', 'Aggregated / summarized data, smaller volume'),
    ('Update Frequency', 'Real-time / frequent updates', 'Periodic updates (daily, weekly, monthly)'),
    ('Usage', 'Daily operations (insert, update, delete)', 'Reporting, trend analysis, business intelligence'),
    ('Examples', 'Banking transactions, Retail POS', 'Sales summaries, Customer behavior reports');
"""

with engine.begin() as conn:
    conn.execute(text(insert_data_query))

Step 4: Read and display the table

df_compare = pd.read_sql('SELECT * FROM OLTP_vs_DW', engine)
df_compare

Step 5 (Optional): Save to CSV

df_compare.to_csv('OLTP_vs_DW_table.csv', index=False)
Alternative approach: build the same comparison table with pandas in Jupyter Notebook.

Step 1: Open Jupyter Notebook

Launch Anaconda Navigator → Click Launch under Jupyter Notebook, or open a terminal and type:

jupyter notebook

A browser window will open with the Jupyter Dashboard. Click New → Python 3 to create a new notebook.

Step 2: Import the required library

We need pandas to create and manipulate tables. In the first cell, type:

import pandas as pd

Run the cell with Shift + Enter.
Step 3: Create the OLTP vs DW table

We use a pandas DataFrame to store the comparison.

# Create a dictionary with table data
data = {
    "Feature": [
        "Purpose", "Data Orientation", "Data Structure",
        "Data Volume", "Update Frequency", "Usage", "Examples"
    ],
    "OLTP_Description": [
        "Handles day-to-day transactions",
        "Transaction-oriented",
        "Normalized tables to reduce redundancy",
        "High volume of detailed records",
        "Real-time / frequent updates",
        "Daily operations (insert, update, delete)",
        "Banking transactions, Retail POS"
    ],
    "DW_Description": [
        "Supports analytics and decision-making",
        "Subject-oriented / Analysis-oriented",
        "Denormalized tables to support fast queries",
        "Aggregated / summarized data, smaller volume",
        "Periodic updates (daily, weekly, monthly)",
        "Reporting, trend analysis, business intelligence",
        "Sales summaries, Customer behavior reports"
    ]
}

# Create the DataFrame
df_compare = pd.DataFrame(data)

# Display the table
df_compare

Run the cell (Shift + Enter). You will see a table with three columns: Feature, OLTP_Description, and DW_Description.
Step 4: Save the table as CSV

df_compare.to_csv('OLTP_vs_DW_table.csv', index=False)

This saves the table as a CSV file named OLTP_vs_DW_table.csv in your notebook's working directory. You can download it from the Jupyter Dashboard for submission.

Step 5: Optional MySQL load

If you want to load this table into MySQL:

from sqlalchemy import create_engine

# Change username/password and database as per your MySQL setup
engine_mysql = create_engine('mysql+mysqlconnector://root:password@localhost/datawarehouse_lab')
df_compare.to_sql('OLTP_vs_DW', engine_mysql, if_exists='replace', index=False)

Make sure the MySQL server is running and the database datawarehouse_lab exists. This will create a table named OLTP_vs_DW in your MySQL database.
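Whichever engine you use, it is worth reading the table back to confirm the load succeeded. A minimal sketch, using an abbreviated version of the comparison table and an in-memory SQLite engine as a stand-in for the MySQL engine (so it runs without a server):

```python
import pandas as pd
from sqlalchemy import create_engine

# Abbreviated comparison table (two of the seven rows)
df_compare = pd.DataFrame({
    "Feature": ["Purpose", "Usage"],
    "OLTP_Description": ["Handles day-to-day transactions",
                         "Daily operations (insert, update, delete)"],
    "DW_Description": ["Supports analytics and decision-making",
                       "Reporting, trend analysis, business intelligence"],
})

# In-memory SQLite stands in for the MySQL engine here
engine = create_engine("sqlite://")
df_compare.to_sql("OLTP_vs_DW", engine, if_exists="replace", index=False)

# Read the table back to verify the load
check = pd.read_sql("SELECT * FROM OLTP_vs_DW", engine)
print(len(check))  # 2
```

With the MySQL engine, only the create_engine URL changes; the to_sql and read_sql calls are identical.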
Step 6: Take screenshots

- Jupyter Notebook showing the table output.
- CSV file saved in the working directory.
- Optional: MySQL Workbench showing the table.
1.2 Write SQL queries to insert transactional data (OLTP) vs aggregated queries for a Data Warehouse.

Step 1: Open Jupyter Notebook

Launch Anaconda Navigator → Jupyter Notebook → New → Python 3. Create a new notebook for this lab.

Step 2: Import Libraries

We need pandas and SQLAlchemy for SQL operations:

import pandas as pd
from sqlalchemy import create_engine

Run this cell with Shift + Enter.
Step 3: Create Sample OLTP Data in Python

We will create a transactional table for students' marks:

import numpy as np
import random
from datetime import datetime, timedelta

# Generate OLTP dataset
num_entries = 500
student_ids = np.random.randint(1, 51, size=num_entries)
subjects = np.random.choice(['Math', 'Science', 'English', 'History'], size=num_entries)
marks = np.random.randint(35, 101, size=num_entries)
start_date = datetime(2024, 1, 1)
exam_dates = [start_date + timedelta(days=random.randint(0, 365)) for _ in range(num_entries)]
teacher_ids = np.random.randint(1, 11, size=num_entries)

oltp_data = pd.DataFrame({
    "StudentID": student_ids,
    "Subject": subjects,
    "Marks": marks,
    "ExamDate": exam_dates,
    "TeacherID": teacher_ids
})

# Display the first 5 rows
oltp_data.head()

✅ This creates 500 random OLTP entries for students.
Step 4: Connect to a SQL Database

You can use SQLite (simpler) or MySQL (optional).

Option A: SQLite

# Create SQLite engine
engine = create_engine('sqlite:///datawarehouse_lab.db')

Option B: MySQL (optional)

# Create MySQL engine (change username/password/DB)
# engine = create_engine('mysql+mysqlconnector://root:password@localhost/datawarehouse_lab')
Step 5: Create the OLTP Table in SQL

# Save OLTP data to a SQL table
oltp_data.to_sql('Student_OLTP', engine, if_exists='replace', index=False)
print("OLTP table created successfully!")

This creates a table Student_OLTP with all 500 records. In SQL terms, it is equivalent to:

CREATE TABLE Student_OLTP (
    StudentID INT,
    Subject VARCHAR(50),
    Marks INT,
    ExamDate DATE,
    TeacherID INT
);

-- Insert transactional data
INSERT INTO Student_OLTP (StudentID, Subject, Marks, ExamDate, TeacherID)
VALUES (1, 'Math', 85, '2024-02-10', 3);
-- Repeat for 500 entries (pandas handles this automatically)
Step 6: Create Aggregated DW Tables in SQL

We aggregate the OLTP data to build DW tables:

# Aggregate: average marks per student
dw_data = oltp_data.groupby('StudentID', as_index=False)['Marks'].mean()
dw_data.columns = ['StudentID', 'Avg_Marks']

# Aggregate: total marks per subject
dw_subject = oltp_data.groupby('Subject', as_index=False)['Marks'].sum()
dw_subject.columns = ['Subject', 'Total_Marks']

# Save DW tables to SQL
dw_data.to_sql('Student_DW', engine, if_exists='replace', index=False)
dw_subject.to_sql('Subject_DW', engine, if_exists='replace', index=False)
print("DW tables created successfully!")

Equivalent SQL queries for the aggregation:

-- Average marks per student
CREATE TABLE Student_DW AS
SELECT StudentID, AVG(Marks) AS Avg_Marks
FROM Student_OLTP
GROUP BY StudentID;

-- Total marks per subject
CREATE TABLE Subject_DW AS
SELECT Subject, SUM(Marks) AS Total_Marks
FROM Student_OLTP
GROUP BY Subject;
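The pandas groupby and the SQL GROUP BY above compute the same aggregates, which you can confirm by running both on the same data. A minimal sketch with a tiny three-row sample (in-memory SQLite, so it runs standalone):

```python
import pandas as pd
from sqlalchemy import create_engine

# Tiny OLTP sample standing in for the 500-row table
oltp_data = pd.DataFrame({
    "StudentID": [1, 1, 2],
    "Subject": ["Math", "Science", "Math"],
    "Marks": [80, 90, 70],
})

engine = create_engine("sqlite://")
oltp_data.to_sql("Student_OLTP", engine, index=False)

# SQL-side aggregation, equivalent to the pandas groupby
sql_avg = pd.read_sql(
    "SELECT StudentID, AVG(Marks) AS Avg_Marks "
    "FROM Student_OLTP GROUP BY StudentID ORDER BY StudentID",
    engine,
)

# pandas-side aggregation
pd_avg = oltp_data.groupby("StudentID", as_index=False)["Marks"].mean()

print(sql_avg["Avg_Marks"].tolist())  # [85.0, 70.0]
```

Both paths give student 1 an average of 85 and student 2 an average of 70.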
Step 7: Verify Tables

# View the first 5 rows of the OLTP table
print("OLTP Table:")
print(pd.read_sql('SELECT * FROM Student_OLTP LIMIT 5', engine))

# View the DW tables
print("\nDW Table (Average Marks per Student):")
print(pd.read_sql('SELECT * FROM Student_DW LIMIT 5', engine))

print("\nDW Table (Total Marks per Subject):")
print(pd.read_sql('SELECT * FROM Subject_DW', engine))
Step 8: Optional Visualization

import matplotlib.pyplot as plt

# Bar chart of average marks per student
plt.figure(figsize=(10, 5))
plt.bar(dw_data['StudentID'], dw_data['Avg_Marks'], color='skyblue')
plt.xlabel('StudentID')
plt.ylabel('Average Marks')
plt.title('Average Marks per Student (DW)')
plt.show()
Step 9: Deliverables

Screenshots of the Jupyter Notebook showing:
- OLTP table (Student_OLTP)
- DW tables (Student_DW & Subject_DW)

CSV files (optional):

oltp_data.to_csv('Student_OLTP.csv', index=False)
dw_data.to_csv('Student_DW.csv', index=False)
dw_subject.to_csv('Subject_DW.csv', index=False)
2.1 Draw the Kimball Lifecycle using MS Excel / Lucidchart / Python (matplotlib).

Step 1: Open Jupyter Notebook

Launch Jupyter Notebook and create a new Python 3 notebook.

Step 2: Import Required Libraries

We will use matplotlib for drawing.

import matplotlib.pyplot as plt
from matplotlib.patches import FancyArrowPatch, Rectangle

Step 3: Define the Kimball Lifecycle Steps

The Kimball Lifecycle typically includes these steps:
1. Business Requirements
2. Choose the Grain
3. Design the Dimension Tables
4. Design the Fact Tables
5. ETL Process
6. Deploy / Maintain the Data Warehouse

We will visualize this as a flow diagram.

# Define lifecycle steps
steps = [
    "Business Requirements",
    "Choose the Grain",
    "Design Dimension Tables",
    "Design Fact Tables",
    "ETL Process",
    "Deploy / Maintain DW"
]

# Positions of the steps on the X-axis
x_pos = [1, 2, 3, 4, 5, 6]
y_pos = [1] * 6
Step 4: Create the Diagram Using Matplotlib

# Create a figure
plt.figure(figsize=(14, 3))
ax = plt.gca()

# Draw a rectangle for each step; width 0.8 (at spacing 1) leaves a gap
# between boxes so the connecting arrows have visible length
for x, step in zip(x_pos, steps):
    rect = Rectangle((x - 0.4, 0.8), 0.8, 0.4,
                     facecolor='skyblue', edgecolor='black')
    ax.add_patch(rect)
    plt.text(x, 1, step, ha='center', va='center', fontsize=9)

# Draw arrows between consecutive steps
for i in range(len(steps) - 1):
    arrow = FancyArrowPatch((x_pos[i] + 0.4, 1), (x_pos[i + 1] - 0.4, 1),
                            arrowstyle='->', mutation_scale=20, color='black')
    ax.add_patch(arrow)

# Set limits and remove axes
plt.xlim(0, 7)
plt.ylim(0, 2)
plt.axis('off')
plt.title("Kimball Lifecycle Diagram")
plt.show()

✅ This produces a flow diagram with a rectangle for each step and arrows showing the process.
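To include the figure in your report, save it to a file before calling plt.show(). A minimal sketch using the non-interactive Agg backend so it also runs in plain scripts (the filename kimball_lifecycle.png is just an example):

```python
import matplotlib
matplotlib.use("Agg")  # render without a display
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
import os

steps = ["Business Requirements", "Choose the Grain", "Design Dimension Tables",
         "Design Fact Tables", "ETL Process", "Deploy / Maintain DW"]

fig, ax = plt.subplots(figsize=(14, 3))
for x, step in enumerate(steps, start=1):
    ax.add_patch(Rectangle((x - 0.4, 0.8), 0.8, 0.4,
                           facecolor="skyblue", edgecolor="black"))
    ax.text(x, 1, step, ha="center", va="center", fontsize=8)
ax.set_xlim(0, 7)
ax.set_ylim(0, 2)
ax.axis("off")

# Save before (or instead of) plt.show()
fig.savefig("kimball_lifecycle.png", dpi=150, bbox_inches="tight")
print(os.path.exists("kimball_lifecycle.png"))  # True
```

The saved PNG can then be embedded directly in the assignment document.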
Step 5: Optional Excel Implementation

1. Open MS Excel → Insert → Shapes → Rectangles & Arrows.
2. Draw 6 rectangles horizontally.
3. Write each Kimball Lifecycle step in a rectangle.
4. Connect them with right arrows.
5. Save or export as an image for your report.

Step 6: Optional Lucidchart Implementation

1. Open Lucidchart → New Document → Flowchart Template.
2. Use rectangles for steps and arrows to show flow.
3. Label the steps as listed above.
4. Export as PNG/PDF to include in your assignment.

Deliverables
- Python-generated diagram (matplotlib figure).
- (Optional) Excel or Lucidchart version of the diagram.
- A short note, for example: the Kimball Lifecycle is an iterative process for building data warehouses; it helps in designing dimensions, facts, and the ETL process systematically.
2.2 Create a mock requirement document for a Retail Store Data Warehouse.

Step 1: Open Jupyter Notebook

Launch Jupyter Notebook → Create a new Python 3 notebook.

Step 2: Import Required Libraries

We will use pandas to create and save the mock requirement document.

import pandas as pd
Step 3: Define the Requirements

A mock requirement document typically contains:

| Feature | Description | Priority | Notes |
| --- | --- | --- | --- |
| Sales Data | Daily sales transactions per store | High | Include transaction ID, date, product ID, quantity, amount |
| Customer Data | Customer information | Medium | Customer ID, name, gender, age, loyalty points |
| Product Data | Product master data | High | Product ID, name, category, price, supplier |
| Inventory Data | Stock levels | High | Store ID, product ID, current stock, reorder level |
| Employee Data | Store staff details | Low | Employee ID, name, role, store ID |
| Reports | Sales summary, product performance, customer analytics | High | Weekly, monthly reports for management decisions |

We can create this in Python as a DataFrame.
# Create the mock requirement document
requirements = pd.DataFrame({
    "Feature": [
        "Sales Data",
        "Customer Data",
        "Product Data",
        "Inventory Data",
        "Employee Data",
        "Reports"
    ],
    "Description": [
        "Daily sales transactions per store",
        "Customer information",
        "Product master data",
        "Stock levels",
        "Store staff details",
        "Sales summary, product performance, customer analytics"
    ],
    "Priority": ["High", "Medium", "High", "High", "Low", "High"],
    "Notes": [
        "Include transaction ID, date, product ID, quantity, amount",
        "Customer ID, name, gender, age, loyalty points",
        "Product ID, name, category, price, supplier",
        "Store ID, product ID, current stock, reorder level",
        "Employee ID, name, role, store ID",
        "Weekly, monthly reports for management decisions"
    ]
})

# Display the DataFrame
requirements

✅ Running this shows the mock requirement table in your notebook.

Step 4: Save the Requirement Document

You can save it as CSV or Excel (the Excel export requires the openpyxl package).

# Save as CSV
requirements.to_csv("Retail_DW_Requirement.csv", index=False)

# Save as Excel
requirements.to_excel("Retail_DW_Requirement.xlsx", index=False)

Now you have a ready-to-submit requirement document.
Step 5: Optional Notes

- This document can be expanded with data sources, ETL rules, fact and dimension tables, and KPIs.
- For teaching purposes, keep it simple and readable.
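When expanding the document, it can help to pull out the high-priority features first, since those should drive the initial iteration of the DW design. A minimal sketch on an abbreviated version of the requirements table:

```python
import pandas as pd

# Abbreviated requirements table (three of the six features)
requirements = pd.DataFrame({
    "Feature": ["Sales Data", "Customer Data", "Employee Data"],
    "Priority": ["High", "Medium", "Low"],
})

# High-priority features drive the first design iteration
high = requirements[requirements["Priority"] == "High"]
print(high["Feature"].tolist())  # ['Sales Data']
```

The same filter works on the full six-row table and makes a natural appendix ("phase 1 scope") in the requirement document.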
Deliverables
- Displayed mock requirement table in Jupyter Notebook.
- CSV file: Retail_DW_Requirement.csv.
- Excel file: Retail_DW_Requirement.xlsx.
- Optional short note explaining why each feature is needed in a Retail DW.
2.3 Identify roles and assign responsibilities in a team (example: project manager, ETL developer, analyst).

Step 1: Open Jupyter Notebook

Launch Jupyter Notebook → Create a new Python 3 notebook.

Step 2: Import Required Library

We will use pandas to create and display the team roles table.

import pandas as pd
Step 3: Define Team Roles and Responsibilities

A typical Data Warehouse team might have the following roles:

| Role | Responsibility | Skills Required | Priority |
| --- | --- | --- | --- |
| Project Manager | Plan and manage the DW project, track progress, communicate with stakeholders | Project management, communication | High |
| Data Architect | Design the DW schema, select ETL tools, ensure data quality | Data modeling, SQL, ETL design | High |
| ETL Developer | Extract, transform, and load data from sources to the DW | SQL, Python, ETL tools, data cleansing | High |
| BI Analyst | Create reports, dashboards, and analytics for business users | SQL, Power BI/Tableau, Excel, analytics | Medium |
| Data Analyst | Analyze data, validate ETL results, support reporting | SQL, Python/R, Excel, statistics | Medium |
| DBA | Manage the DW database, optimize performance, ensure backups | SQL, database tuning, security | High |
| QA/Tester | Test ETL jobs and reports for accuracy | SQL, testing tools, attention to detail | Medium |

We can create this table in Python as a DataFrame.
# Create the DW team roles and responsibilities table
team_roles = pd.DataFrame({
    "Role": [
        "Project Manager",
        "Data Architect",
        "ETL Developer",
        "BI Analyst",
        "Data Analyst",
        "DBA",
        "QA/Tester"
    ],
    "Responsibility": [
        "Plan and manage the DW project, track progress, communicate with stakeholders",
        "Design the DW schema, select ETL tools, ensure data quality",
        "Extract, transform, and load data from sources to the DW",
        "Create reports, dashboards, and analytics for business users",
        "Analyze data, validate ETL results, support reporting",
        "Manage the DW database, optimize performance, ensure backups",
        "Test ETL jobs and reports for accuracy"
    ],
    "Skills_Required": [
        "Project management, communication",
        "Data modeling, SQL, ETL design",
        "SQL, Python, ETL tools, data cleansing",
        "SQL, Power BI/Tableau, Excel, analytics",
        "SQL, Python/R, Excel, statistics",
        "SQL, database tuning, security",
        "SQL, testing tools, attention to detail"
    ],
    "Priority": ["High", "High", "High", "Medium", "Medium", "High", "Medium"]
})

# Display the DataFrame
team_roles

✅ Running this shows the team roles table in your notebook.

Step 4: Save the Team Roles Document

You can save it as CSV or Excel for submission.

# Save as CSV
team_roles.to_csv("DW_Team_Roles.csv", index=False)

# Save as Excel
team_roles.to_excel("DW_Team_Roles.xlsx", index=False)
Step 5: Optional Notes

- Add additional roles like Data Governance Lead, Metadata Manager, or Security Analyst if required.
- Priority shows how critical the role is to the DW project.
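Since Priority is an ordered scale rather than plain text, you can make the table sort correctly (High before Medium before Low) with an ordered categorical. A minimal sketch on an abbreviated version of the roles table:

```python
import pandas as pd

# Abbreviated roles table (three of the seven roles)
team_roles = pd.DataFrame({
    "Role": ["Project Manager", "BI Analyst", "DBA"],
    "Priority": ["High", "Medium", "High"],
})

# Ordered categorical so sorting follows High > Medium > Low, not alphabetical order
order = pd.Categorical(team_roles["Priority"],
                       categories=["High", "Medium", "Low"], ordered=True)
ranked = team_roles.assign(Priority=order).sort_values("Priority", kind="stable")
print(ranked["Role"].tolist())  # ['Project Manager', 'DBA', 'BI Analyst']
```

A plain alphabetical sort would put "High" before "Low" before "Medium", which is why the explicit category order is needed.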
Deliverables
- Displayed team roles table in Jupyter Notebook.
- CSV file: DW_Team_Roles.csv.
- Excel file: DW_Team_Roles.xlsx.
- Optional short note explaining why each role is important.
3.1 Create a Star Schema for Sales Data using SQL.

Step-by-Step Solution

Step 1: Import Libraries

import pandas as pd
from sqlalchemy import create_engine, text

Step 2: Create Engine

engine = create_engine("sqlite:///sales_dw.db", echo=True)

Step 3: Execute SQL via a Connection

In SQLAlchemy 2.x you open a connection and call conn.execute(text("SQL QUERY")). Use engine.begin() so the statements are committed automatically when the block exits.
with engine.begin() as conn:
    # Product Dimension
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS Dim_Product (
            ProductID INTEGER PRIMARY KEY,
            ProductName TEXT,
            Category TEXT,
            Price REAL
        )
    """))
    # Customer Dimension
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS Dim_Customer (
            CustomerID INTEGER PRIMARY KEY,
            CustomerName TEXT,
            City TEXT,
            State TEXT
        )
    """))
    # Store Dimension
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS Dim_Store (
            StoreID INTEGER PRIMARY KEY,
            StoreName TEXT,
            City TEXT,
            State TEXT
        )
    """))
    # Date Dimension
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS Dim_Date (
            DateID INTEGER PRIMARY KEY,
            Date TEXT,
            Month INTEGER,
            Quarter INTEGER,
            Year INTEGER
        )
    """))
    # Fact Table
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS Fact_Sales (
            SalesID INTEGER PRIMARY KEY,
            DateID INTEGER,
            ProductID INTEGER,
            CustomerID INTEGER,
            StoreID INTEGER,
            Quantity INTEGER,
            TotalAmount REAL,
            FOREIGN KEY(DateID) REFERENCES Dim_Date(DateID),
            FOREIGN KEY(ProductID) REFERENCES Dim_Product(ProductID),
            FOREIGN KEY(CustomerID) REFERENCES Dim_Customer(CustomerID),
            FOREIGN KEY(StoreID) REFERENCES Dim_Store(StoreID)
        )
    """))
Step 4: Insert Sample Data

with engine.begin() as conn:
    # Insert sample data into the Product Dimension
    conn.execute(text("""
        INSERT INTO Dim_Product (ProductID, ProductName, Category, Price)
        VALUES
            (1, 'Laptop', 'Electronics', 500),
            (2, 'Phone', 'Electronics', 300),
            (3, 'Chair', 'Furniture', 50)
    """))
    # Insert sample data into the Customer Dimension
    conn.execute(text("""
        INSERT INTO Dim_Customer (CustomerID, CustomerName, City, State)
        VALUES
            (1, 'Alice', 'Pune', 'MH'),
            (2, 'Bob', 'Mumbai', 'MH')
    """))
    # Insert sample data into the Store Dimension
    conn.execute(text("""
        INSERT INTO Dim_Store (StoreID, StoreName, City, State)
        VALUES
            (1, 'Store A', 'Pune', 'MH'),
            (2, 'Store B', 'Mumbai', 'MH')
    """))
    # Insert sample data into the Date Dimension
    conn.execute(text("""
        INSERT INTO Dim_Date (DateID, Date, Month, Quarter, Year)
        VALUES
            (1, '2025-09-01', 9, 3, 2025),
            (2, '2025-09-02', 9, 3, 2025)
    """))
    # Insert sample data into Fact_Sales
    conn.execute(text("""
        INSERT INTO Fact_Sales (SalesID, DateID, ProductID, CustomerID, StoreID, Quantity, TotalAmount)
        VALUES
            (1, 1, 1, 1, 1, 2, 1000),
            (2, 2, 2, 2, 2, 1, 300),
            (3, 1, 3, 1, 1, 4, 200)
    """))
Step 5: Query the Fact Table

with engine.connect() as conn:
    df = pd.read_sql("""
        SELECT f.SalesID, d.Date, p.ProductName, c.CustomerName, s.StoreName,
               f.Quantity, f.TotalAmount
        FROM Fact_Sales f
        JOIN Dim_Date d ON f.DateID = d.DateID
        JOIN Dim_Product p ON f.ProductID = p.ProductID
        JOIN Dim_Customer c ON f.CustomerID = c.CustomerID
        JOIN Dim_Store s ON f.StoreID = s.StoreID
    """, conn)
df

✅ With engine.begin() handling commits, everything works in SQLAlchemy 2.x without the removed engine.execute() API.
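A payoff of the star schema is that analytical queries stay simple: group a fact measure by any dimension attribute. A minimal sketch using an abbreviated product dimension and fact table from the steps above (in-memory SQLite, so it runs standalone):

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("sqlite://")

# Abbreviated dimension and fact data from the star schema above
pd.DataFrame({"ProductID": [1, 2, 3],
              "ProductName": ["Laptop", "Phone", "Chair"]}
             ).to_sql("Dim_Product", engine, index=False)
pd.DataFrame({"SalesID": [1, 2, 3],
              "ProductID": [1, 2, 3],
              "TotalAmount": [1000.0, 300.0, 200.0]}
             ).to_sql("Fact_Sales", engine, index=False)

# Revenue per product: one join from the fact to the dimension, then GROUP BY
df = pd.read_sql("""
    SELECT p.ProductName, SUM(f.TotalAmount) AS Revenue
    FROM Fact_Sales f
    JOIN Dim_Product p ON f.ProductID = p.ProductID
    GROUP BY p.ProductName
    ORDER BY Revenue DESC
""", engine)
print(df["ProductName"].tolist())  # ['Laptop', 'Phone', 'Chair']
```

Swapping Dim_Product for Dim_Store or Dim_Date gives revenue per store or per date with the same query shape.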
3.2 Create a Snowflake Schema for University Data.

Step 1: Import Libraries

import pandas as pd
from sqlalchemy import create_engine, text

Step 2: Create Database Engine

# SQLite database (this creates 'university_dw.db' in your folder)
engine = create_engine("sqlite:///university_dw.db", echo=True)

Step 3: Create Dimension and Fact Tables (Snowflake Schema)

In a snowflake schema, dimensions are normalized; for example, Dim_Student and Dim_Course both link to Dim_Department.
with engine.begin() as conn:
    # Department Dimension
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS Dim_Department (
            DeptID INTEGER PRIMARY KEY,
            DeptName TEXT
        )
    """))
    # Course Dimension
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS Dim_Course (
            CourseID INTEGER PRIMARY KEY,
            CourseName TEXT,
            DeptID INTEGER,
            FOREIGN KEY(DeptID) REFERENCES Dim_Department(DeptID)
        )
    """))
    # Student Dimension
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS Dim_Student (
            StudentID INTEGER PRIMARY KEY,
            StudentName TEXT,
            DeptID INTEGER,
            FOREIGN KEY(DeptID) REFERENCES Dim_Department(DeptID)
        )
    """))
    # Date Dimension
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS Dim_Date (
            DateID INTEGER PRIMARY KEY,
            Date TEXT,
            Month INTEGER,
            Quarter INTEGER,
            Year INTEGER
        )
    """))
    # Fact Table (Grades / Enrollment)
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS Fact_Enrollment (
            EnrollmentID INTEGER PRIMARY KEY,
            StudentID INTEGER,
            CourseID INTEGER,
            DateID INTEGER,
            Grade TEXT,
            FOREIGN KEY(StudentID) REFERENCES Dim_Student(StudentID),
            FOREIGN KEY(CourseID) REFERENCES Dim_Course(CourseID),
            FOREIGN KEY(DateID) REFERENCES Dim_Date(DateID)
        )
    """))
Step 4: Insert Sample Data

with engine.begin() as conn:
    # Departments
    conn.execute(text("""
        INSERT INTO Dim_Department (DeptID, DeptName) VALUES
            (1, 'Computer Science'),
            (2, 'Mathematics'),
            (3, 'Physics')
    """))
    # Courses
    conn.execute(text("""
        INSERT INTO Dim_Course (CourseID, CourseName, DeptID) VALUES
            (1, 'Data Structures', 1),
            (2, 'Algorithms', 1),
            (3, 'Calculus', 2),
            (4, 'Linear Algebra', 2),
            (5, 'Quantum Mechanics', 3)
    """))
    # Students
    conn.execute(text("""
        INSERT INTO Dim_Student (StudentID, StudentName, DeptID) VALUES
            (1, 'Alice', 1),
            (2, 'Bob', 2),
            (3, 'Charlie', 3)
    """))
    # Dates
    conn.execute(text("""
        INSERT INTO Dim_Date (DateID, Date, Month, Quarter, Year) VALUES
            (1, '2025-09-01', 9, 3, 2025),
            (2, '2025-09-02', 9, 3, 2025)
    """))
    # Fact Table
    conn.execute(text("""
        INSERT INTO Fact_Enrollment (EnrollmentID, StudentID, CourseID, DateID, Grade) VALUES
            (1, 1, 1, 1, 'A'),
            (2, 1, 2, 2, 'B'),
            (3, 2, 3, 1, 'A'),
            (4, 3, 5, 2, 'B')
    """))
Step 5: Query the Fact Table with Joins

with engine.connect() as conn:
    df = pd.read_sql("""
        SELECT f.EnrollmentID, s.StudentName, d.DeptName, c.CourseName, dt.Date, f.Grade
        FROM Fact_Enrollment f
        JOIN Dim_Student s ON f.StudentID = s.StudentID
        JOIN Dim_Course c ON f.CourseID = c.CourseID
        JOIN Dim_Department d ON s.DeptID = d.DeptID
        JOIN Dim_Date dt ON f.DateID = dt.DateID
    """, conn)
df

✅ The output shows enrollment details with student, department, course, date, and grade, which is a typical snowflake-schema view.
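The defining trait of the snowflake schema shows up in aggregation queries: reaching a normalized attribute like DeptName takes an extra hop (fact → Dim_Student → Dim_Department). A minimal sketch with abbreviated data from the steps above (in-memory SQLite, so it runs standalone):

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("sqlite://")

# Abbreviated snowflake data: the fact references students,
# and students reference departments
pd.DataFrame({"DeptID": [1, 2],
              "DeptName": ["Computer Science", "Mathematics"]}
             ).to_sql("Dim_Department", engine, index=False)
pd.DataFrame({"StudentID": [1, 2, 3], "DeptID": [1, 1, 2]}
             ).to_sql("Dim_Student", engine, index=False)
pd.DataFrame({"EnrollmentID": [1, 2, 3, 4], "StudentID": [1, 1, 2, 3]}
             ).to_sql("Fact_Enrollment", engine, index=False)

# Two joins are needed to group enrollments by department name
df = pd.read_sql("""
    SELECT d.DeptName, COUNT(*) AS Enrollments
    FROM Fact_Enrollment f
    JOIN Dim_Student s ON f.StudentID = s.StudentID
    JOIN Dim_Department d ON s.DeptID = d.DeptID
    GROUP BY d.DeptName
    ORDER BY Enrollments DESC
""", engine)
print(df["Enrollments"].tolist())  # [3, 1]
```

In a star schema the department name would be denormalized into Dim_Student, so the same query would need only one join; that trade-off (storage and consistency vs. join depth) is the core star-vs-snowflake decision.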
3.3 Create a Fact Constellation Schema (Galaxy Schema) for Retail & Inventory.

Step 1: Import Libraries

import pandas as pd
from sqlalchemy import create_engine, text

Step 2: Create Database Engine

engine = create_engine("sqlite:///retail_inventory_dw.db", echo=True)
Step 3: Create Dimension Tables

with engine.begin() as conn:
    # Product Dimension
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS Dim_Product (
            ProductID INTEGER PRIMARY KEY,
            ProductName TEXT,
            Category TEXT,
            Price REAL
        )
    """))
    # Store Dimension
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS Dim_Store (
            StoreID INTEGER PRIMARY KEY,
            StoreName TEXT,
            Location TEXT
        )
    """))
    # Date Dimension
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS Dim_Date (
            DateID INTEGER PRIMARY KEY,
            Date TEXT,
            Month INTEGER,
            Quarter INTEGER,
            Year INTEGER
        )
    """))
Step 4: Create Fact Tables

with engine.begin() as conn:
    # Fact Table: Sales
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS Fact_Sales (
            SalesID INTEGER PRIMARY KEY,
            ProductID INTEGER,
            StoreID INTEGER,
            DateID INTEGER,
            QuantitySold INTEGER,
            TotalAmount REAL,
            FOREIGN KEY(ProductID) REFERENCES Dim_Product(ProductID),
            FOREIGN KEY(StoreID) REFERENCES Dim_Store(StoreID),
            FOREIGN KEY(DateID) REFERENCES Dim_Date(DateID)
        )
    """))
    # Fact Table: Inventory
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS Fact_Inventory (
            InventoryID INTEGER PRIMARY KEY,
            ProductID INTEGER,
            StoreID INTEGER,
            DateID INTEGER,
            StockIn INTEGER,
            StockOut INTEGER,
            FOREIGN KEY(ProductID) REFERENCES Dim_Product(ProductID),
            FOREIGN KEY(StoreID) REFERENCES Dim_Store(StoreID),
            FOREIGN KEY(DateID) REFERENCES Dim_Date(DateID)
        )
    """))
Step 5: Insert Sample Data

with engine.begin() as conn:
    # Products
    conn.execute(text("""
        INSERT INTO Dim_Product (ProductID, ProductName, Category, Price) VALUES
            (1, 'Laptop', 'Electronics', 50000),
            (2, 'Mobile', 'Electronics', 15000),
            (3, 'Shirt', 'Clothing', 800),
            (4, 'Jeans', 'Clothing', 1200)
    """))
    # Stores
    conn.execute(text("""
        INSERT INTO Dim_Store (StoreID, StoreName, Location) VALUES
            (1, 'Store A', 'Pune'),
            (2, 'Store B', 'Mumbai')
    """))
    # Dates
    conn.execute(text("""
        INSERT INTO Dim_Date (DateID, Date, Month, Quarter, Year) VALUES
            (1, '2025-09-01', 9, 3, 2025),
            (2, '2025-09-02', 9, 3, 2025)
    """))
    # Fact_Sales
    conn.execute(text("""
        INSERT INTO Fact_Sales (SalesID, ProductID, StoreID, DateID, QuantitySold, TotalAmount) VALUES
            (1, 1, 1, 1, 5, 250000),
            (2, 2, 1, 2, 10, 150000),
            (3, 3, 2, 1, 20, 16000)
    """))
    # Fact_Inventory
    conn.execute(text("""
        INSERT INTO Fact_Inventory (InventoryID, ProductID, StoreID, DateID, StockIn, StockOut) VALUES
            (1, 1, 1, 1, 10, 5),
            (2, 2, 1, 2, 20, 10),
            (3, 3, 2, 1, 30, 20)
    """))
Step 6: Query the Fact Tables with Joins

with engine.connect() as conn:
    df_sales = pd.read_sql("""
        SELECT f.SalesID, p.ProductName, s.StoreName, d.Date, f.QuantitySold, f.TotalAmount
        FROM Fact_Sales f
        JOIN Dim_Product p ON f.ProductID = p.ProductID
        JOIN Dim_Store s ON f.StoreID = s.StoreID
        JOIN Dim_Date d ON f.DateID = d.DateID
    """, conn)

    df_inventory = pd.read_sql("""
        SELECT f.InventoryID, p.ProductName, s.StoreName, d.Date, f.StockIn, f.StockOut
        FROM Fact_Inventory f
        JOIN Dim_Product p ON f.ProductID = p.ProductID
        JOIN Dim_Store s ON f.StoreID = s.StoreID
        JOIN Dim_Date d ON f.DateID = d.DateID
    """, conn)

print("Sales Fact Table")
print(df_sales)
print("\nInventory Fact Table")
print(df_inventory)

✅ Explanation:
- Fact_Sales and Fact_Inventory are two separate fact tables.
- Dim_Product, Dim_Store, and Dim_Date are shared dimension tables.
- This structure is a Galaxy Schema because multiple fact tables share common dimensions.
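Because both fact tables share Dim_Product, the galaxy schema supports "drill-across" queries that compare measures from different facts side by side: aggregate each fact table separately, then join the results on the shared dimension. A minimal sketch with abbreviated data from the steps above (in-memory SQLite, so it runs standalone):

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("sqlite://")

# Abbreviated shared dimension and the two fact tables
pd.DataFrame({"ProductID": [1, 2], "ProductName": ["Laptop", "Mobile"]}
             ).to_sql("Dim_Product", engine, index=False)
pd.DataFrame({"ProductID": [1, 2], "QuantitySold": [5, 10]}
             ).to_sql("Fact_Sales", engine, index=False)
pd.DataFrame({"ProductID": [1, 2], "StockIn": [10, 20], "StockOut": [5, 10]}
             ).to_sql("Fact_Inventory", engine, index=False)

# Drill-across: aggregate each fact separately, then join on the shared dimension
df = pd.read_sql("""
    SELECT p.ProductName, s.Sold, i.NetStock
    FROM Dim_Product p
    JOIN (SELECT ProductID, SUM(QuantitySold) AS Sold
          FROM Fact_Sales GROUP BY ProductID) s ON p.ProductID = s.ProductID
    JOIN (SELECT ProductID, SUM(StockIn - StockOut) AS NetStock
          FROM Fact_Inventory GROUP BY ProductID) i ON p.ProductID = i.ProductID
    ORDER BY p.ProductID
""", engine)
print(df["Sold"].tolist())  # [5, 10]
```

Aggregating each fact in its own subquery before joining avoids the fan-out (double counting) that a direct three-way join of two fact tables would cause.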
4.1 Write a Python program using Pandas to perform ETL (read CSV → clean data → load into DB).

Install the dependencies first (sqlite3 ships with Python, so only pandas and SQLAlchemy are needed):

pip install pandas sqlalchemy

import pandas as pd
from io import StringIO

# -------------------
# 1. Extract (Read CSV)
# -------------------
csv_data = """id,first_name,last_name,age,join_date,salary
1,Alice,Smith,34,2021/01/05,50000
2,Bob,Jones,,05-02-2021,60000
3,carol," O'Neil",29,2021.03.10,not available
3,carol," O'Neil",29,2021-03-10,70000
4,,Khan,150,2021-13-01,80000
5,Eve,Stone,27,March 2 2021,65000
"""

# Read the CSV into a DataFrame
df = pd.read_csv(StringIO(csv_data))
print("🔹 Raw Data:")
print(df, "\n")
# -------------------
# 2. Transform (Clean Data)
# -------------------

# Strip whitespace & fix capitalization
df['first_name'] = df['first_name'].str.strip().str.capitalize()
df['last_name'] = df['last_name'].str.strip()

# Fill missing first names
df['first_name'] = df['first_name'].fillna("Unknown")

# Fix ages (invalid -> NaN)
df['age'] = pd.to_numeric(df['age'], errors='coerce')
df.loc[(df['age'] < 0) | (df['age'] > 100), 'age'] = None

# Standardize join_date
df['join_date'] = pd.to_datetime(df['join_date'], errors='coerce')

# Fix salary (convert to numeric, invalid -> NaN)
df['salary'] = pd.to_numeric(df['salary'], errors='coerce')

# Remove duplicates
df = df.drop_duplicates()

# -------------------
# 3. Load (Save Cleaned Data)
# -------------------
df.to_csv("cleaned_data.csv", index=False)
print("✅ Cleaned Data:")
print(df)
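Before loading, a quick data-quality check is worthwhile: counting the nulls left in each column tells you whether the coercions (errors='coerce') silently turned bad values into NaN. A minimal sketch on an abbreviated cleaned frame:

```python
import pandas as pd

# Abbreviated cleaned frame: coercion left one NaN in age and one in salary
df = pd.DataFrame({
    "id": [1, 2, 3],
    "age": [34.0, None, 29.0],
    "salary": [50000.0, 60000.0, None],
})

# Data-quality report: remaining nulls per column after cleaning
nulls = df.isna().sum()
print(nulls.to_dict())  # {'id': 0, 'age': 1, 'salary': 1}
```

If a column's null count is unexpectedly high, revisit the cleaning rules (or the source data) before loading into the database.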
A fuller version of the same ETL reads the CSV from disk and loads the result into SQLite:

import pandas as pd
from sqlalchemy import create_engine

# Step 1: Extract - read the CSV using a full path
csv_path = r"C:\Users\Nikhil\Downloads\sample_data.csv"
db_path = r"sqlite:///C:\Users\Nikhil\Downloads\etl_example.db"

df = pd.read_csv(csv_path)
print("Original Data:")
print(df)

# Step 2: Transform - clean the data
# Strip spaces from every string cell
df = df.applymap(lambda x: x.strip() if isinstance(x, str) else x)

# Convert numeric columns
df['age'] = pd.to_numeric(df['age'], errors='coerce')
df['salary'] = pd.to_numeric(df['salary'], errors='coerce')

# Convert dates
df['join_date'] = pd.to_datetime(df['join_date'], errors='coerce')

# Deduplicate (keep the last row for the same id)
df = df.drop_duplicates(subset=['id'], keep='last')

# Handle invalid ages
df.loc[(df['age'] < 0) | (df['age'] > 120), 'age'] = None

# Fill missing values
df['age'] = df['age'].fillna(df['age'].median())
df['salary'] = df['salary'].fillna(df['salary'].median())
df['first_name'] = df['first_name'].fillna("Unknown")

# Create a derived column
df['full_name'] = df['first_name'] + " " + df['last_name']

print("\nCleaned Data:")
print(df)

# Step 3: Load - save into SQLite using the full path
engine = create_engine(db_path)
df.to_sql("people", con=engine, if_exists="replace", index=False)

# Verify
loaded = pd.read_sql("SELECT * FROM people", engine)
print("\nData Loaded from DB:")
print(loaded)
From the Downloads folder (where etl_example.db was created), you can also double-check the load with the built-in sqlite3 module instead of SQLAlchemy:

import sqlite3

conn = sqlite3.connect("etl_example.db")
rows = conn.execute("SELECT * FROM people").fetchall()
print(rows)
conn.close()
4.2 Use SQL to perform data aggregation (ROLAP-style queries). Example: ROLLUP, CUBE.

from sqlalchemy import create_engine, text

# Connect to SQLite
engine = create_engine(r"sqlite:///C:\Users\Nikhil\Downloads\etl_example.db")

# Create a table with SQL
with engine.connect() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS sales (
            id INTEGER PRIMARY KEY,
            product TEXT,
            quantity INTEGER,
            price REAL
        )
    """))
    conn.commit()

print("✅ Table 'sales' created successfully!")
import pandas as pd
from sqlalchemy import create_engine, text

# Connect to SQLite
engine = create_engine("sqlite:///mydb.sqlite")

# Step 1: Create the 'sales' table (only if it does not exist).
# engine.begin() commits automatically when the block exits.
with engine.begin() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS sales (
            id INTEGER PRIMARY KEY,
            product TEXT,
            year INTEGER,
            amount REAL,
            state TEXT
        );
    """))

# Step 2: Insert some sample rows (only run once, or wrap in try/except)
data = [
    (1, "Laptop", 2021, 1200.50, "Maharashtra"),
    (2, "Phone", 2021, 800.00, "Maharashtra"),
    (3, "Tablet", 2022, 600.00, "Gujarat"),
    (4, "Laptop", 2022, 1400.00, "Gujarat"),
    (5, "Phone", 2023, 900.00, "Maharashtra"),
]
df = pd.DataFrame(data, columns=["id", "product", "year", "amount", "state"])
df.to_sql("sales", engine, if_exists="append", index=False)

# Step 3: Run an aggregation query
query = """
SELECT state, year, SUM(amount) AS total_sales
FROM sales
GROUP BY state, year;
"""

with engine.connect() as conn:
    result = pd.read_sql(text(query), conn)

print("\n✅ Aggregated Sales Data:")
print(result)
import pandas as pd
from sqlalchemy import create_engine

# Connect to SQLite (creates mydb.sqlite if it does not exist)
engine = create_engine("sqlite:///mydb.sqlite")

# Create sample sales data
data = [
    ("North", 2021, 1000),
    ("North", 2022, 1500),
    ("South", 2021, 2000),
    ("South", 2022, 2500),
    ("East", 2021, 1200),
    ("East", 2022, 1700),
]
df_sales = pd.DataFrame(data, columns=["region", "year", "amount"])

# ✅ Correct use of 'with'
with engine.begin() as conn:
    df_sales.to_sql("sales", conn, if_exists="replace", index=False)

print("✅ Sales table created successfully!")

SQLite does not support ROLLUP or CUBE natively, so we emulate them by combining the different grouping levels with UNION ALL:

from sqlalchemy import text

query = text("""
-- Region + Year totals
SELECT region, year, SUM(amount) AS total_sales
FROM sales
GROUP BY region, year

UNION ALL

-- Region totals only
SELECT region, NULL AS year, SUM(amount) AS total_sales
FROM sales
GROUP BY region

UNION ALL

-- Year totals only
SELECT NULL AS region, year, SUM(amount) AS total_sales
FROM sales
GROUP BY year

UNION ALL

-- Grand total
SELECT NULL AS region, NULL AS year, SUM(amount) AS total_sales
FROM sales;
""")

with engine.connect() as conn:
    result = pd.read_sql(query, conn)

print(result)
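For comparison, pandas can produce the same row, column, and grand totals without any SQL: pivot_table with margins=True appends "All" totals along both axes, similar to a CUBE over two dimensions. A minimal sketch on an abbreviated version of the sales data:

```python
import pandas as pd

# Abbreviated sales data (two of the three regions)
sales = pd.DataFrame({
    "region": ["North", "North", "South", "South"],
    "year":   [2021, 2022, 2021, 2022],
    "amount": [1000, 1500, 2000, 2500],
})

# margins=True adds an "All" row and column of totals,
# mirroring the UNION ALL rollup levels in the SQL version
cube = pd.pivot_table(sales, values="amount", index="region",
                      columns="year", aggfunc="sum", margins=True)
print(cube.loc["All", "All"])  # 7000
```

Here cube.loc["North", "All"] gives the region total (2500) and cube.loc["All", 2021] the year total (3000), matching the region-only and year-only branches of the SQL query.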