Skip to content

prefect_stitch.stitch_client

An object that can be used to interact with Stitch APIs.

StitchClient

Create a client that can be used to interact with Stitch APIs using the provided access token to authenticate API calls.

Parameters:

Name Type Description Default
credentials StitchCredentials

access token that will be used to authenticate API calls.

required
Source code in prefect_stitch/stitch_client.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class StitchClient:
    """
    Create a client that can be used to interact with Stitch APIs
    using the provided access token to authenticate API calls.

    Args:
        credentials: access token that will be used to
            authenticate API calls.
    """

    # Stitch API base url
    __STITCH_API_URL = "https://api.stitchdata.com"

    # Stitch API version
    __STITCH_API_VERSION = "v4"

    def __init__(self, credentials: StitchCredentials) -> None:
        self.credentials = credentials

    def __get_base_url(self) -> str:
        """
        Returns Stitch API base url.

        Returns:
            Stitch base URL.
        """
        return f"{self.__STITCH_API_URL}/{self.__STITCH_API_VERSION}"

    def __get_replication_job_url(self, source_id: int) -> str:
        """
        Returns the Replication Job API URL.

        Args:
            source_id: The integer identifier of the source that
                will be used in the replication job.

        Returns:
            Replication Job API URL.
        """
        return f"{self.__get_base_url()}/sources/{source_id}/sync"

    def __get_session(self) -> Session:
        """
        Returns a `requests.Session` object that can be used in subsequent API calls.

        Returns:
            Session object configured with the proper headers.
        """
        access_token = self.credentials.access_token.get_secret_value()
        session = Session()
        session.headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        return session

    def __call_api(
        self, api_url: str, params: Optional[Dict], http_method: str
    ) -> Dict:
        """
        Make an API call to the URL using the specified parameters and HTTP method.

        Args:
            api_url: The URL of the API to call.
            params: Optional parameters to pass to the API call.
            http_method: String representing the HTTP method
                to use to make the API call.

        Raises:
            `StitchAPIFailureException` if the response code is not 200.
            `StitchAPIFailureException` if the response contains an error payload.

        Returns:
            The API JSON response.
        """

        session = self.__get_session()
        http_fn = session.get if http_method == "GET" else session.post
        with http_fn(url=api_url, params=params) as response:
            if response.status_code not in [200, 400]:
                err = f"There was an error while calling Stitch API: {response.reason}"
                raise StitchAPIFailureException(err)

            data = response.json()

            if "error" in data:
                err = data["error"]["message"]
                raise StitchAPIFailureException(
                    f"Stitch API responded with error: {err}"
                )

        return data

    def start_replication_job(self, source_id: int) -> Dict:
        """
        Start a replication job of the source.

        Args:
            source_id: The integer identifier of the source that
                will be used in the replication job.

        Returns:
            The replication job API JSON response.
        """
        if source_id is None:
            msg = "To start a replication job, please provide the the source identifier"
            raise StitchAPIFailureException(msg)
        return self.__call_api(
            api_url=self.__get_replication_job_url(source_id=source_id),
            params=None,
            http_method="POST",
        )

__call_api

Make an API call to the URL using the specified parameters and HTTP method.

Parameters:

Name Type Description Default
api_url str

The URL of the API to call.

required
params Optional[Dict]

Optional parameters to pass to the API call.

required
http_method str

String representing the HTTP method to use to make the API call.

required

Returns:

Type Description
Dict

The API JSON response.

Source code in prefect_stitch/stitch_client.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def __call_api(
    self, api_url: str, params: Optional[Dict], http_method: str
) -> Dict:
    """
    Make an API call to the URL using the specified parameters and HTTP method.

    Args:
        api_url: The URL of the API to call.
        params: Optional parameters to pass to the API call.
        http_method: String representing the HTTP method
            to use to make the API call.

    Raises:
        `StitchAPIFailureException` if the response code is not 200.
        `StitchAPIFailureException` if the response contains an error payload.

    Returns:
        The API JSON response.
    """

    session = self.__get_session()
    http_fn = session.get if http_method == "GET" else session.post
    with http_fn(url=api_url, params=params) as response:
        if response.status_code not in [200, 400]:
            err = f"There was an error while calling Stitch API: {response.reason}"
            raise StitchAPIFailureException(err)

        data = response.json()

        if "error" in data:
            err = data["error"]["message"]
            raise StitchAPIFailureException(
                f"Stitch API responded with error: {err}"
            )

    return data

__get_base_url

Returns Stitch API base url.

Returns:

Type Description
str

Stitch base URL.

Source code in prefect_stitch/stitch_client.py
32
33
34
35
36
37
38
39
def __get_base_url(self) -> str:
    """
    Returns Stitch API base url.

    Returns:
        Stitch base URL.
    """
    return f"{self.__STITCH_API_URL}/{self.__STITCH_API_VERSION}"

__get_replication_job_url

Returns the Replication Job API URL.

Parameters:

Name Type Description Default
source_id int

The integer identifier of the source that will be used in the replication job.

required

Returns:

Type Description
str

Replication Job API URL.

Source code in prefect_stitch/stitch_client.py
41
42
43
44
45
46
47
48
49
50
51
52
def __get_replication_job_url(self, source_id: int) -> str:
    """
    Returns the Replication Job API URL.

    Args:
        source_id: The integer identifier of the source that
            will be used in the replication job.

    Returns:
        Replication Job API URL.
    """
    return f"{self.__get_base_url()}/sources/{source_id}/sync"

__get_session

Returns a requests.Session object that can be used in subsequent API calls.

Returns:

Type Description
Session

Session object configured with the proper headers.

Source code in prefect_stitch/stitch_client.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def __get_session(self) -> Session:
    """
    Returns a `requests.Session` object that can be used in subsequent API calls.

    Returns:
        Session object configured with the proper headers.
    """
    access_token = self.credentials.access_token.get_secret_value()
    session = Session()
    session.headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }

    return session

start_replication_job

Start a replication job of the source.

Parameters:

Name Type Description Default
source_id int

The integer identifier of the source that will be used in the replication job.

required

Returns:

Type Description
Dict

The replication job API JSON response.

Source code in prefect_stitch/stitch_client.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def start_replication_job(self, source_id: int) -> Dict:
    """
    Start a replication job of the source.

    Args:
        source_id: The integer identifier of the source that
            will be used in the replication job.

    Returns:
        The replication job API JSON response.
    """
    if source_id is None:
        msg = "To start a replication job, please provide the the source identifier"
        raise StitchAPIFailureException(msg)
    return self.__call_api(
        api_url=self.__get_replication_job_url(source_id=source_id),
        params=None,
        http_method="POST",
    )