# 5. Handling even larger data sets
So far in this workshop, every `dbt build` would rebuild all models from the underlying data in S3. While this is fine for very small data sets, running any suitably large model (say, in the range of a few million rows) across hundreds or thousands of files takes far too long and can use a ton of memory. How do we make this just work?
## dbt incremental materializations
dbt has the notion of "incremental materializations": models that are handled in a different flow, require more explicit definition, and can thus be built incrementally.
Incremental models usually require a `unique_key`; if no key is provided, the model is treated as append-only. Incremental models must also define which pieces of the model run incrementally.
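A minimal incremental config might look like this (the `id` column is a placeholder; a full example appears below):

```sql
-- omitting unique_key makes the model append-only
{{ config(materialized="incremental", unique_key="id") }}
```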
When invoked in a normal `dbt build` or `dbt run`, incremental models will do the following:

1. Insert new data into a temp table, based on the defined increment.
2. Delete any data from the existing model that matches the `unique_key` defined in the `config` block.
3. Insert the data from the temp table into the existing model.
This means that changes to the schema of your model need to be carefully considered: new columns mean the model must be rebuilt entirely. A full rebuild of the model is called a "full refresh" in dbt and can be invoked with the `--full-refresh` flag in the CLI.
You can read more about this in the dbt docs.
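For example, a full refresh of a single model might be invoked like this (the model name here is just an example):

```shell
dbt build --select ticker_info --full-refresh
```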
## Example of incremental materialization
In previous examples, we pulled a list of file paths from S3. Using this same method, we can pass that list to a `read_csv()` call and ingest all of those files, then use the result in a CTE (or sub-query) to filter our data set for incremental loading.
```sql
SET VARIABLE my_list = (
    select array_agg(file)
    from glob('s3://us-prd-motherduck-open-datasets/stocks/**/ticker_history_*.csv')
);
```
Then we can use that list in a SQL query:
```sql
select *
from read_csv(getvariable('my_list'), filename = true) as model

{% if is_incremental() %}
where model.filename not in (select distinct filename from {{ this }})
{% endif %}
```
This also introduces the concept of `{{ this }}`, a dbt relation that lets us reference the current model; at compile time it resolves to the fully qualified name of the table being built.
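On an incremental run, the Jinja above compiles down to plain SQL. Assuming the model is built as `main.ticker_history` in a database called `my_db` (illustrative names), the compiled query would look roughly like:

```sql
select *
from read_csv(getvariable('my_list'), filename = true) as model
where model.filename not in (
    select distinct filename
    from "my_db"."main"."ticker_history"
)
```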
The full model, including the config, looks something like this:
```sql
{{
    config(
        materialized="incremental",
        unique_key="id",
    )
}}

select
    info.symbol || '-' || info.filename as id,
    info.*,
    now() at time zone 'UTC' as updated_at
from read_csv('s3://us-prd-motherduck-open-datasets/stocks/**/ticker_info_*.csv',
              filename = true) as info

{% if is_incremental() %}
where not exists (select 1 from {{ this }} ck where ck.filename = info.filename)
{% endif %}
```
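Note that on the very first build of this model (or whenever `--full-refresh` is passed), `is_incremental()` returns false, so the `where` clause is dropped and every file is read; on subsequent runs, only files whose `filename` is not already in the target table are scanned.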
**Exercise 5.1**

Update your model `ticker_info.sql` to be an incremental model instead.
**Exercise 5.2**

Now update `ticker_history.sql` and `option_history.sql` to be incremental as well.