
prefect_metricflow.tasks

A collection of tasks for interacting with MetricFlow.

drop_materialization

Drop a materialization that was previously created by MetricFlow.

Parameters:

materialization_name (str, required): The name of the materialization to drop.

config (Optional[Union[Dict, str]], default None): MetricFlow configuration. Can be either a dict or a YAML string. If provided, it will be persisted at the path specified in config_file_path.

config_file_path (Optional[str], default None): Path to the MetricFlow config file. If not provided, the default path will be used.
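
The "dict or YAML string" contract for config can be illustrated with a minimal sketch. It is not the library's persist_config: the helper name `persist_config_sketch`, the key `dwh_schema`, and the choice to serialize a dict as JSON (which YAML 1.2 is designed to accept as a subset) are all assumptions made so the example runs with the standard library alone.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict, Union


def persist_config_sketch(config: Union[Dict, str], file_path: str) -> Path:
    """Hypothetical stand-in: write `config` to `file_path` and return the path."""
    path = Path(file_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    if isinstance(config, dict):
        # Assumption: dump the dict as JSON, which a YAML 1.2 parser can read.
        text = json.dumps(config, indent=2)
    else:
        # A string is assumed to already be YAML and is written unchanged.
        text = config
    path.write_text(text, encoding="utf-8")
    return path


with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "config.yml"

    # Case 1: config passed as a dict (the key is illustrative only).
    persist_config_sketch({"dwh_schema": "analytics"}, str(target))
    from_dict = target.read_text(encoding="utf-8")

    # Case 2: config passed as a YAML string; it overwrites the same file.
    persist_config_sketch("dwh_schema: analytics\n", str(target))
    from_yaml = target.read_text(encoding="utf-8")
```

Both calls write to the same file, which reflects the documented behavior that a provided config is persisted at config_file_path and therefore replaces whatever that file held before.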

Returns:

bool: True if MetricFlow has successfully dropped the materialization table, False if the materialization table does not exist.

Source code in prefect_metricflow/tasks.py
@task
def drop_materialization(
    materialization_name: str,
    config: Optional[Union[Dict, str]] = None,
    config_file_path: Optional[str] = None,
) -> bool:
    """
    Drop a materialization that was previously created by MetricFlow.

    Args:
        materialization_name: The name of the materialization to drop.
        config: MetricFlow configuration. Can be either a `dict` or a YAML string.
            If provided, will be persisted at the path specified in `config_file_path`.
        config_file_path: Path to MetricFlow config file.
            If not provided, the default path will be used.

    Returns:
        `True` if MetricFlow has successfully dropped the materialization table,
        `False` if the materialization table does not exist.
    """

    mf_config_file_path = get_config_file_path(config_file_path=config_file_path)

    # If a config is provided, persist it so the client can load it from file.
    if config:
        persist_config(config=config, file_path=mf_config_file_path)

    # Create MetricFlow client
    mfc = MetricFlowClient.from_config(config_file_path=mf_config_file_path)

    # Drop the materialization and return the result
    return mfc.drop_materialization(materialization_name=materialization_name)
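
The return contract (True when the table was dropped, False when it did not exist) can be tried without MetricFlow or a warehouse by using a stand-in client. Everything in this sketch is hypothetical: `StubClient`, `drop_materialization_sketch`, the placeholder default path, and the materialization name `daily_revenue` only mirror the shape of the task body and are not part of the library.

```python
from typing import Optional, Set


class StubClient:
    """Hypothetical stand-in for the MetricFlow client; keeps tables in memory."""

    # Class-level set, shared by every instance, playing the role of the warehouse.
    tables: Set[str] = {"daily_revenue"}

    @classmethod
    def from_config(cls, config_file_path: str) -> "StubClient":
        # A real client would read connection details from this file.
        return cls()

    def drop_materialization(self, materialization_name: str) -> bool:
        if materialization_name in self.tables:
            self.tables.discard(materialization_name)
            return True
        return False


def drop_materialization_sketch(
    materialization_name: str, config_file_path: Optional[str] = None
) -> bool:
    # Step 1: fall back to a default path (placeholder, not the real default).
    path = config_file_path or "default-config.yml"
    # Step 2 (persisting an inline config) is left out of this sketch.
    # Step 3: build the client and delegate the drop to it.
    client = StubClient.from_config(config_file_path=path)
    return client.drop_materialization(materialization_name=materialization_name)


first = drop_materialization_sketch("daily_revenue")   # table exists
second = drop_materialization_sketch("daily_revenue")  # already dropped
print(first, second)
```

Because a missing table is reported as False rather than raised as an error, a caller can treat the task as an idempotent cleanup step and branch on the result only when it matters.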

materialize

Materialize metrics on the target data warehouse (DWH).

Parameters:

materialization_name (str, required): The name of the materialization to be created.

start_time (Optional[str], default None): The start of the time range used to build the materialization.

end_time (Optional[str], default None): The end of the time range used to build the materialization.

config (Optional[Union[Dict, str]], default None): MetricFlow configuration. Can be either a dict or a YAML string. If provided, it will be persisted at the path specified in config_file_path.

config_file_path (Optional[str], default None): Path to the MetricFlow config file. If not provided, the default path will be used.
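
Since start_time and end_time are typed as strings, callers working with date objects need to convert them first. The sketch below shows one way to build a trailing window. The helper `trailing_window` is hypothetical, and ISO 8601 dates are an assumption: the accepted string format is not stated here, so it should be checked against the MetricFlow version in use.

```python
from datetime import date, timedelta
from typing import Dict, Optional


def trailing_window(days: int, today: Optional[date] = None) -> Dict[str, str]:
    """Hypothetical helper: string bounds for the `days` days ending at `today`."""
    end = today or date.today()
    start = end - timedelta(days=days)
    # Assumption: ISO 8601 (YYYY-MM-DD) strings; the source does not name a format.
    return {"start_time": start.isoformat(), "end_time": end.isoformat()}


# A fixed reference date keeps the example reproducible.
window = trailing_window(7, today=date(2023, 1, 8))
print(window)
```

The keys match the task's parameter names, so the dict could be unpacked into the call alongside materialization_name.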

Returns:

SqlTable: A SqlTable with references to the newly created materialization.

Source code in prefect_metricflow/tasks.py
@task
def materialize(
    materialization_name: str,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
    config: Optional[Union[Dict, str]] = None,
    config_file_path: Optional[str] = None,
) -> SqlTable:
    """
    Materialize metrics on the target DWH.

    Args:
        materialization_name: The name of the materialization to be created.
        start_time: The start time range to be used to build the materialization.
        end_time: The end time range to be used to build the materialization.
        config: MetricFlow configuration. Can be either a `dict` or a YAML string.
            If provided, will be persisted at the path specified in `config_file_path`.
        config_file_path: Path to MetricFlow config file.
            If not provided, the default path will be used.

    Raises:
        `MetricFlowFailureException` if `config` is not a valid YAML string.

    Returns:
        a SqlTable with references to the newly created materialization.
    """

    mf_config_file_path = get_config_file_path(config_file_path=config_file_path)

    # If a config is provided, persist it so the client can load it from file.
    if config:
        persist_config(config=config, file_path=mf_config_file_path)

    # Create MetricFlow client
    mfc = MetricFlowClient.from_config(config_file_path=mf_config_file_path)

    # Build materialization and return result
    return mfc.materialize(
        materialization_name=materialization_name,
        start_time=start_time,
        end_time=end_time,
    )