Data Warehouse Practical (Jupyter)

 



1.1

 

Create a table to compare OLTP vs Data Warehouse in SQL (using comments/notes).

 

Step 1: Import libraries and create engine

import pandas as pd

from sqlalchemy import create_engine, text

 

# Create in-memory SQLite database

engine = create_engine('sqlite://', echo=False)

 

Step 2: Create the table

create_table_query = """

CREATE TABLE OLTP_vs_DW (

    Feature TEXT,

    OLTP_Description TEXT,

    DW_Description TEXT

);

"""

 

# Use a connection to execute

with engine.begin() as conn:

    conn.execute(text(create_table_query))

 

Step 3: Insert data

insert_data_query = """

INSERT INTO OLTP_vs_DW (Feature, OLTP_Description, DW_Description)

VALUES

('Purpose', 'Handles day-to-day transactions', 'Supports analytics and decision-making'),

('Data Orientation', 'Transaction-oriented', 'Subject-oriented / Analysis-oriented'),

('Data Structure', 'Normalized tables to reduce redundancy', 'Denormalized tables to support fast queries'),

('Data Volume', 'High volume of detailed records', 'Aggregated / summarized data, smaller volume'),

('Update Frequency', 'Real-time / frequent updates', 'Periodic updates (daily, weekly, monthly)'),

('Usage', 'Daily operations (insert, update, delete)', 'Reporting, trend analysis, business intelligence'),

('Examples', 'Banking transactions, Retail POS', 'Sales summaries, Customer behavior reports');

"""

 

with engine.begin() as conn:

    conn.execute(text(insert_data_query))

 

Step 4: Read and display the table

df_compare = pd.read_sql('SELECT * FROM OLTP_vs_DW', engine)

df_compare

 

Step 5 (Optional): Save to CSV
df_compare.to_csv('OLTP_vs_DW_table.csv', index=False)
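Optionally, you can also run a filtered query against the comparison table. A minimal self-contained sketch (it rebuilds a one-row version of the table so it runs on its own):

```python
import pandas as pd
from sqlalchemy import create_engine, text

# Rebuild a one-row version of the table so this example runs on its own
engine = create_engine('sqlite://', echo=False)
with engine.begin() as conn:
    conn.execute(text(
        "CREATE TABLE OLTP_vs_DW (Feature TEXT, OLTP_Description TEXT, DW_Description TEXT)"))
    conn.execute(text(
        "INSERT INTO OLTP_vs_DW VALUES "
        "('Purpose', 'Handles day-to-day transactions', 'Supports analytics and decision-making')"))

# Filter the comparison table with a WHERE clause
row = pd.read_sql("SELECT * FROM OLTP_vs_DW WHERE Feature = 'Purpose'", engine)
print(row)
```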

 

 

 

Alternative approach: build the same comparison table with pandas only (no SQL).

Step 1: Open Jupyter Notebook

Launch Anaconda Navigator → Click Launch under Jupyter Notebook.

Or open a terminal and type:

jupyter notebook

A browser window will open with the Jupyter Dashboard.

Click New → Python 3 to create a new notebook.


Step 2: Import the required library

We need pandas to create and manipulate tables.
In the first cell, type:

import pandas as pd

Run the cell with Shift + Enter.


Step 3: Create the OLTP vs DW table

We use a pandas DataFrame to store the comparison.

# Create a dictionary with table data

data = {

    "Feature": [

        "Purpose", "Data Orientation", "Data Structure",

        "Data Volume", "Update Frequency", "Usage", "Examples"

    ],

    "OLTP_Description": [

        "Handles day-to-day transactions",

        "Transaction-oriented",

        "Normalized tables to reduce redundancy",

        "High volume of detailed records",

        "Real-time / frequent updates",

        "Daily operations (insert, update, delete)",

        "Banking transactions, Retail POS"

    ],

    "DW_Description": [

        "Supports analytics and decision-making",

        "Subject-oriented / Analysis-oriented",

        "Denormalized tables to support fast queries",

        "Aggregated / summarized data, smaller volume",

        "Periodic updates (daily, weekly, monthly)",

        "Reporting, trend analysis, business intelligence",

        "Sales summaries, Customer behavior reports"

    ]

}

 

# Create DataFrame

df_compare = pd.DataFrame(data)

 

# Display the table

df_compare

Run the cell (Shift + Enter).

You will see a table with three columns: Feature, OLTP_Description, and DW_Description.

 


Step 4: Save the table as CSV

df_compare.to_csv('OLTP_vs_DW_table.csv', index=False)

This saves the table as a CSV file named OLTP_vs_DW_table.csv in your notebook working directory.

You can download it from Jupyter Dashboard for submission.


Step 5: Optional MySQL load

If you want to put this table into MySQL:

from sqlalchemy import create_engine

 

# Change username/password and database as per your MySQL setup

engine_mysql = create_engine('mysql+mysqlconnector://root:password@localhost/datawarehouse_lab')

 

df_compare.to_sql('OLTP_vs_DW', engine_mysql, if_exists='replace', index=False)

Make sure MySQL server is running and database datawarehouse_lab exists.

This will create a table named OLTP_vs_DW in your MySQL database.
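You can verify the load by reading the table back with pd.read_sql. The same pattern works if MySQL is not available; in this sketch an in-memory SQLite engine and a one-row frame stand in for engine_mysql and the full df_compare:

```python
import pandas as pd
from sqlalchemy import create_engine

# One-row stand-in for df_compare so the example is self-contained
df_compare = pd.DataFrame({
    "Feature": ["Purpose"],
    "OLTP_Description": ["Handles day-to-day transactions"],
    "DW_Description": ["Supports analytics and decision-making"],
})

# An in-memory SQLite engine stands in for engine_mysql here
engine = create_engine('sqlite://')
df_compare.to_sql('OLTP_vs_DW', engine, if_exists='replace', index=False)

# Read the row count back to confirm the load succeeded
check = pd.read_sql('SELECT COUNT(*) AS n FROM OLTP_vs_DW', engine)
print(check)
```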


Step 6: Take screenshots

Jupyter Notebook showing the table output.

CSV file saved in working directory.

Optional: MySQL Workbench showing the table.


 

 

 

1.2

 

Write SQL queries to insert transactional data (OLTP) vs aggregated queries for Data Warehouse.

 

 

Step 1: Open Jupyter Notebook

Launch Anaconda Navigator → Jupyter Notebook → New → Python 3.

Create a new notebook for this lab.


Step 2: Import Libraries

We need pandas and SQLAlchemy for SQL operations:

import pandas as pd

from sqlalchemy import create_engine

Run this cell with Shift + Enter.


Step 3: Create Sample OLTP Data in Python

We will create a transactional table for students’ marks:

import numpy as np

import random

from datetime import datetime, timedelta

 

# Generate OLTP dataset

num_entries = 500

student_ids = np.random.randint(1, 51, size=num_entries)

subjects = np.random.choice(['Math','Science','English','History'], size=num_entries)

marks = np.random.randint(35, 101, size=num_entries)

start_date = datetime(2024, 1, 1)

exam_dates = [start_date + timedelta(days=random.randint(0, 365)) for _ in range(num_entries)]

teacher_ids = np.random.randint(1, 11, size=num_entries)

 

oltp_data = pd.DataFrame({

    "StudentID": student_ids,

    "Subject": subjects,

    "Marks": marks,

    "ExamDate": exam_dates,

    "TeacherID": teacher_ids

})

 

# Display first 5 rows

oltp_data.head()

This will create 500 random OLTP entries for students.
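Because the data is random, each run produces different rows. If you want your screenshots to match across re-executions, seed both generators before creating the dataset; a small sketch (shown on 5 values rather than 500):

```python
import random
import numpy as np

# Seed both generators used above so every run produces the same dataset
np.random.seed(42)
random.seed(42)

# Same call as in Step 3; the values now repeat across notebook restarts
marks = np.random.randint(35, 101, size=5)
print(marks)
```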


Step 4: Connect to a SQL Database

You can use SQLite (simpler) or MySQL (optional).

Option A: SQLite

# Create SQLite engine

engine = create_engine('sqlite:///datawarehouse_lab.db')

Option B: MySQL (optional)

# Create MySQL engine (change username/password/DB)

# engine = create_engine('mysql+mysqlconnector://root:password@localhost/datawarehouse_lab')


Step 5: Create OLTP Table in SQL

# Save OLTP data to SQL table

oltp_data.to_sql('Student_OLTP', engine, if_exists='replace', index=False)

print("OLTP table created successfully!")

This will create a table Student_OLTP with all 500 records.

In SQL terms, it’s equivalent to:

CREATE TABLE Student_OLTP (

    StudentID INT,

    Subject VARCHAR(50),

    Marks INT,

    ExamDate DATE,

    TeacherID INT

);

 

-- Insert transactional data

INSERT INTO Student_OLTP (StudentID, Subject, Marks, ExamDate, TeacherID)

VALUES (1, 'Math', 85, '2024-02-10', 3);

-- Repeat for all 500 entries (to_sql does this automatically)


Step 6: Create Aggregated DW Table in SQL

We aggregate OLTP data to build a DW table:

# Aggregate: Average marks per student

dw_data = oltp_data.groupby('StudentID', as_index=False)['Marks'].mean()

dw_data.columns = ['StudentID', 'Avg_Marks']

 

# Aggregate: Total marks per subject

dw_subject = oltp_data.groupby('Subject', as_index=False)['Marks'].sum()

dw_subject.columns = ['Subject', 'Total_Marks']

 

# Save DW tables to SQL

dw_data.to_sql('Student_DW', engine, if_exists='replace', index=False)

dw_subject.to_sql('Subject_DW', engine, if_exists='replace', index=False)

print("DW tables created successfully!")

Equivalent SQL query for aggregation:

-- Average marks per student

CREATE TABLE Student_DW AS

SELECT StudentID, AVG(Marks) AS Avg_Marks

FROM Student_OLTP

GROUP BY StudentID;

 

-- Total marks per subject

CREATE TABLE Subject_DW AS

SELECT Subject, SUM(Marks) AS Total_Marks

FROM Student_OLTP

GROUP BY Subject;


Step 7: Verify Tables

# View first 5 rows of OLTP

print("OLTP Table:")

print(pd.read_sql('SELECT * FROM Student_OLTP LIMIT 5', engine))

 

# View first 5 rows of DW

print("\nDW Table (Average Marks per Student):")

print(pd.read_sql('SELECT * FROM Student_DW LIMIT 5', engine))

 

print("\nDW Table (Total Marks per Subject):")

print(pd.read_sql('SELECT * FROM Subject_DW', engine))
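A common DW-style view crosses both dimensions at once, for example average marks per student per subject. A sketch using pandas pivot_table; the small frame here is a stand-in for oltp_data, with the same column names:

```python
import pandas as pd

# Small stand-in for oltp_data, with the same columns as above
oltp_data = pd.DataFrame({
    "StudentID": [1, 1, 2, 2],
    "Subject": ["Math", "Science", "Math", "Science"],
    "Marks": [80, 90, 70, 60],
})

# Average marks per student per subject (rows: students, columns: subjects)
pivot = oltp_data.pivot_table(index="StudentID", columns="Subject",
                              values="Marks", aggfunc="mean")
print(pivot)
```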


Step 8: Optional Visualization

import matplotlib.pyplot as plt

 

# Bar chart for average marks per student

plt.figure(figsize=(10,5))

plt.bar(dw_data['StudentID'], dw_data['Avg_Marks'], color='skyblue')

plt.xlabel('StudentID')

plt.ylabel('Average Marks')

plt.title('Average Marks per Student (DW)')

plt.show()


Step 9: Deliverables

Screenshots of Jupyter Notebook showing:

OLTP table (Student_OLTP)

DW tables (Student_DW & Subject_DW)

CSV files (optional):

oltp_data.to_csv('Student_OLTP.csv', index=False)

dw_data.to_csv('Student_DW.csv', index=False)

dw_subject.to_csv('Subject_DW.csv', index=False)

 

 

2.1

 

Draw Kimball Lifecycle using MS Excel / Lucidchart / Python (matplotlib).

Step 1: Open Jupyter Notebook

Launch Jupyter Notebook and create a new Python 3 notebook.


Step 2: Import Required Libraries

We will use matplotlib for drawing.

import matplotlib.pyplot as plt

from matplotlib.patches import FancyArrowPatch, Rectangle


Step 3: Define the Kimball Lifecycle Steps

Kimball Lifecycle typically includes these steps:

Business Requirements

Choose the Grain

Design the Dimension Tables

Design the Fact Tables

ETL Process

Deploy / Maintain Data Warehouse

We will visualize this as a flow diagram.

# Define lifecycle steps

steps = [

    "Business Requirements",

    "Choose the Grain",

    "Design Dimension Tables",

    "Design Fact Tables",

    "ETL Process",

    "Deploy / Maintain DW"

]

 

# Position of steps on X-axis

x_pos = [1, 2, 3, 4, 5, 6]

y_pos = [1]*6


Step 4: Create Diagram Using Matplotlib

# Create a figure

plt.figure(figsize=(14,3))

ax = plt.gca()

 

# Draw rectangles for each step

for x, step in zip(x_pos, steps):

    rect = Rectangle((x-0.5, 0.8), 1, 0.4, facecolor='skyblue', edgecolor='black')

    ax.add_patch(rect)

    plt.text(x, 1, step, ha='center', va='center', fontsize=10)

 

# Draw arrows between steps

for i in range(len(steps)-1):

    arrow = FancyArrowPatch((x_pos[i]+0.5, 1), (x_pos[i+1]-0.5, 1),

                            arrowstyle='->', mutation_scale=20, color='black')

    ax.add_patch(arrow)

 

# Set limits and remove axes

plt.xlim(0, 7)

plt.ylim(0, 2)

plt.axis('off')

plt.title("Kimball Lifecycle Diagram")

plt.show()

This will produce a flow diagram with rectangles for each step and arrows showing the process.


Step 5: Optional Excel Implementation

Open MS Excel → Insert → Shapes → Rectangles & Arrows.

Draw 6 rectangles horizontally.

Write each Kimball Lifecycle step in a rectangle.

Connect them with right arrows.

Save or export as an image for your report.


Step 6: Optional Lucidchart Implementation

Open Lucidchart → New Document → Flowchart Template.

Use rectangles for steps and arrows to show flow.

Label the steps as listed above.

Export as PNG/PDF to include in your assignment.


Deliverables

Python-generated diagram (matplotlib figure).

(Optional) Excel or Lucidchart version of the diagram.

Add a short note:

Kimball Lifecycle is an iterative process for building data warehouses.

Helps in designing dimensions, facts, and ETL process systematically.

 

 

2.2

Create a mock requirement document for a Retail Store Data Warehouse.



Step 1: Open Jupyter Notebook

Launch Jupyter Notebook → Create a New Python 3 notebook.


Step 2: Import Required Libraries

We will use pandas to create and save the mock requirement document.

import pandas as pd


Step 3: Define the Requirements

A mock requirement document typically contains:

Feature | Description | Priority | Notes
Sales Data | Daily sales transactions per store | High | Include transaction ID, date, product ID, quantity, amount
Customer Data | Customer information | Medium | Customer ID, name, gender, age, loyalty points
Product Data | Product master data | High | Product ID, name, category, price, supplier
Inventory Data | Stock levels | High | Store ID, product ID, current stock, reorder level
Employee Data | Store staff details | Low | Employee ID, name, role, store ID
Reports | Sales summary, product performance, customer analytics | High | Weekly, monthly reports for management decisions

We can create this in Python as a DataFrame.

# Create mock requirement document

requirements = pd.DataFrame({

    "Feature": [

        "Sales Data",

        "Customer Data",

        "Product Data",

        "Inventory Data",

        "Employee Data",

        "Reports"

    ],

    "Description": [

        "Daily sales transactions per store",

        "Customer information",

        "Product master data",

        "Stock levels",

        "Store staff details",

        "Sales summary, product performance, customer analytics"

    ],

    "Priority": [

        "High",

        "Medium",

        "High",

        "High",

        "Low",

        "High"

    ],

    "Notes": [

        "Include transaction ID, date, product ID, quantity, amount",

        "Customer ID, name, gender, age, loyalty points",

        "Product ID, name, category, price, supplier",

        "Store ID, product ID, current stock, reorder level",

        "Employee ID, name, role, store ID",

        "Weekly, monthly reports for management decisions"

    ]

})

 

# Display the DataFrame

requirements

Running this will show the mock requirement table in your notebook.


Step 4: Save the Requirement Document

You can save it as CSV or Excel.

# Save as CSV

requirements.to_csv("Retail_DW_Requirement.csv", index=False)

 

# Save as Excel

requirements.to_excel("Retail_DW_Requirement.xlsx", index=False)

Now you have a ready-to-submit requirement document.


Step 5: Optional Notes

This document can be expanded with data sources, ETL rules, fact and dimension tables, KPIs.

For teaching purposes, keep it simple and readable.


Deliverables

Displayed mock requirement table in Jupyter Notebook.

CSV file: Retail_DW_Requirement.csv.

Excel file: Retail_DW_Requirement.xlsx.

Optional short note explaining why each feature is needed in a Retail DW.

 

 

2.3

 

Identify roles and assign responsibilities in a team (example: project manager, ETL developer, analyst).

Step 1: Open Jupyter Notebook

Launch Jupyter Notebook → Create a New Python 3 notebook.


Step 2: Import Required Library

We will use pandas to create and display the team roles table.

import pandas as pd


Step 3: Define Team Roles and Responsibilities

A typical Data Warehouse team might have the following roles:

Role | Responsibility | Skills Required | Priority
Project Manager | Plan and manage the DW project, track progress, communicate with stakeholders | Project management, communication | High
Data Architect | Design the DW schema, select ETL tools, ensure data quality | Data modeling, SQL, ETL design | High
ETL Developer | Extract, Transform, Load data from sources to DW | SQL, Python, ETL tools, data cleansing | High
BI Analyst | Create reports, dashboards, and analytics for business users | SQL, Power BI/Tableau, Excel, analytics | Medium
Data Analyst | Analyze data, validate ETL results, support reporting | SQL, Python/R, Excel, statistics | Medium
DBA | Manage DW database, optimize performance, ensure backups | SQL, database tuning, security | High
QA/Tester | Test ETL jobs and reports for accuracy | SQL, testing tools, attention to detail | Medium

We can create this table in Python as a DataFrame.

# Create DW Team Roles and Responsibilities

team_roles = pd.DataFrame({

    "Role": [

        "Project Manager",

        "Data Architect",

        "ETL Developer",

        "BI Analyst",

        "Data Analyst",

        "DBA",

        "QA/Tester"

    ],

    "Responsibility": [

        "Plan and manage the DW project, track progress, communicate with stakeholders",

        "Design the DW schema, select ETL tools, ensure data quality",

        "Extract, Transform, Load data from sources to DW",

        "Create reports, dashboards, and analytics for business users",

        "Analyze data, validate ETL results, support reporting",

        "Manage DW database, optimize performance, ensure backups",

        "Test ETL jobs and reports for accuracy"

    ],

    "Skills_Required": [

        "Project management, communication",

        "Data modeling, SQL, ETL design",

        "SQL, Python, ETL tools, data cleansing",

        "SQL, Power BI/Tableau, Excel, analytics",

        "SQL, Python/R, Excel, statistics",

        "SQL, database tuning, security",

        "SQL, testing tools, attention to detail"

    ],

    "Priority": [

        "High",

        "High",

        "High",

        "Medium",

        "Medium",

        "High",

        "Medium"

    ]

})

 

# Display the DataFrame

team_roles

Running this will show the team roles table in your notebook.


Step 4: Save the Team Roles Document

You can save it as CSV or Excel for submission.

# Save as CSV

team_roles.to_csv("DW_Team_Roles.csv", index=False)

 

# Save as Excel

team_roles.to_excel("DW_Team_Roles.xlsx", index=False)


Step 5: Optional Notes

Add additional roles like Data Governance Lead, Metadata Manager, or Security Analyst if required.

Priority shows how critical the role is to the DW project.


Deliverables

Displayed team roles table in Jupyter Notebook.

CSV file: DW_Team_Roles.csv.

Excel file: DW_Team_Roles.xlsx.

Optional short note explaining why each role is important.

 

 

3.1

 

Create a Star Schema for Sales Data using SQL.

 

Step-by-Step Solution

Step 1: Import Libraries

import pandas as pd

from sqlalchemy import create_engine, text


Step 2: Create Engine

engine = create_engine("sqlite:///sales_dw.db", echo=True)


Step 3: Execute SQL via a Connection

Open a transactional connection and run each statement with conn.execute(text("SQL QUERY")). Use engine.begin() rather than plain engine.connect(): in SQLAlchemy 2.x, begin() commits automatically when the block exits, while connect() rolls back uncommitted work.

with engine.begin() as conn:

    # Product Dimension

    conn.execute(text("""

        CREATE TABLE IF NOT EXISTS Dim_Product (

            ProductID INTEGER PRIMARY KEY,

            ProductName TEXT,

            Category TEXT,

            Price REAL

        )

    """))

 

    # Customer Dimension

    conn.execute(text("""

        CREATE TABLE IF NOT EXISTS Dim_Customer (

            CustomerID INTEGER PRIMARY KEY,

            CustomerName TEXT,

            City TEXT,

            State TEXT

        )

    """))

 

    # Store Dimension

    conn.execute(text("""

        CREATE TABLE IF NOT EXISTS Dim_Store (

            StoreID INTEGER PRIMARY KEY,

            StoreName TEXT,

            City TEXT,

            State TEXT

        )

    """))

 

    # Date Dimension

    conn.execute(text("""

        CREATE TABLE IF NOT EXISTS Dim_Date (

            DateID INTEGER PRIMARY KEY,

            Date TEXT,

            Month INTEGER,

            Quarter INTEGER,

            Year INTEGER

        )

    """))

 

    # Fact Table

    conn.execute(text("""

        CREATE TABLE IF NOT EXISTS Fact_Sales (

            SalesID INTEGER PRIMARY KEY,

            DateID INTEGER,

            ProductID INTEGER,

            CustomerID INTEGER,

            StoreID INTEGER,

            Quantity INTEGER,

            TotalAmount REAL,

            FOREIGN KEY(DateID) REFERENCES Dim_Date(DateID),

            FOREIGN KEY(ProductID) REFERENCES Dim_Product(ProductID),

            FOREIGN KEY(CustomerID) REFERENCES Dim_Customer(CustomerID),

            FOREIGN KEY(StoreID) REFERENCES Dim_Store(StoreID)

        )

    """))


Step 4: Insert Sample Data

with engine.begin() as conn:  # begin() commits the inserts on exit (SQLAlchemy 2.x)

    # Insert sample data into Product Dimension

    conn.execute(text("""

        INSERT INTO Dim_Product (ProductID, ProductName, Category, Price)

        VALUES

        (1, 'Laptop', 'Electronics', 500),

        (2, 'Phone', 'Electronics', 300),

        (3, 'Chair', 'Furniture', 50)

    """))

 

    # Insert sample data into Customer Dimension

    conn.execute(text("""

        INSERT INTO Dim_Customer (CustomerID, CustomerName, City, State)

        VALUES

        (1, 'Alice', 'Pune', 'MH'),

        (2, 'Bob', 'Mumbai', 'MH')

    """))

 

    # Insert sample data into Store Dimension

    conn.execute(text("""

        INSERT INTO Dim_Store (StoreID, StoreName, City, State)

        VALUES

        (1, 'Store A', 'Pune', 'MH'),

        (2, 'Store B', 'Mumbai', 'MH')

    """))

 

    # Insert sample data into Date Dimension

    conn.execute(text("""

        INSERT INTO Dim_Date (DateID, Date, Month, Quarter, Year)

        VALUES

        (1, '2025-09-01', 9, 3, 2025),

        (2, '2025-09-02', 9, 3, 2025)

    """))

 

    # Insert sample data into Fact_Sales

    conn.execute(text("""

        INSERT INTO Fact_Sales (SalesID, DateID, ProductID, CustomerID, StoreID, Quantity, TotalAmount)

        VALUES

        (1, 1, 1, 1, 1, 2, 1000),

        (2, 2, 2, 2, 2, 1, 300),

        (3, 1, 3, 1, 1, 4, 200)

    """))


Step 5: Query the Fact Table

with engine.connect() as conn:

    df = pd.read_sql("""

        SELECT f.SalesID, d.Date, p.ProductName, c.CustomerName, s.StoreName, f.Quantity, f.TotalAmount

        FROM Fact_Sales f

        JOIN Dim_Date d ON f.DateID = d.DateID

        JOIN Dim_Product p ON f.ProductID = p.ProductID

        JOIN Dim_Customer c ON f.CustomerID = c.CustomerID

        JOIN Dim_Store s ON f.StoreID = s.StoreID

    """, conn)

 

df

Everything above runs on SQLAlchemy 2.x, which removed the old engine.execute() method in favour of executing through a connection.
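Once the star schema is in place, typical analytical queries group the fact table by a dimension attribute, for example total sales per product category. A self-contained sketch that rebuilds a minimal version of two of the tables above in a fresh in-memory database:

```python
import pandas as pd
from sqlalchemy import create_engine, text

# Minimal in-memory rebuild of Dim_Product and Fact_Sales for a runnable sketch
engine = create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(text(
        "CREATE TABLE Dim_Product (ProductID INTEGER PRIMARY KEY, ProductName TEXT, Category TEXT)"))
    conn.execute(text(
        "INSERT INTO Dim_Product VALUES (1,'Laptop','Electronics'), (2,'Phone','Electronics'), (3,'Chair','Furniture')"))
    conn.execute(text(
        "CREATE TABLE Fact_Sales (SalesID INTEGER PRIMARY KEY, ProductID INTEGER, TotalAmount REAL)"))
    conn.execute(text(
        "INSERT INTO Fact_Sales VALUES (1,1,1000), (2,2,300), (3,3,200)"))

# Roll the fact table up to category level
df = pd.read_sql("""
    SELECT p.Category, SUM(f.TotalAmount) AS Revenue
    FROM Fact_Sales f
    JOIN Dim_Product p ON f.ProductID = p.ProductID
    GROUP BY p.Category
    ORDER BY Revenue DESC
""", engine)
print(df)
```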

 

 

 

3.2

 

Create a Snowflake Schema for University Data

Step 1: Import Libraries

import pandas as pd

from sqlalchemy import create_engine, text


Step 2: Create Database Engine

# SQLite database (it will create 'university_dw.db' in your folder)

engine = create_engine("sqlite:///university_dw.db", echo=True)


Step 3: Create Dimension and Fact Tables (Snowflake Schema)

In a snowflake schema, dimensions are normalized, so for example, Dim_Student might link to Dim_Department.

with engine.begin() as conn:  # begin() commits the DDL on exit (SQLAlchemy 2.x)

   

    # Department Dimension

    conn.execute(text("""

        CREATE TABLE IF NOT EXISTS Dim_Department (

            DeptID INTEGER PRIMARY KEY,

            DeptName TEXT

        )

    """))

 

    # Course Dimension

    conn.execute(text("""

        CREATE TABLE IF NOT EXISTS Dim_Course (

            CourseID INTEGER PRIMARY KEY,

            CourseName TEXT,

            DeptID INTEGER,

            FOREIGN KEY(DeptID) REFERENCES Dim_Department(DeptID)

        )

    """))

 

    # Student Dimension

    conn.execute(text("""

        CREATE TABLE IF NOT EXISTS Dim_Student (

            StudentID INTEGER PRIMARY KEY,

            StudentName TEXT,

            DeptID INTEGER,

            FOREIGN KEY(DeptID) REFERENCES Dim_Department(DeptID)

        )

    """))

 

    # Date Dimension

    conn.execute(text("""

        CREATE TABLE IF NOT EXISTS Dim_Date (

            DateID INTEGER PRIMARY KEY,

            Date TEXT,

            Month INTEGER,

            Quarter INTEGER,

            Year INTEGER

        )

    """))

 

    # Fact Table (Grades / Enrollment)

    conn.execute(text("""

        CREATE TABLE IF NOT EXISTS Fact_Enrollment (

            EnrollmentID INTEGER PRIMARY KEY,

            StudentID INTEGER,

            CourseID INTEGER,

            DateID INTEGER,

            Grade TEXT,

            FOREIGN KEY(StudentID) REFERENCES Dim_Student(StudentID),

            FOREIGN KEY(CourseID) REFERENCES Dim_Course(CourseID),

            FOREIGN KEY(DateID) REFERENCES Dim_Date(DateID)

        )

    """))


Step 4: Insert Sample Data

with engine.begin() as conn:  # begin() commits the inserts on exit (SQLAlchemy 2.x)

   

    # Departments

    conn.execute(text("""

        INSERT INTO Dim_Department (DeptID, DeptName) VALUES

        (1, 'Computer Science'),

        (2, 'Mathematics'),

        (3, 'Physics')

    """))

 

    # Courses

    conn.execute(text("""

        INSERT INTO Dim_Course (CourseID, CourseName, DeptID) VALUES

        (1, 'Data Structures', 1),

        (2, 'Algorithms', 1),

        (3, 'Calculus', 2),

        (4, 'Linear Algebra', 2),

        (5, 'Quantum Mechanics', 3)

    """))

 

    # Students

    conn.execute(text("""

        INSERT INTO Dim_Student (StudentID, StudentName, DeptID) VALUES

        (1, 'Alice', 1),

        (2, 'Bob', 2),

        (3, 'Charlie', 3)

    """))

 

    # Dates

    conn.execute(text("""

        INSERT INTO Dim_Date (DateID, Date, Month, Quarter, Year) VALUES

        (1, '2025-09-01', 9, 3, 2025),

        (2, '2025-09-02', 9, 3, 2025)

    """))

 

    # Fact Table

    conn.execute(text("""

        INSERT INTO Fact_Enrollment (EnrollmentID, StudentID, CourseID, DateID, Grade) VALUES

        (1, 1, 1, 1, 'A'),

        (2, 1, 2, 2, 'B'),

        (3, 2, 3, 1, 'A'),

        (4, 3, 5, 2, 'B')

    """))


Step 5: Query the Fact Table with Joins

with engine.connect() as conn:

    df = pd.read_sql("""

        SELECT f.EnrollmentID, s.StudentName, d.DeptName, c.CourseName, dt.Date, f.Grade

        FROM Fact_Enrollment f

        JOIN Dim_Student s ON f.StudentID = s.StudentID

        JOIN Dim_Course c ON f.CourseID = c.CourseID

        JOIN Dim_Department d ON s.DeptID = d.DeptID

        JOIN Dim_Date dt ON f.DateID = dt.DateID

    """, conn)

 

df

Output will show Enrollment details with student, department, course, date, and grade, which is a typical Snowflake Schema view.
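Because the student dimension is normalized out to the department table, aggregating by department takes two joins from the fact table. A self-contained sketch of that two-hop roll-up, rebuilding a minimal version of the chain in a fresh in-memory database:

```python
import pandas as pd
from sqlalchemy import create_engine, text

# Minimal in-memory rebuild of the normalized chain Department -> Student -> Enrollment
engine = create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE Dim_Department (DeptID INTEGER PRIMARY KEY, DeptName TEXT)"))
    conn.execute(text("INSERT INTO Dim_Department VALUES (1,'Computer Science'), (2,'Mathematics')"))
    conn.execute(text("CREATE TABLE Dim_Student (StudentID INTEGER PRIMARY KEY, StudentName TEXT, DeptID INTEGER)"))
    conn.execute(text("INSERT INTO Dim_Student VALUES (1,'Alice',1), (2,'Bob',2)"))
    conn.execute(text("CREATE TABLE Fact_Enrollment (EnrollmentID INTEGER PRIMARY KEY, StudentID INTEGER)"))
    conn.execute(text("INSERT INTO Fact_Enrollment VALUES (1,1), (2,1), (3,2)"))

# Count enrollments per department: fact -> student -> department (two hops)
df = pd.read_sql("""
    SELECT d.DeptName, COUNT(*) AS Enrollments
    FROM Fact_Enrollment f
    JOIN Dim_Student s ON f.StudentID = s.StudentID
    JOIN Dim_Department d ON s.DeptID = d.DeptID
    GROUP BY d.DeptName
""", engine)
print(df)
```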


 

3.3

 

Create a Fact Constellation Schema (Galaxy Schema) for Retail & Inventory.

Step 1: Import Libraries

import pandas as pd

from sqlalchemy import create_engine, text


Step 2: Create Database Engine

engine = create_engine("sqlite:///retail_inventory_dw.db", echo=True)


Step 3: Create Dimension Tables

with engine.begin() as conn:  # begin() commits the DDL on exit (SQLAlchemy 2.x)

 

    # Product Dimension

    conn.execute(text("""

        CREATE TABLE IF NOT EXISTS Dim_Product (

            ProductID INTEGER PRIMARY KEY,

            ProductName TEXT,

            Category TEXT,

            Price REAL

        )

    """))

 

    # Store Dimension

    conn.execute(text("""

        CREATE TABLE IF NOT EXISTS Dim_Store (

            StoreID INTEGER PRIMARY KEY,

            StoreName TEXT,

            Location TEXT

        )

    """))

 

    # Date Dimension

    conn.execute(text("""

        CREATE TABLE IF NOT EXISTS Dim_Date (

            DateID INTEGER PRIMARY KEY,

            Date TEXT,

            Month INTEGER,

            Quarter INTEGER,

            Year INTEGER

        )

    """))


Step 4: Create Fact Tables

with engine.begin() as conn:  # begin() commits the DDL on exit (SQLAlchemy 2.x)

 

    # Fact Table: Sales

    conn.execute(text("""

        CREATE TABLE IF NOT EXISTS Fact_Sales (

            SalesID INTEGER PRIMARY KEY,

            ProductID INTEGER,

            StoreID INTEGER,

            DateID INTEGER,

            QuantitySold INTEGER,

            TotalAmount REAL,

            FOREIGN KEY(ProductID) REFERENCES Dim_Product(ProductID),

            FOREIGN KEY(StoreID) REFERENCES Dim_Store(StoreID),

            FOREIGN KEY(DateID) REFERENCES Dim_Date(DateID)

        )

    """))

 

    # Fact Table: Inventory

    conn.execute(text("""

        CREATE TABLE IF NOT EXISTS Fact_Inventory (

            InventoryID INTEGER PRIMARY KEY,

            ProductID INTEGER,

            StoreID INTEGER,

            DateID INTEGER,

            StockIn INTEGER,

            StockOut INTEGER,

            FOREIGN KEY(ProductID) REFERENCES Dim_Product(ProductID),

            FOREIGN KEY(StoreID) REFERENCES Dim_Store(StoreID),

            FOREIGN KEY(DateID) REFERENCES Dim_Date(DateID)

        )

    """))


Step 5: Insert Sample Data

with engine.begin() as conn:  # begin() commits the inserts on exit (SQLAlchemy 2.x)

 

    # Products

    conn.execute(text("""

        INSERT INTO Dim_Product (ProductID, ProductName, Category, Price) VALUES

        (1, 'Laptop', 'Electronics', 50000),

        (2, 'Mobile', 'Electronics', 15000),

        (3, 'Shirt', 'Clothing', 800),

        (4, 'Jeans', 'Clothing', 1200)

    """))

 

    # Stores

    conn.execute(text("""

        INSERT INTO Dim_Store (StoreID, StoreName, Location) VALUES

        (1, 'Store A', 'Pune'),

        (2, 'Store B', 'Mumbai')

    """))

 

    # Dates

    conn.execute(text("""

        INSERT INTO Dim_Date (DateID, Date, Month, Quarter, Year) VALUES

        (1, '2025-09-01', 9, 3, 2025),

        (2, '2025-09-02', 9, 3, 2025)

    """))

 

    # Fact_Sales

    conn.execute(text("""

        INSERT INTO Fact_Sales (SalesID, ProductID, StoreID, DateID, QuantitySold, TotalAmount) VALUES

        (1, 1, 1, 1, 5, 250000),

        (2, 2, 1, 2, 10, 150000),

        (3, 3, 2, 1, 20, 16000)

    """))

 

    # Fact_Inventory

    conn.execute(text("""

        INSERT INTO Fact_Inventory (InventoryID, ProductID, StoreID, DateID, StockIn, StockOut) VALUES

        (1, 1, 1, 1, 10, 5),

        (2, 2, 1, 2, 20, 10),

        (3, 3, 2, 1, 30, 20)

    """))


Step 6: Query Fact Tables with Joins

with engine.connect() as conn:

    df_sales = pd.read_sql("""

        SELECT f.SalesID, p.ProductName, s.StoreName, d.Date, f.QuantitySold, f.TotalAmount

        FROM Fact_Sales f

        JOIN Dim_Product p ON f.ProductID = p.ProductID

        JOIN Dim_Store s ON f.StoreID = s.StoreID

        JOIN Dim_Date d ON f.DateID = d.DateID

    """, conn)

 

    df_inventory = pd.read_sql("""

        SELECT f.InventoryID, p.ProductName, s.StoreName, d.Date, f.StockIn, f.StockOut

        FROM Fact_Inventory f

        JOIN Dim_Product p ON f.ProductID = p.ProductID

        JOIN Dim_Store s ON f.StoreID = s.StoreID

        JOIN Dim_Date d ON f.DateID = d.DateID

    """, conn)

 

print("Sales Fact Table")

print(df_sales)

print("\nInventory Fact Table")

print(df_inventory)


Explanation:

Fact_Sales and Fact_Inventory are two separate fact tables.

Dim_Product, Dim_Store, Dim_Date are shared dimension tables.

This structure represents a Galaxy Schema because multiple fact tables share common dimensions.
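Because the facts share dimensions, you can line them up in one result, for example units sold versus net stock movement per product. A self-contained sketch using the same column names as above, rebuilt in a fresh in-memory database:

```python
import pandas as pd
from sqlalchemy import create_engine, text

# Minimal in-memory rebuild: one shared dimension, two fact tables
engine = create_engine("sqlite://")
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE Dim_Product (ProductID INTEGER PRIMARY KEY, ProductName TEXT)"))
    conn.execute(text("INSERT INTO Dim_Product VALUES (1,'Laptop'), (2,'Mobile')"))
    conn.execute(text("CREATE TABLE Fact_Sales (ProductID INTEGER, QuantitySold INTEGER)"))
    conn.execute(text("INSERT INTO Fact_Sales VALUES (1,5), (2,10)"))
    conn.execute(text("CREATE TABLE Fact_Inventory (ProductID INTEGER, StockIn INTEGER, StockOut INTEGER)"))
    conn.execute(text("INSERT INTO Fact_Inventory VALUES (1,10,5), (2,20,10)"))

# Aggregate each fact table separately, joined through the shared dimension
df = pd.read_sql("""
    SELECT p.ProductName,
           (SELECT SUM(QuantitySold) FROM Fact_Sales f WHERE f.ProductID = p.ProductID) AS Sold,
           (SELECT SUM(StockIn - StockOut) FROM Fact_Inventory i WHERE i.ProductID = p.ProductID) AS NetStock
    FROM Dim_Product p
""", engine)
print(df)
```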

 

 

4.1

Write a Python program using Pandas to perform ETL (read CSV → clean data → load into DB).

 

 

pip install pandas sqlalchemy

(sqlite3 is part of the Python standard library; it does not need to be installed with pip.)

 

 

import pandas as pd

from io import StringIO

 

# -------------------

# 1. Extract (Read CSV)

# -------------------

csv_data = """id,first_name,last_name,age,join_date,salary

1,Alice,Smith,34,2021/01/05,50000

2,Bob,Jones,,05-02-2021,60000

3,carol,"  O'Neil",29,2021.03.10,not available

3,carol,"  O'Neil",29,2021-03-10,70000

4,,Khan,150,2021-13-01,80000

5,Eve,Stone,27,March 2 2021,65000

"""

 

# Read CSV into DataFrame

df = pd.read_csv(StringIO(csv_data))

 

print("🔹 Raw Data:")

print(df, "\n")

 

# -------------------

# 2. Transform (Clean Data)

# -------------------

 

# Strip whitespace & fix capitalization

df['first_name'] = df['first_name'].str.strip().str.capitalize()

df['last_name'] = df['last_name'].str.strip()

 

# Fill missing first names

df['first_name'] = df['first_name'].fillna("Unknown")

 

# Fix ages (invalid -> NaN)

df['age'] = pd.to_numeric(df['age'], errors='coerce')

df.loc[(df['age'] < 0) | (df['age'] > 100), 'age'] = None

 

# Standardize join_date

df['join_date'] = pd.to_datetime(df['join_date'], errors='coerce')

 

# Fix salary (convert to numeric, invalid -> NaN)

df['salary'] = pd.to_numeric(df['salary'], errors='coerce')

 

# Remove duplicates

df = df.drop_duplicates()

 

# -------------------

# 3. Load (Save Cleaned Data)

# -------------------

df.to_csv("cleaned_data.csv", index=False)

 

print("Cleaned Data:")

print(df)
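Before loading, it helps to confirm the cleaning actually worked: check the dtypes and how many nulls remain per column. A sketch of that check on a tiny stand-in frame with the same column names as the cleaned df above:

```python
import pandas as pd

# Tiny stand-in for the cleaned df above
df = pd.DataFrame({
    "age": [34.0, None],
    "salary": [50000.0, 60000.0],
    "join_date": pd.to_datetime(["2021-01-05", None]),
})

# Nulls left after cleaning, per column
nulls = df.isna().sum()
print(nulls)

# Column dtypes should now be numeric / datetime, not object
print(df.dtypes)
```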

 

import pandas as pd

from sqlalchemy import create_engine

 

# Step 1: Extract - Read CSV using full path

csv_path = r"C:\Users\Nikhil\Downloads\sample_data.csv"

db_path = r"sqlite:///C:\Users\Nikhil\Downloads\etl_example.db"

 

df = pd.read_csv(csv_path)

print("Original Data:")

print(df)

 

# Step 2: Transform - Clean data

 

# Strip spaces from all string cells (DataFrame.map; on pandas < 2.1 use the older applymap)

df = df.map(lambda x: x.strip() if isinstance(x, str) else x)

 

# Convert numeric

df['age'] = pd.to_numeric(df['age'], errors='coerce')

df['salary'] = pd.to_numeric(df['salary'], errors='coerce')

 

# Convert dates

df['join_date'] = pd.to_datetime(df['join_date'], errors='coerce')

 

# Deduplicate (keep last row for same id)

df = df.drop_duplicates(subset=['id'], keep='last')

 

# Handle invalid ages

df.loc[(df['age'] < 0) | (df['age'] > 120), 'age'] = None

 

# Fill missing values

df['age'] = df['age'].fillna(df['age'].median())

df['salary'] = df['salary'].fillna(df['salary'].median())

df['first_name'] = df['first_name'].fillna("Unknown")

 

# Create derived column

df['full_name'] = df['first_name'] + " " + df['last_name']

 

print("\nCleaned Data:")

print(df)

 

# Step 3: Load - Save into SQLite using full path

engine = create_engine(db_path)

df.to_sql("people", con=engine, if_exists="replace", index=False)

 

# Verify

loaded = pd.read_sql("SELECT * FROM people", engine)

print("\nData Loaded from DB:")

print(loaded)

Run the following in a terminal (Anaconda Prompt), not in the notebook:

cd C:\Users\Nikhil\Downloads

psycopg2 is the PostgreSQL driver; install it only if you load into PostgreSQL instead of SQLite:

pip install psycopg2-binary

Or, with conda:

conda install psycopg2

import sqlite3

conn = sqlite3.connect("etl_example.db")

rows = conn.execute("SELECT * FROM people").fetchall()

print(rows)

conn.close()

 

4.2

 

Use SQL to perform data aggregation (ROLAP-style queries), e.g. ROLLUP and CUBE. Note that SQLite does not support ROLLUP/CUBE directly, so the final example below emulates ROLLUP with UNION ALL.

 

from sqlalchemy import create_engine, text

 

# connect to SQLite

engine = create_engine(r"sqlite:///C:\Users\Nikhil\Downloads\etl_example.db")

 

# create table with SQL

with engine.connect() as conn:

    conn.execute(text("""

        CREATE TABLE IF NOT EXISTS sales (

            id INTEGER PRIMARY KEY,

            product TEXT,

            quantity INTEGER,

            price REAL

        )

    """))

    conn.commit()

 

print("Table 'sales' created successfully!")

 

 

 

 

import pandas as pd

from sqlalchemy import create_engine, text

 

# Connect to SQLite

engine = create_engine("sqlite:///mydb.sqlite")

 

# Step 1: Create 'sales' table (only if not exists)

# engine.begin() commits automatically on exit; a plain connect() without
# conn.commit() would roll back the CREATE TABLE in SQLAlchemy 2.0
with engine.begin() as conn:

    conn.execute(text("""

    CREATE TABLE IF NOT EXISTS sales (

        id INTEGER PRIMARY KEY,

        product TEXT,

        year INTEGER,

        amount REAL,

        state TEXT

    );

    """))

 

# Step 2: Insert some sample rows (only run once, or wrap in try/except)

data = [

    (1, "Laptop", 2021, 1200.50, "Maharashtra"),

    (2, "Phone", 2021, 800.00, "Maharashtra"),

    (3, "Tablet", 2022, 600.00, "Gujarat"),

    (4, "Laptop", 2022, 1400.00, "Gujarat"),

    (5, "Phone", 2023, 900.00, "Maharashtra"),

]

df = pd.DataFrame(data, columns=["id", "product", "year", "amount", "state"])

df.to_sql("sales", engine, if_exists="append", index=False)

 

# Step 3: Run aggregation query

query = """

SELECT state, year, SUM(amount) AS total_sales

FROM sales

GROUP BY state, year;

"""

 

with engine.connect() as conn:

    result = pd.read_sql(text(query), conn)

 

print("\nAggregated Sales Data:")

print(result)
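For comparison, the same GROUP BY aggregation can be expressed in pandas with `groupby`. A sketch using the same five sample rows:

```python
import pandas as pd

data = [
    (1, "Laptop", 2021, 1200.50, "Maharashtra"),
    (2, "Phone", 2021, 800.00, "Maharashtra"),
    (3, "Tablet", 2022, 600.00, "Gujarat"),
    (4, "Laptop", 2022, 1400.00, "Gujarat"),
    (5, "Phone", 2023, 900.00, "Maharashtra"),
]
df = pd.DataFrame(data, columns=["id", "product", "year", "amount", "state"])

# Equivalent of: SELECT state, year, SUM(amount) ... GROUP BY state, year
result = (
    df.groupby(["state", "year"], as_index=False)["amount"]
      .sum()
      .rename(columns={"amount": "total_sales"})
)
print(result)
```

Doing the aggregation in pandas instead of SQL is convenient for small extracts; for large tables, pushing the GROUP BY down to the database is usually faster.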

 

import pandas as pd

from sqlalchemy import create_engine

 

# Connect to SQLite (creates mydb.sqlite if not exists)

engine = create_engine("sqlite:///mydb.sqlite")

 

# Create sample sales data

data = [

    ("North", 2021, 1000),

    ("North", 2022, 1500),

    ("South", 2021, 2000),

    ("South", 2022, 2500),

    ("East", 2021, 1200),

    ("East", 2022, 1700),

]

 

df_sales = pd.DataFrame(data, columns=["region", "year", "amount"])

 

# Correct use of 'with'

with engine.begin() as conn:

    df_sales.to_sql("sales", conn, if_exists="replace", index=False)

 

print("Sales table created successfully!")


from sqlalchemy import text

 

query = text("""

-- Region + Year totals

SELECT region, year, SUM(amount) AS total_sales

FROM sales

GROUP BY region, year

 

UNION ALL

 

-- Region totals only

SELECT region, NULL AS year, SUM(amount) AS total_sales

FROM sales

GROUP BY region

 

UNION ALL

 

-- Year totals only

SELECT NULL AS region, year, SUM(amount) AS total_sales

FROM sales

GROUP BY year

 

UNION ALL

 

-- Grand total

SELECT NULL AS region, NULL AS year, SUM(amount) AS total_sales

FROM sales;

""")

 

with engine.connect() as conn:

    result = pd.read_sql(query, conn)

 

print(result)
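The UNION ALL query above is a manual emulation of GROUP BY ROLLUP, which SQLite lacks (PostgreSQL and MySQL 8+ support it natively as `GROUP BY ROLLUP (region, year)`). The same set of subtotals can also be built in pandas with `groupby` plus `concat`; a sketch on the same six sample rows:

```python
import pandas as pd

df = pd.DataFrame({
    "region": ["North", "North", "South", "South", "East", "East"],
    "year":   [2021, 2022, 2021, 2022, 2021, 2022],
    "amount": [1000, 1500, 2000, 2500, 1200, 1700],
})

# Region + Year totals
by_both = df.groupby(["region", "year"], as_index=False)["amount"].sum()

# Region totals only (year left as None, matching the SQL NULL)
by_region = df.groupby("region", as_index=False)["amount"].sum().assign(year=None)

# Year totals only
by_year = df.groupby("year", as_index=False)["amount"].sum().assign(region=None)

# Grand total
grand = pd.DataFrame([{"region": None, "year": None, "amount": df["amount"].sum()}])

rollup = pd.concat([by_both, by_region, by_year, grand], ignore_index=True)
print(rollup)
```

As in the SQL version, the `None` entries play the role of the NULLs that mark subtotal and grand-total rows.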

 

