Write your data into Lakehouse by Notebooks

Since Lakehouse is one of the key items within Microsoft Fabric, it is important to know how to write data into it in various formats and using different tools. One of the most common tools is notebooks, as they provide great flexibility and speed for development and testing with graphical outputs. In this article, I want to focus primarily on the following types of notebooks:

  • PySpark
  • Python

Both of these types can write data into Lakehouse items (with or without schema support), but each does it slightly differently and supports different approaches to achieve the goal. However, what they have in common is that both can have a default Lakehouse set, and all operations are performed against it by default. From a development perspective, it is good to have the ability to set the target Lakehouse dynamically, not just rely on one default, to easily switch between different environments and, in specific scenarios, write to multiple Lakehouses simultaneously.

This can be achieved using APIs, configuration, or referencing individual Lakehouses directly by their IDs. Before we dive into writing data, let’s look at how we can work with Lakehouse in Fabric and how we can easily switch between different Lakehouses.

Default Lakehouses in Notebook

In Fabric, it is possible to have only one default Lakehouse set within a notebook. This default Lakehouse is the primary target of many operations because it serves as the source for the Hive metastore (the default catalog and database). An example of this is reading data:

# Read data from default Lakehouse - Table Customers
## loads a table from the default catalog and database configured in Spark
df = spark.read.table('Customers')
## explicitly loads a Delta table from a given file path (for us it is the default Lakehouse)
df = spark.read.format("delta").load('Tables/Customers')
## executes a SQL query on a table stored in a specific database/schema (without specifying the database, the default one is used)
df = spark.sql("SELECT * FROM Customers")

As you can see, there are quite a few ways to read data (these are not all, just examples). The undeniable advantage of a default Lakehouse is that we never have to specify the path, database, or anything else. Once you set a Lakehouse as default, everything is automatically performed against it, which simplifies work, shortens code, and increases readability. If all executed notebooks were pinned to the same Lakehouse with the same environment, we could benefit from High Concurrency sessions (as mentioned in Orchestration Fabric Notebooks).

On the other hand, if I am creating a solution that I want to transfer between environments (without using Deployment Pipelines), customers, or scenarios, or even just occasionally migrate a Lakehouse, it is good to be able to change the target Lakehouse dynamically without having to open each notebook and manually reset its default Lakehouse. Manually modifying, say, 10 notebooks is repetitive and boring, although the probability of error is still low. With more notebooks, say 50, the risk of error grows and the work becomes very inefficient time-wise.

Switching Lakehouse by Configuration

Each notebook contains configuration metadata that indicates which Environment and Lakehouse the notebook should connect to when a session is initialized. If the notebook contains the magic command ‘%%configure’ in the FIRST cell, this configuration metadata can be overwritten during session initialization.

%%configure -f
{
    "defaultLakehouse": {
        "name": "<name of the lakehouse>",
        "id": "<lakehouse id>",
        "workspaceId": "<workspace id of the lakehouse>"
    }
}

This variant is very simple and readable, but it is also very static and does not allow dynamic switching of the Lakehouse within a single run of a notebook. Since it must be in the first cell, options for dynamic switching, such as passing parameters from outside (e.g., from orchestration using notebookutils.notebook.runMultiple), are very limited.

Switching Lakehouse by notebookutils

Another option is to use the mentioned notebookutils library. It allows performing many operations necessary for notebook development and orchestration. One of them is the ability to dynamically change the default Lakehouse of notebooks. This function is very simple, as it requires only a few basic parameters, and everything else is handled within the library.

# Update the definition of a Notebook
notebookutils.notebook.updateDefinition(
    name = "", # Notebook id or name
    workspaceId = "", # Workspace ID of notebook
    defaultLakehouse = "", # ID or name of new default Lakehouse
    defaultLakehouseWorkspace = "" # Workspace ID of new default Lakehouse
)

This function can be used in any cell and utilized within a loop to update all notebooks in a single run. This way, it is possible to easily and efficiently switch between different Lakehouses, increasing flexibility and development and testing possibilities. However, this still does not solve the problem of needing to write to multiple Lakehouses simultaneously or direct parameterization within a single run. It still holds that I must first make changes to all notebooks and only then can I start doing anything else.
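
For illustration, a minimal sketch of such a loop, assuming a hand-maintained list of notebook names (all names and IDs below are placeholders):

# Hypothetical example: repoint several notebooks to a new default Lakehouse in one run
notebooks_to_update = ["Load_Customers", "Load_Orders", "Load_Products"] # placeholder notebook names
notebook_workspace_id = "<workspace id of the notebooks>"
new_lakehouse_id = "<id of the new default Lakehouse>"
new_lakehouse_workspace_id = "<workspace id of the new default Lakehouse>"

for notebook_name in notebooks_to_update:
    notebookutils.notebook.updateDefinition(
        name = notebook_name,
        workspaceId = notebook_workspace_id,
        defaultLakehouse = new_lakehouse_id,
        defaultLakehouseWorkspace = new_lakehouse_workspace_id
    )
    print(f"Default Lakehouse updated for notebook: {notebook_name}")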

Writing data into Lakehouse by PySpark

As mentioned above, writing data into Lakehouse can be done in multiple ways, just like reading. If I stick to the default Lakehouse for a moment, writing data into a Delta Table can be done as follows:

# Write data to default Lakehouse - Table Customers
## append or overwrites a table in the default catalog and database configured in Spark
df.write.mode("append").saveAsTable('Customers')
df.write.mode("overwrite").saveAsTable('Customers')
## append or overwrites the content of the DataFrame as a Delta table at a relative path within the default Lakehouse
df.write.format("delta").mode("append").save('Tables/Customers')
df.write.format("delta").mode("overwrite").save('Tables/Customers')
## inserts the content of the DataFrame into an existing table (columns are matched by position, not by name)
df.write.insertInto("Customers")

However, if I want to write to different Lakehouses or have stronger parameterization support, I need to use a slightly different approach. Instead of relying on the default Lakehouse, I refer directly to the specific one I want to use. Since “save” supports ABFSS paths, this is very simple. It is good to know that within Microsoft Fabric you can encounter two ABFSS path formats, one based on IDs and one based on names; both are interchangeable and function the same:

  • abfss://{workspace id}@onelake.dfs.fabric.microsoft.com/{lakehouse id}/Tables/{db schema}/{table name}
  • abfss://{workspace_name}@onelake.dfs.fabric.microsoft.com/{lakehouse_name}.Lakehouse/Files/{folder_name}/{file_name}.{file_extension}

The entire code can look as follows:

# Write data to specific Lakehouse - Table Customers
lakehouse_id = '<lakehouse id>'
lakehouse_workspace_id = '<workspace id of the lakehouse>'
db_schema = '<database schema>' # Needs to be specified only for Lakehouses with schema support
table_name = '<table name>'
abfss_path = f"abfss://{lakehouse_workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{db_schema}/{table_name}"

## append or overwrites a table in specific Lakehouse
df.write.mode("append").format("delta").save(abfss_path)
df.write.mode("overwrite").format("delta").save(abfss_path)

This is a basic way to write to a Lakehouse outside the current workspace and not the default Lakehouse. Since this does not rely on direct notebook metadata, it is possible to parameterize this operation and connect to various IDs at runtime. However, there are more options…
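
As a minimal sketch of such parameterization (the list of target workspace and Lakehouse IDs is a placeholder that could just as well come from a parameter cell), the same DataFrame can be written to several Lakehouses in one run:

# Hypothetical example: write one DataFrame to several Lakehouses in a single run
targets = [
    {"workspace_id": "<workspace id 1>", "lakehouse_id": "<lakehouse id 1>"},
    {"workspace_id": "<workspace id 2>", "lakehouse_id": "<lakehouse id 2>"}
]
table_name = "Customers"

for target in targets:
    abfss_path = f"abfss://{target['workspace_id']}@onelake.dfs.fabric.microsoft.com/{target['lakehouse_id']}/Tables/{table_name}"
    df.write.mode("overwrite").format("delta").save(abfss_path)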

For example, if we are talking about a Lakehouse within the same workspace, it is possible to use simpler methods like saveAsTable, where you can specify the Lakehouse name and table, or even schema.

lakehouse_name = '<lakehouse name>'
table_name = '<table name>'

df.write.mode("append").saveAsTable(f"{lakehouse_name}.{table_name}")
df.write.mode("overwrite").saveAsTable(f"{lakehouse_name}.{table_name}")

schema = '<db schema>'
df.write.mode("append").saveAsTable(f"{lakehouse_name}.{schema}.{table_name}")
df.write.mode("overwrite").saveAsTable(f"{lakehouse_name}.{schema}.{table_name}")

Of course, you can also use other packages that help with writing. For example, delta.tables, which allows performing various operations on Delta Tables, such as writing, reading, updating values, deleting, optimizing, restoring previous versions, and much more. Since it is a Python package, it can be used within notebooks, simplifying and shortening the code.

from delta.tables import DeltaTable

lakehouse_id = '<lakehouse id>'
lakehouse_workspace_id = '<workspace id of the lakehouse>'
db_schema = '<database schema>' # Needs to be specified only for Lakehouses with schema support
table_name = '<table name>'
path = f"abfss://{lakehouse_workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{db_schema}/{table_name}"

deltaTable = DeltaTable.forPath(spark, path)
deltaTable.alias("df").merge(df.alias("s"), "df.id = s.id" ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

For writing data into Files, it works similarly, just instead of referring to “Tables,” you refer to “Files,” and instead of a table, you write directly to a file. This way, you can write data to a Lakehouse outside the workspace or within the workspace itself. The format of writing, the level of nesting, and other parameters can vary according to needs and requirements.

# Write data to specific Lakehouse - Files
lakehouse_id = '<lakehouse id>'
lakehouse_workspace_id = '<workspace id of the lakehouse>'
folder_name = '<folder name>'
abfss_path = f"abfss://{lakehouse_workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/{folder_name}"

## write data to specific Lakehouse as files without specifying name
df.write.mode("overwrite").format("parquet").save(abfss_path)
df.write.mode("overwrite").format("csv").save(abfss_path)

## write data to specific Lakehouse as files with specifying name
file_name = '<file name>'
df.write.mode("overwrite").format("parquet").save(f"{abfss_path}/{file_name}.parquet")
df.write.mode("overwrite").format("csv").save(f"{abfss_path}/{file_name}.csv")

Writing data into Lakehouse by Python

If we talk about Delta tables again, writing here can be done using the deltalake library, which is very similar to delta.tables but does not require Spark.

from deltalake import DeltaTable, write_deltalake

lakehouse_id = '<lakehouse id>'
lakehouse_workspace_id = '<workspace id of the lakehouse>'
table_name = '<table name>'
path = f"abfss://{lakehouse_workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{table_name}"

write_deltalake(path, df, mode="overwrite")

Be aware that if you read data from a Delta Table into a Pandas DataFrame using DeltaTable, string columns are extracted as objects. Therefore, before saving them again, you need to cast them back to string, for example, using the following code:

df = df.astype({col: str for col in df.select_dtypes(include=['object']).columns})
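
Put together, a small round-trip sketch (reusing the placeholder path from above; to_pandas() is the deltalake read method) could look like this:

## read the Delta table into a Pandas DataFrame - string columns arrive as objects
df = DeltaTable(path).to_pandas()

## cast object columns back to string before writing
df = df.astype({col: str for col in df.select_dtypes(include=['object']).columns})

## write the DataFrame back to the same table
write_deltalake(path, df, mode="overwrite")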

If you prefer not to use Pandas DataFrames, it is of course possible to use other libraries such as Polars and their methods for writing. For the mentioned Polars, it is the function write_delta, which writes to the Lakehouse and requires the path to the table, the write mode, and other parameters.

import polars as pl

lakehouse_id = '<lakehouse id>'
lakehouse_workspace_id = '<workspace id of the lakehouse>'
table_name = '<table name>'
path = f"abfss://{lakehouse_workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{table_name}"

## append or overwrites (including schema overwrite)
pl_frame.write_delta(path, mode="overwrite", delta_write_options={"schema_mode": "overwrite"})
pl_frame.write_delta(path, mode="append")

For files, it is quite important to distinguish whether the function we plan to use supports ABFSS paths or only relative paths. For example, for Pandas it is possible to use the function to_parquet, which supports ABFSS and allows creating directories as well.

lakehouse_id = '<lakehouse id>'
lakehouse_workspace_id = '<workspace id of the lakehouse>'
folder_name = '<folder name>'
file_name = '<file name>'

path = f"abfss://{lakehouse_workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Files/{folder_name}/{file_name}.parquet"
pandas_frame.to_parquet(path)

On the other hand, the mentioned Polars needs a relative path for its function write_parquet. In other words, the target Lakehouse first has to be attached to the session, and the data is then written through that local path. For such scenarios, it is easiest to use the default Lakehouse connection, because then you do not have to worry about paths and other parameters. However, if we want to write to another Lakehouse, we need to temporarily connect it within the given session and then write the data. For these temporary connections, it is easiest to use notebookutils.fs.mount.

 
lakehouse_id = '<lakehouse id>'
lakehouse_workspace_id = '<workspace id of the lakehouse>'
mount_name = '<mount name>' # mount point, e.g. '/mnt/temp_lakehouse'
folder_name = '<folder name>'
file_name = '<file name>'
extension = '<file extension>'
mounted_lakehouse = notebookutils.fs.mount(f"abfss://{lakehouse_workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}",mount_name) # Mount Lakehouse
mount_path = notebookutils.fs.getMountPath(mount_name) # Get the mount path
output_dir = f"{mount_path}/{folder_name}"
notebookutils.fs.mkdirs(output_dir) # Create directory if not exists
pl_frame.write_parquet(f"{output_dir}/{file_name}.{extension}") # Write data
notebookutils.fs.unmount(mount_name) # Unmount Lakehouse

Since mounting creates a connection to the given location, it is good to disconnect it again if it is no longer needed. On the other hand, you can wrap the mount in a class that will have methods for writing and disconnecting, which simplifies and clarifies the code. You can then open the connection, write all the files, and only then close the connection. This way, unnecessary repeated mounting and unmounting is avoided. The class can look like this:

class MountedWriter:
    def __init__(self, workspace_id, lakehouse_id, parent_folder_name):
        self.workspace = workspace_id
        self.lakehouse = lakehouse_id
        self.parent_folder_name = parent_folder_name
        self.mount_name = '/mnt/lakehouse'
        self.mount = notebookutils.fs.mount(
            f"abfss://{self.workspace}@onelake.dfs.fabric.microsoft.com/{self.lakehouse}",
            self.mount_name
        )
        self.mount_path = ""
    
    def get_mounted_path(self):
        self.mount_path = notebookutils.fs.getMountPath(self.mount_name)
        return self.mount_path

    def check_or_create_existing_directory(self, folder_name):
        mount_path = self.get_mounted_path()
        output_dir = f"{mount_path}/Files/{self.parent_folder_name}/{folder_name}"
        notebookutils.fs.mkdirs(output_dir)

    def create_file(self, df_to_be_written, folder_name, file_name_parquet):
        self.check_or_create_existing_directory(folder_name)
        output_dir = f"{self.get_mounted_path()}/Files/{self.parent_folder_name}/{folder_name}"
        output_file = f"{output_dir}/{file_name_parquet}"
        df_to_be_written.write_parquet(output_file)

    def end_mounting(self):
        notebookutils.fs.unmount(self.mount_name)
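
Usage could then look like this, a sketch assuming pl_customers and pl_orders are Polars DataFrames and all IDs and folder names are placeholders:

# Hypothetical usage: mount once, write several Parquet files, then unmount
writer = MountedWriter(
    workspace_id = '<workspace id of the lakehouse>',
    lakehouse_id = '<lakehouse id>',
    parent_folder_name = '<parent folder name>'
)

writer.create_file(pl_customers, 'customers', 'customers.parquet')
writer.create_file(pl_orders, 'orders', 'orders.parquet')

writer.end_mounting()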