Skip to content

prefect_transform.tasks

Collection of tasks to interact with Transform metrics catalog

create_materialization

Task to create a materialization against a Transform metrics layer deployment. Please refer to Transform official documentation for more information. This task uses Transform official MQL Client under the hood.

Parameters:

Name Type Description Default
credentials TransformCredentials

TransformCredentials object used to obtain a client to interact with Transform.

required
materialization_name str

The name of the Transform materialization to create.

required
model_key_id Optional[int]

The unique identifier of the Transform model against which the transformation will be created.

None
start_time Optional[str]

The UTC start time of the materialization.

None
end_time Optional[str]

The UTC end time of the materialization.

None
output_table Optional[str]

The name of the database table, in the form of schema_name.table_name, where the materialization will be created.

None
force bool

Whether to force the materialization creation or not. Defaults to False.

False
wait_for_creation Optional[bool]

Whether to wait for the materialization creation or not. Defaults to True.

True

Returns:

Type Description
Union[MqlMaterializeResp, MqlQueryStatusResp]

An MqlQueryStatusResp object if run_async is True.

Union[MqlMaterializeResp, MqlQueryStatusResp]

An MqlMaterializeResp object if run_async is False.

from prefect import flow
from prefect_transform.tasks import (
    create_materialization
)


@flow
def trigger_materialization_creation():
    create_materialization(
        api_key="<your Transform API key>",
        mql_server_url="<your MQL Serverl URL>",
        materialization_name="<name of the materialization>",
        wait_for_creation=False
    )

trigger_materialization_creation()
Source code in prefect_transform/tasks.py
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
@task
def create_materialization(
    credentials: TransformCredentials,
    materialization_name: str,
    model_key_id: Optional[int] = None,
    start_time: Optional[str] = None,
    end_time: Optional[str] = None,
    output_table: Optional[str] = None,
    force: bool = False,
    wait_for_creation: Optional[bool] = True,
) -> Union[MqlMaterializeResp, MqlQueryStatusResp]:
    """
    Task to create a materialization against a Transform metrics layer
    deployment.
    Please refer to [Transform official documentation](https://docs.transform.co/)
    for more information.
    This task uses [Transform official MQL Client](https://pypi.org/project/transform/)
    under the hood.

    Args:
        credentials: `TransformCredentials` object used to obtain a client to
            interact with Transform.
        materialization_name: The name of the Transform
            materialization to create.
        model_key_id: The unique identifier of the Transform model
            against which the transformation will be created.
        start_time: The UTC start time of the materialization.
        end_time: The UTC end time of the materialization.
        output_table: The name of the database table, in the form of
            `schema_name.table_name`, where the materialization will be created.
        force: Whether to force the materialization creation
            or not. Defaults to `False`.
        wait_for_creation: Whether to wait for the materialization
            creation or not. Defaults to `True`.

    Raises:
        `TransformConfigurationException` if `materialization_name` is missing.
        `TransformAuthException` if the connection with the Transform
            server cannot be established.
        `TransformRuntimeException` if the materialization creation process fails.

    Returns:
        An `MqlQueryStatusResp` object if `run_async` is `True`.
        An `MqlMaterializeResp` object if `run_async` is `False`.

    Example:
    ```python
    from prefect import flow
    from prefect_transform.tasks import (
        create_materialization
    )


    @flow
    def trigger_materialization_creation():
        create_materialization(
            api_key="<your Transform API key>",
            mql_server_url="<your MQL Serverl URL>",
            materialization_name="<name of the materialization>",
            wait_for_creation=False
        )

    trigger_materialization_creation()
    ```
    """
    use_async = not wait_for_creation
    mql_client = credentials.get_client()

    response = None
    if use_async:
        response = mql_client.create_materialization(
            materialization_name=materialization_name,
            start_time=start_time,
            end_time=end_time,
            model_key_id=model_key_id,
            output_table=output_table,
            force=force,
        )
        if response.is_failed:
            msg = f"""
            Transform materialization async creation failed! Error is: {response.error}
            """
            raise TransformRuntimeException(msg)
    else:
        try:
            response = mql_client.materialize(
                materialization_name=materialization_name,
                start_time=start_time,
                end_time=end_time,
                model_key_id=model_key_id,
                output_table=output_table,
                force=force,
            )
        except QueryRuntimeException as e:
            msg = f"Transform materialization sync creation failed! Error is: {e.msg}"
            raise TransformRuntimeException(msg)

    return response