Analyzing AdvWorks Orders data with Python & ML
I am using the AdvWorks prod_orders.csv dataset for this analysis and demonstrating multiple ways Python can be used within a notebook to generate useful analyses. Below I’ve done:
– Sales Over Time
– Top Selling Products
– Top Customers
– Fulfillment Analysis
– Customer Lifetime Value (CLV)
– Sales Forecasting
– Market Basket Analysis
The schema for the dataset is below:
 |-- SalesOrderID: integer (nullable = true)
 |-- SalesOrderDetailID: integer (nullable = true)
 |-- OrderDate: timestamp (nullable = true)
 |-- DueDate: timestamp (nullable = true)
 |-- ShipDate: timestamp (nullable = true)
 |-- EmployeeID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- SubTotal: float (nullable = true)
 |-- TaxAmt: float (nullable = true)
 |-- Freight: float (nullable = true)
 |-- TotalDue: float (nullable = true)
 |-- ProductID: integer (nullable = true)
 |-- OrderQty: integer (nullable = true)
 |-- UnitPrice: float (nullable = true)
 |-- UnitPriceDiscount: float (nullable = true)
 |-- LineTotal: float (nullable = true)
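For reference, a read along the following lines produces the DataFrame used throughout. This is a minimal sketch: the file path and session setup are assumptions for this notebook, and inferSchema may pick double/timestamp types rather than the exact float types listed above, in which case an explicit schema can be supplied instead.
from pyspark.sql import SparkSession

# Assumed setup: the app name and CSV path are placeholders for this notebook.
spark = SparkSession.builder.appName("AdvWorksOrders").getOrCreate()

# Read the orders extract with a header row, letting Spark infer column types.
# (Inference may choose double/timestamp rather than float; pass an explicit
# schema if the exact types above are required.)
df = spark.read.csv("prod_orders.csv", header=True, inferSchema=True)

df.printSchema()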
1. Sales Over Time
This code groups the data by year and month, then sums LineTotal for each period. The output is pasted below the code, and a quick plotting sketch follows it.
from pyspark.sql.functions import year, month
sales_over_time = (
    df.groupBy(year("OrderDate").alias("Year"), month("OrderDate").alias("Month"))
      .sum("LineTotal")
      .orderBy("Year", "Month")
)
sales_over_time.show()
+----+-----+------------------+
|Year|Month| sum(LineTotal)|
+----+-----+------------------+
|2011| 5| 489328.5775051117|
|2011| 7|1538408.3072342873|
|2011| 8|2010618.0655760765|
|2011| 10|4027080.3193511963|
|2011| 12| 713116.6905941963|
|2012| 1| 3356069.332834244|
|2012| 2| 882899.94196558|
|2012| 3|2269116.7013754845|
|2012| 4|1001803.7586708069|
|2012| 5|2393689.5104427338|
|2012| 6| 3601190.696228981|
|2012| 7|2885359.1877355576|
|2012| 8|1802154.2098355293|
|2012| 9| 3053816.318526268|
|2012| 10| 2185213.212389946|
|2012| 11|1317541.8308796883|
|2012| 12| 2384846.583500862|
|2013| 1|1563955.0806398392|
+----+-----+------------------+
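As a quick way to visualise the monthly trend in the notebook, the aggregated result can be pulled into pandas and plotted. This is a minimal sketch and assumes pandas and matplotlib are available in the environment.
import matplotlib.pyplot as plt

# Pull the small aggregated result into pandas for plotting.
sales_pd = sales_over_time.toPandas()
sales_pd.columns = ["Year", "Month", "Sales"]

# Build a Year-Month label and plot monthly sales as a simple line chart.
sales_pd["Period"] = sales_pd["Year"].astype(str) + "-" + sales_pd["Month"].astype(str).str.zfill(2)
sales_pd.plot(x="Period", y="Sales", kind="line", rot=45, title="Monthly sales (sum of LineTotal)")
plt.tight_layout()
plt.show()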
2. Top Selling Products
This groups the data by ProductID (which could then be joined to a product dimension, as sketched after the output below) and sums the OrderQty for each product, then sorts the results to show the top 10 selling products.
top_products = df.groupBy("ProductID").sum("OrderQty").orderBy("sum(OrderQty)", ascending=False)
top_products.show(10)
+---------+-------------+
|ProductID|sum(OrderQty)|
+---------+-------------+
| 715| 6140|
| 712| 6121|
| 711| 4618|
| 708| 4447|
| 864| 4079|
| 707| 4036|
| 884| 3455|
| 863| 3378|
| 714| 3194|
| 867| 2992|
+---------+-------------+
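To turn the ProductID values into readable names, the aggregate could be joined to a product dimension. The products DataFrame below is hypothetical: its source file and its ProductID/ProductName columns are assumptions standing in for whatever product table is available.
# Hypothetical product dimension with ProductID and ProductName columns.
products = spark.read.csv("products.csv", header=True, inferSchema=True)

# Join the top-products aggregate to the dimension to label each product,
# then re-sort, since a join does not preserve the earlier ordering.
top_products_named = top_products.join(
    products.select("ProductID", "ProductName"), on="ProductID", how="left"
)
top_products_named.orderBy("sum(OrderQty)", ascending=False).show(10)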
3. Top Customers
This groups the data by CustomerID, sums LineTotal for each customer, and then sorts to show the top 10 customers by sales. A quick check of how much of total revenue these top customers account for is sketched after the output.
customer_spending = df.groupBy("CustomerID").sum("LineTotal").orderBy("sum(LineTotal)", ascending=False)
customer_spending.show(10)
+----------+-----------------+
|CustomerID| sum(LineTotal)|
+----------+-----------------+
| 1031|877107.1929020882|
| 813|853849.1766397953|
| 827|841908.7684364319|
| 1991|816755.5729646683|
| 591|799277.8888859749|
| 647|787773.0409326553|
| 785|746317.5246133804|
| 599|740985.8302721977|
| 1417|730798.7124381065|
| 661|727272.6451358795|
+----------+-----------------+
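As a rough follow-up, it can be useful to know how concentrated revenue is in these top customers. This is a minimal sketch using only the columns above; the top-10 share framing is an addition rather than part of the original analysis.
from pyspark.sql.functions import sum as _sum

# Total revenue across all customers.
total_sales = df.agg(_sum("LineTotal").alias("Total")).collect()[0]["Total"]

# Revenue attributable to the top 10 customers shown above.
top10_sales = sum(row["sum(LineTotal)"] for row in customer_spending.take(10))

print(f"Top 10 customers account for {top10_sales / total_sales:.1%} of sales")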
4. Fulfillment Analysis
This calculates the number of days between OrderDate and ShipDate for each order and provides summary statistics for the fulfillment time. The mean fulfillment time is 7 days and the standard deviation is only about 0.02 days, with a minimum of 7 and a maximum of 8, which shows fulfillment times are very consistent at about 1 week from order to shipping. A count by fulfillment time, sketched after the output, backs this up.
from pyspark.sql.functions import datediff
df = df.withColumn("FulfillmentTime", datediff("ShipDate", "OrderDate"))
fulfillment_stats = df.select("FulfillmentTime").describe()
fulfillment_stats.show()
+-------+--------------------+
|summary| FulfillmentTime|
+-------+--------------------+
| count| 60919|
| mean| 7.000459626717444|
| stddev|0.021434155039653294|
| min| 7|
| max| 8|
+-------+--------------------+
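To back up the consistency claim, a simple count by fulfillment time shows how the order lines split between the 7-day and 8-day cases. This is a minimal sketch reusing the FulfillmentTime column added above.
# Count order lines by fulfillment time; given min=7 and max=8 above,
# this shows how heavily the distribution leans towards 7 days.
df.groupBy("FulfillmentTime").count().orderBy("FulfillmentTime").show()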
5. Customer Lifetime Value (CLV)
This sums each customer's spend (TotalSpent) and counts their order lines (OrderCount), then divides the total by the count to give an average spend per order line as a simple CLV estimate. Note that count("SalesOrderID") counts one row per order line rather than per distinct order; a variant using distinct orders is sketched after the output.
from pyspark.sql.functions import sum as _sum, count, col
customer_lifetime_value = df.groupBy("CustomerID").agg(
_sum("LineTotal").alias("TotalSpent"),
count("SalesOrderID").alias("OrderCount")
).withColumn("CLV", col("TotalSpent") / col("OrderCount"))
customer_lifetime_value.show(10)
+----------+------------------+----------+------------------+
|CustomerID| TotalSpent|OrderCount| CLV|
+----------+------------------+----------+------------------+
| 833|153764.35362493992| 224| 686.4480072541961|
| 1829| 320221.5476779938| 289|1108.0330369480753|
| 471| 443316.1137523651| 375|1182.1763033396403|
| 463|197801.05562782288| 154| 1284.422439141707|
| 1959| 6278.421112060547| 9| 697.6023457845052|
| 623| 145380.1288986206| 131| 1109.771976325348|
| 1025| 86177.8471031189| 85|1013.8570247425753|
| 1395|20417.259766578674| 42|486.12523253758746|
| 737| 61809.13270378113| 38| 1626.556123783714|
| 1483| 59.33399963378906| 2| 29.66699981689453|
+----------+------------------+----------+------------------+
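Because count("SalesOrderID") counts one row per order line, a customer with many lines on a single order looks like a customer with many orders. A minimal variant using countDistinct gives an average spend per distinct order instead; which is the better CLV proxy depends on how an order is defined for this analysis.
from pyspark.sql.functions import sum as _sum, countDistinct, col

# Same aggregation, but counting distinct orders rather than order lines.
clv_per_order = df.groupBy("CustomerID").agg(
    _sum("LineTotal").alias("TotalSpent"),
    countDistinct("SalesOrderID").alias("DistinctOrders")
).withColumn("AvgOrderValue", col("TotalSpent") / col("DistinctOrders"))

clv_per_order.show(10)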
6. Predictive Modelling for Sales Forecasting
A VectorAssembler combines OrderQty, UnitPrice and UnitPriceDiscount into a single feature vector, and the dataset is then reduced to just that feature vector and LineTotal. The data is split into training (80%) and test (20%) sets so the model can be evaluated on data it has not seen. Evaluating on the test data outputs the RMSE (root mean squared error), which measures the average magnitude of the errors between predicted and actual values, and R2, which indicates the proportion of the variance explained by the model. Below, about 65% (0.65) of the variance in LineTotal is explained by the model. A short sketch after the output shows how the fitted model can be used to generate predictions.
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
# Prepare the data
assembler = VectorAssembler(inputCols=["OrderQty", "UnitPrice", "UnitPriceDiscount"], outputCol="features")
data = assembler.transform(df).select("features", "LineTotal")
# Split the data
train_data, test_data = data.randomSplit([0.8, 0.2])
# Train the model
lr = LinearRegression(labelCol="LineTotal")
lr_model = lr.fit(train_data)
# Evaluate the model
test_results = lr_model.evaluate(test_data)
print(f"RMSE: {test_results.rootMeanSquaredError}")
print(f"R2: {test_results.r2}")
Output:
RMSE: 1248.7924101938995
R2: 0.6517121907122512
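To use the fitted model for forecasting, transform can be applied to the assembled features to get per-row predictions. This is a minimal sketch; the exact RMSE and R2 will vary with the random split, and the RegressionEvaluator line simply recomputes the same metric from the predictions.
from pyspark.ml.evaluation import RegressionEvaluator

# Generate predictions on the held-out test set and compare to the actual LineTotal.
predictions = lr_model.transform(test_data)
predictions.select("features", "LineTotal", "prediction").show(5)

# The same RMSE can also be computed from the predictions with an evaluator.
evaluator = RegressionEvaluator(labelCol="LineTotal", predictionCol="prediction", metricName="rmse")
print(f"RMSE (evaluator): {evaluator.evaluate(predictions)}")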
7. Market Basket Analysis
The transactions are grouped by SalesOrderID and the ProductIDs for each order are collected into a list. An FP-Growth model is then trained on these transactions with a minimum support of 0.01 and a minimum confidence of 0.5. The output shows the frequent itemsets found in the transactions, together with their frequencies: product 993 appears in 105 orders, products 993 and 937 appear together in 74 orders, and so on. This is useful for understanding items that are frequently purchased together (or services that are provided together), and the results can be used to aid cross-selling strategies; a sketch after the output shows one way to turn the rules into cross-sell suggestions.
from pyspark.sql.functions import collect_list
from pyspark.ml.fpm import FPGrowth
# Data prep
transactions = df.groupBy("SalesOrderID").agg(collect_list("ProductID").alias("items"))
# Model training
fp_growth = FPGrowth(itemsCol="items", minSupport=0.01, minConfidence=0.5)
model = fp_growth.fit(transactions)
# Frequent ItemSets
model.freqItemsets.show()
# Association Rules
model.associationRules.show()
Output:
+--------------------+----+
| items|freq|
+--------------------+----+
| [993]| 105|
| [993, 937]| 74|
| [993, 937, 743]| 45|
|[993, 937, 743, 748]| 39|
| [993, 937, 869]| 61|
|[993, 937, 869, 784]| 45|
|[993, 937, 869, 7...| 42|
|[993, 937, 869, 867]| 49|
|[993, 937, 869, 8...| 42|
|[993, 937, 869, 8...| 41|
|[993, 937, 869, 8...| 40|
|[993, 937, 869, 8...| 40|
|[993, 937, 869, 781]| 44|
|[993, 937, 869, 779]| 51|
|[993, 937, 869, 7...| 44|
|[993, 937, 869, 7...| 39|
|[993, 937, 869, 7...| 39|
|[993, 937, 869, 809]| 49|
|[993, 937, 869, 748]| 40|
|[993, 937, 869, 783]| 44|
+--------------------+----+
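To move from itemsets to cross-sell suggestions, the association rules can be ranked by confidence, and transform can be applied to the transactions to suggest products not already in each order. This is a minimal sketch using the model trained above.
# Rank the mined rules by confidence; each rule maps an antecedent itemset
# to a consequent product with its confidence.
model.associationRules.orderBy("confidence", ascending=False).show(10, truncate=False)

# transform() adds a 'prediction' column: consequents of matching rules that are
# not already in the order's item list, i.e. candidate cross-sell products.
suggestions = model.transform(transactions)
suggestions.select("SalesOrderID", "items", "prediction").show(5, truncate=False)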