Skip to content

prefect_soda_cloud.tasks

Collection of tasks to interact with Soda Cloud

Classes

Functions

get_scan_logs

Retrieve scan logs from Soda Cloud given the scan ID.

Parameters:

Name Type Description Default
scan_id str

Scan identifier provided by Soda Cloud.

required
soda_cloud_auth_config SodaCloudAuthConfig

The auth configuration to use to trigger the scan.

required

Returns:

Type Description
List[dict]

A list of dict, each dict being a Soda Cloud log message.

Source code in prefect_soda_cloud/tasks.py
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@task
def get_scan_logs(
    scan_id: str, soda_cloud_auth_config: SodaCloudAuthConfig
) -> List[dict]:
    """
    Retrieve scan logs from Soda Cloud given the scan ID.

    Args:
        scan_id: Scan identifier provided by Soda Cloud.
        soda_cloud_auth_config: The auth configuration to use to trigger the scan.

    Returns:
        A list of dict, each dict being a Soda Cloud log message.
    """
    soda_cloud_client = soda_cloud_auth_config.get_client(logger=get_run_logger())
    scan_logs = soda_cloud_client.get_scan_logs(scan_id=scan_id)
    return scan_logs

get_scan_status

Retrieve scan status from Soda Cloud given the scan ID.

Parameters:

Name Type Description Default
scan_id str

Scan identifier provided by Soda Cloud.

required
soda_cloud_auth_config SodaCloudAuthConfig

The auth configuration to use to trigger the scan.

required
wait_for_scan_end bool

Whether to wait for the scan execution to finish or not.

False

Returns:

Type Description
dict

A dictionary that describes the status of the scan.

Source code in prefect_soda_cloud/tasks.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
@task
def get_scan_status(
    scan_id: str,
    soda_cloud_auth_config: SodaCloudAuthConfig,
    wait_for_scan_end: bool = False,
) -> dict:
    """
    Retrieve scan status from Soda Cloud given the scan ID.

    Args:
        scan_id: Scan identifier provided by Soda Cloud.
        soda_cloud_auth_config: The auth configuration to use to trigger the scan.
        wait_for_scan_end: Whether to wait for the scan execution to finish or not.

    Returns:
        A dictionary that describes the status of the scan.
    """
    soda_cloud_client = soda_cloud_auth_config.get_client(logger=get_run_logger())
    scan_status = soda_cloud_client.get_scan_status(
        scan_id=scan_id, wait_for_scan_end=wait_for_scan_end
    )
    return scan_status

trigger_scan

Trigger a scan given its name.

Parameters:

Name Type Description Default
scan_name str

The name of the scan to trigger.

required
soda_cloud_auth_config SodaCloudAuthConfig

The auth configuration to use to trigger the scan.

required

Returns:

Type Description
str

The Scan identifier.

from prefect_soda_cloud import SodaCloudAuthConfig, SodaCloudCredentials
from prefect_soda_cloud.tasks import trigger_scan

creds = SodaCloudCredentials(
    user_or_api_key_id="the_user",
    pwd_or_api_key_secret="the_password"
)

auth_config = SodaCloudAuthConfig(
    api_base_url="https://cloud.soda.io",
    creds=creds
)

scan_id = trigger_scan(
    soda_cloud_auth_config=auth_config,
    data_timestamp=None
)
Source code in prefect_soda_cloud/tasks.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@task
def trigger_scan(
    scan_name: str,
    soda_cloud_auth_config: SodaCloudAuthConfig,
    data_timestamp: Optional[datetime] = None,
) -> str:
    """
    Trigger a scan given its name.

    Args:
        scan_name: The name of the scan to trigger.
        soda_cloud_auth_config: The auth configuration to use to trigger the scan.

    Returns:
        The Scan identifier.

    Example:
    ```python
    from prefect_soda_cloud import SodaCloudAuthConfig, SodaCloudCredentials
    from prefect_soda_cloud.tasks import trigger_scan

    creds = SodaCloudCredentials(
        user_or_api_key_id="the_user",
        pwd_or_api_key_secret="the_password"
    )

    auth_config = SodaCloudAuthConfig(
        api_base_url="https://cloud.soda.io",
        creds=creds
    )

    scan_id = trigger_scan(
        soda_cloud_auth_config=auth_config,
        data_timestamp=None
    )
    ```
    """
    soda_cloud_client = soda_cloud_auth_config.get_client(logger=get_run_logger())
    scan_id = soda_cloud_client.trigger_scan(
        scan_name=scan_name, data_timestamp=data_timestamp
    )

    return scan_id