Unit testing for notebooks

You can use unit testing to help improve the quality and consistency of your notebooks’ code. Unit testing is an approach to testing self-contained units of code, such as functions, early and often. This helps you find problems with your code faster, uncover mistaken assumptions about your code sooner, and streamline your overall coding efforts.

This article is an introduction to basic unit testing with functions. Advanced concepts, such as unit testing classes and interfaces and the use of stubs, mocks, and test harnesses, are also supported when you unit test notebooks but are outside the scope of this article. This article also does not cover other kinds of testing, such as integration testing, system testing, acceptance testing, or non-functional testing methods such as performance testing or usability testing.

This article demonstrates the following:

  • How to organize functions and their unit tests.
  • How to write functions in Python, R, and Scala, as well as user-defined functions in SQL, that are well designed to be unit tested.
  • How to call these functions from Python, R, Scala, and SQL notebooks.
  • How to write unit tests in Python, R, and Scala by using the popular test frameworks pytest for Python, testthat for R, and ScalaTest for Scala. Also how to write SQL that unit tests SQL user-defined functions (SQL UDFs).
  • How to run these unit tests from Python, R, Scala, and SQL notebooks.

Note: Databricks recommends writing and running your unit tests in a notebook. While you can run some commands in the web terminal, the web terminal has more limitations, such as a lack of support for Spark. See Run shell commands in Databricks web terminal.

Organize functions and unit tests

There are a few common approaches for organizing your functions and their unit tests with notebooks. Each approach has its benefits and challenges.

For Python, R, and Scala notebooks, common approaches include the following:

  • Store functions and their unit tests outside of notebooks.
    • Benefits: You can call these functions both within and outside of notebooks. Test frameworks are better designed to run tests outside of notebooks.
    • Challenges: This approach is not supported for Scala notebooks. This approach also increases the number of files to track and maintain.
  • Store functions in one notebook and their unit tests in a separate notebook.
    • Benefits: These functions are easier to reuse across notebooks.
    • Challenges: The number of notebooks to track and maintain increases. These functions cannot be used outside of notebooks. These functions can also be more difficult to test outside of notebooks.
  • Store functions and their unit tests within the same notebook.
    • Benefits: Functions and their unit tests are stored within a single notebook for easier tracking and maintenance.
    • Challenges: These functions can be more difficult to reuse across notebooks. These functions cannot be used outside of notebooks. These functions can also be more difficult to test outside of notebooks.

For Python and R notebooks, Databricks recommends storing functions and their unit tests outside of notebooks. For Scala notebooks, Databricks recommends including functions in one notebook and their unit tests in a separate notebook.

For SQL notebooks, Databricks recommends that you store functions as SQL user-defined functions (SQL UDFs) in your schemas (also known as databases). You can then call these SQL UDFs and their unit tests from SQL notebooks.
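
For illustration only, the following minimal sketch registers a simple SQL UDF and calls it. To stay consistent with the Python listings in this article, it runs the SQL through spark.sql from a Python notebook; the double_it function name and the default schema are placeholders and are not used in the examples that follow.

# A minimal sketch (not part of this article's examples): register a simple
# SQL UDF in an illustrative schema and call it.
spark.sql("""
  CREATE OR REPLACE FUNCTION default.double_it(n INT)
  RETURNS INT
  RETURN n * 2
""")

# Call the SQL UDF. In a SQL notebook you would run the SELECT directly;
# spark.sql is used here only for demonstration.
spark.sql("SELECT default.double_it(21) AS result").show()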

Write functions

This section describes a simple set of example functions that determine the following:

  • Whether a table exists in a database.
  • Whether a column exists in a table.
  • How many rows exist in a column for a value within that column.

These functions are intended to be simple, so that you can focus on the unit testing details in this article rather than focus on the functions themselves.

To get the best unit testing results, a function should return a single predictable outcome and be of a single data type. For example, to check whether something exists, the function should return a boolean value of true or false. To return the number of rows that exist, the function should return a non-negative, whole number. It should not, in the first example, return either false if something does not exist or the thing itself if it does exist. Likewise, for the second example, it should not return either the number of rows that exist or false if no rows exist.
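
For example, the following sketch (the function names are illustrative and are not used elsewhere in this article) contrasts a predictable boolean return with a mixed return type that is harder to write assertions against:

# Predictable: always returns a boolean, so a test can simply assert True or False.
def column_exists(df, column_name):
  return column_name in df.columns

# Harder to unit test: returns either a Column object or False, so callers
# and tests must handle two different return types.
def get_column_or_false(df, column_name):
  return df[column_name] if column_name in df.columns else False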

You can add these functions to an existing Databricks workspace as follows, in Python, R, Scala, or SQL.

The following code assumes you have Set up Databricks Git folders (Repos), added a repo, and have the repo open in your Databricks workspace.

Create a file named myfunctions.py within the repo, and add the following contents to the file. Other examples in this article expect this file to be named myfunctions.py. You can use different names for your own files.

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
                    .appName('integrity-tests') \
                    .getOrCreate()

# Does the specified table exist in the specified database?
def tableExists(tableName, dbName):
  return spark.catalog.tableExists(f"{dbName}.{tableName}")

# Does the specified column exist in the given DataFrame?
def columnExists(dataFrame, columnName):
  if columnName in dataFrame.columns:
    return True
  else:
    return False

# How many rows are there for the specified value in the specified column
# in the given DataFrame?
def numRowsInColumnForValue(dataFrame, columnName, columnValue):
  df = dataFrame.filter(col(columnName) == columnValue)

  return df.count()

Call functions

This section describes code that calls the preceding functions. You could use these functions, for example, to count the number of rows in a table where a specified value exists within a specified column. However, you would want to check whether the table actually exists, and whether the column actually exists in that table, before you proceed. The following code checks for these conditions.

If you added the functions from the preceding section to your Databricks workspace, you can call these functions from your workspace as follows.

Create a Python notebook in the same folder as the preceding myfunctions.py file in your repo, and add the following contents to the notebook. Change the variable values for the table name, the schema (database) name, the column name, and the column value as needed. Then attach the notebook to a cluster and run the notebook to see the results.

from myfunctions import *

tableName = "diamonds"
dbName = "default"
columnName = "clarity"
columnValue = "VVS2"

# If the table exists in the specified database...
if tableExists(tableName, dbName):

  df = spark.sql(f"SELECT * FROM {dbName}.{tableName}")

  # And the specified column exists in that table...
  if columnExists(df, columnName):
    # Then report the number of rows for the specified value in that column.
    numRows = numRowsInColumnForValue(df, columnName, columnValue)

    print(f"There are {numRows} rows in '{tableName}' where '{columnName}' equals '{columnValue}'.")
  else:
    print(f"Column '{columnName}' does not exist in table '{tableName}' in schema (database) '{dbName}'.")
else:
  print(f"Table '{tableName}' does not exist in schema (database) '{dbName}'.")

Write unit tests

This section describes code that tests each of the functions that are described toward the beginning of this article. If you make any changes to functions in the future, you can use unit tests to determine whether those functions still work as you expect them to.

If you added the functions toward the beginning of this article to your Databricks workspace, you can add unit tests for these functions to your workspace as follows.

Create another file named test_myfunctions.py in the same folder as the preceding myfunctions.py file in your repo, and add the following contents to the file. By default, pytest looks for .py files whose names start with test_ (or end with _test) to test. Similarly, by default, pytest looks inside of these files for functions whose names start with test_ to test.

In general, it is a best practice to not run unit tests against functions that work with data in production. This is especially important for functions that add, remove, or otherwise change data. To protect your production data from being compromised by your unit tests in unexpected ways, you should run unit tests against non-production data. One common approach is to create fake data that is as close as possible to the production data. The following code example creates fake data for the unit tests to run against.

import pytest
import pyspark
from myfunctions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType

tableName = "diamonds"
dbName = "default"
columnName = "clarity"
columnValue = "SI2"

# Because this file is not a Databricks notebook, you
# must create a Spark session. Databricks notebooks
# create a Spark session for you by default.
spark = SparkSession.builder \
                    .appName('integrity-tests') \
                    .getOrCreate()

# Create fake data for the unit tests to run against.
# In general, it is a best practice to not run unit tests
# against functions that work with data in production.
schema = StructType([ \
  StructField("_c0",     IntegerType(), True), \
  StructField("carat",   FloatType(),   True), \
  StructField("cut",     StringType(),  True), \
  StructField("color",   StringType(),  True), \
  StructField("clarity", StringType(),  True), \
  StructField("depth",   FloatType(),   True), \
  StructField("table",   IntegerType(), True), \
  StructField("price",   IntegerType(), True), \
  StructField("x",       FloatType(),   True), \
  StructField("y",       FloatType(),   True), \
  StructField("z",       FloatType(),   True), \
])

data = [ (1, 0.23, "Ideal",   "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43 ), \
         (2, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31 ) ]

df = spark.createDataFrame(data, schema)

# Does the table exist?
def test_tableExists():
  assert tableExists(tableName, dbName) is True

# Does the column exist?
def test_columnExists():
  assert columnExists(df, columnName) is True

# Is there at least one row for the value in the specified column?
def test_numRowsInColumnForValue():
  assert numRowsInColumnForValue(df, columnName, columnValue) > 0

Run unit tests

This section describes how to run the unit tests that you coded in the preceding section. When you run the unit tests, you get results showing which unit tests passed and failed.

If you added the unit tests from the preceding section to your Databricks workspace, you can run these unit tests from your workspace. You can run these unit tests either manually or on a schedule.

Create a Python notebook in the same folder as the preceding test_myfunctions.py file in your repo, and add the following contents.

In the new notebook’s first cell, add the following code, and then run the cell, which calls the %pip magic. This magic installs pytest.

%pip install pytest

In the second cell, add the following code and then run the cell. Results show which unit tests passed and failed.

import pytest
import sys

# Skip writing pyc files on a read-only filesystem.
sys.dont_write_bytecode = True

# Run pytest.
retcode = pytest.main([".", "-v", "-p", "no:cacheprovider"])

# Fail the cell execution if there are any test failures.
assert retcode == 0, "The pytest invocation failed. See the log for details."

Tip: You can view the results of your notebook runs (including unit test results) in your cluster's driver logs. You can also specify a location for your cluster's log delivery.

You can set up a continuous integration and continuous delivery or deployment (CI/CD) system, such as GitHub Actions, to automatically run your unit tests whenever your code changes. For an example, see the coverage of GitHub Actions in Software engineering best practices for notebooks.

Additional resources

  • pytest
  • testthat
  • ScalaTest
  • SQL