DagsterDocs

Advanced: Materializations#

You can find the code for this example on Github

Steps in a data pipeline often produce persistent artifacts, for instance, graphs or tables describing the result of some computation. Typically these artifacts are saved to disk (or to cloud storage) with a name that has something to do with their origin. But it can be hard to organize and cross-reference artifacts produced by many different runs of a pipeline, or to identify all of the files that might have been created by some pipeline's logic.

Dagster solids can describe their persistent artifacts to the system by yielding AssetMaterialization events. Like TypeCheck and ExpectationResult, asset materializations are side-channels for metadata -- they don't get passed to downstream solids and they aren't used to define the data dependencies that structure a pipeline's DAG.

Suppose that we rewrite our sort_calories solid so that it saves the newly sorted data frame to disk.

@solid
def sort_by_calories(context, cereals):
    sorted_cereals = sorted(
        cereals, key=lambda cereal: int(cereal["calories"])
    )
    context.log.info(
        "Least caloric cereal: {least_caloric}".format(
            least_caloric=sorted_cereals[0]["name"]
        )
    )
    context.log.info(
        "Most caloric cereal: {most_caloric}".format(
            most_caloric=sorted_cereals[-1]["name"]
        )
    )
    fieldnames = list(sorted_cereals[0].keys())
    sorted_cereals_csv_path = os.path.abspath(
        "output/calories_sorted_{run_id}.csv".format(run_id=context.run_id)
    )
    os.makedirs(os.path.dirname(sorted_cereals_csv_path), exist_ok=True)
    with open(sorted_cereals_csv_path, "w") as fd:
        writer = csv.DictWriter(fd, fieldnames)
        writer.writeheader()
        writer.writerows(sorted_cereals)

We've taken the basic precaution of ensuring that the saved csv file has a different filename for each run of the pipeline. But there's no way for Dagit to know about this persistent artifact. So we'll add the following lines:

@solid
def sort_by_calories(context, cereals):
    sorted_cereals = sorted(
        cereals, key=lambda cereal: int(cereal["calories"])
    )
    context.log.info(
        "Least caloric cereal: {least_caloric}".format(
            least_caloric=sorted_cereals[0]["name"]
        )
    )
    context.log.info(
        "Most caloric cereal: {most_caloric}".format(
            most_caloric=sorted_cereals[-1]["name"]
        )
    )
    fieldnames = list(sorted_cereals[0].keys())
    sorted_cereals_csv_path = os.path.abspath(
        "output/calories_sorted_{run_id}.csv".format(run_id=context.run_id)
    )
    os.makedirs(os.path.dirname(sorted_cereals_csv_path), exist_ok=True)
    with open(sorted_cereals_csv_path, "w") as fd:
        writer = csv.DictWriter(fd, fieldnames)
        writer.writeheader()
        writer.writerows(sorted_cereals)
    yield AssetMaterialization(
        asset_key="sorted_cereals_csv",
        description="Cereals data frame sorted by caloric content",
        metadata_entries=[
            EventMetadataEntry.path(
                sorted_cereals_csv_path, "sorted_cereals_csv_path"
            )
        ],
    )
    yield Output(None)

Note that we've had to add the last line, yielding an Output. Until now, all of our solids have relied on Dagster's implicit conversion of the return value of a solid's compute function into its output. When we explicitly yield other types of events from solid logic, we need to also explicitly yield the output so that the framework can recognize them.

Now, if we run this pipeline in Dagit:

materializations.png