Create a client that can be used to interact with Stitch APIs
using the provided access token to authenticate API calls.
Parameters:
Name |
Type |
Description |
Default |
credentials |
StitchCredentials
|
Credentials object holding the access token that will be used to
authenticate API calls. |
required
|
Source code in prefect_stitch/stitch_client.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
class StitchClient:
    """
    Client used to interact with Stitch APIs, authenticating every call
    with the access token carried by the provided credentials.

    Args:
        credentials: Credentials object whose `access_token` secret is
            sent as a bearer token on each API call.
    """

    # Stitch API base url
    __STITCH_API_URL = "https://api.stitchdata.com"

    # Stitch API version
    __STITCH_API_VERSION = "v4"

    def __init__(self, credentials: StitchCredentials) -> None:
        self.credentials = credentials

    def __get_base_url(self) -> str:
        """
        Returns Stitch API base url.

        Returns:
            Stitch base URL (API host followed by the API version).
        """
        return f"{self.__STITCH_API_URL}/{self.__STITCH_API_VERSION}"

    def __get_replication_job_url(self, source_id: int) -> str:
        """
        Returns the Replication Job API URL.

        Args:
            source_id: The integer identifier of the source that
                will be used in the replication job.

        Returns:
            Replication Job API URL.
        """
        return f"{self.__get_base_url()}/sources/{source_id}/sync"

    def __get_session(self) -> Session:
        """
        Returns a `requests.Session` object that can be used in subsequent API calls.

        Returns:
            Session object configured with the proper headers.
        """
        access_token = self.credentials.access_token.get_secret_value()
        session = Session()
        # Update the existing mapping instead of replacing it, so the
        # session keeps its default headers and case-insensitive lookup.
        session.headers.update(
            {
                "Authorization": f"Bearer {access_token}",
                "Content-Type": "application/json",
            }
        )
        return session

    def __call_api(
        self, api_url: str, params: Optional[Dict], http_method: str
    ) -> Dict:
        """
        Make an API call to the URL using the specified parameters and HTTP method.

        Args:
            api_url: The URL of the API to call.
            params: Optional parameters to pass to the API call.
            http_method: String representing the HTTP method
                to use to make the API call. `"GET"` issues a GET request;
                any other value issues a POST request.

        Raises:
            `StitchAPIFailureException` if the response code is neither 200 nor 400.
            `StitchAPIFailureException` if the response body is not valid JSON.
            `StitchAPIFailureException` if the response contains an error payload.

        Returns:
            The API JSON response.
        """
        # Use the session as a context manager so it is closed once the
        # call completes instead of being left open.
        with self.__get_session() as session:
            http_fn = session.get if http_method == "GET" else session.post
            with http_fn(url=api_url, params=params) as response:
                # 400 is let through here; presumably such responses carry
                # an "error" payload that is reported below.
                if response.status_code not in (200, 400):
                    err = f"There was an error while calling Stitch API: {response.reason}"
                    raise StitchAPIFailureException(err)
                try:
                    data = response.json()
                except ValueError as exc:
                    # JSON decoding errors are ValueError subclasses; report
                    # them through the client's own exception type.
                    raise StitchAPIFailureException(
                        "Stitch API returned a response that is not valid JSON"
                    ) from exc

        if isinstance(data, dict) and "error" in data:
            error = data["error"]
            # Fall back to the raw payload when it is not a mapping with
            # a "message" entry, rather than failing with KeyError/TypeError.
            err = error.get("message", error) if isinstance(error, dict) else error
            raise StitchAPIFailureException(
                f"Stitch API responded with error: {err}"
            )
        return data

    def start_replication_job(self, source_id: int) -> Dict:
        """
        Start a replication job of the source.

        Args:
            source_id: The integer identifier of the source that
                will be used in the replication job.

        Raises:
            `StitchAPIFailureException` if `source_id` is `None`.

        Returns:
            The replication job API JSON response.
        """
        if source_id is None:
            msg = "To start a replication job, please provide the source identifier"
            raise StitchAPIFailureException(msg)

        return self.__call_api(
            api_url=self.__get_replication_job_url(source_id=source_id),
            params=None,
            http_method="POST",
        )
|
__call_api
Make an API call to the URL using the specified parameters and HTTP method.
Parameters:
Name |
Type |
Description |
Default |
api_url |
str
|
The URL of the API to call. |
required
|
params |
Optional[Dict]
|
Optional parameters to pass to the API call. |
required
|
http_method |
str
|
String representing the HTTP method
to use to make the API call. |
required
|
Returns:
Type |
Description |
Dict
|
The API JSON response. |
Source code in prefect_stitch/stitch_client.py
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
def __call_api(
    self, api_url: str, params: Optional[Dict], http_method: str
) -> Dict:
    """
    Make an API call to the URL using the specified parameters and HTTP method.

    Args:
        api_url: The URL of the API to call.
        params: Optional parameters to pass to the API call.
        http_method: String representing the HTTP method
            to use to make the API call. `"GET"` issues a GET request;
            any other value issues a POST request.

    Raises:
        `StitchAPIFailureException` if the response code is neither 200 nor 400.
        `StitchAPIFailureException` if the response body is not valid JSON.
        `StitchAPIFailureException` if the response contains an error payload.

    Returns:
        The API JSON response.
    """
    # Use the session as a context manager so it is closed once the
    # call completes instead of being left open.
    with self.__get_session() as session:
        http_fn = session.get if http_method == "GET" else session.post
        with http_fn(url=api_url, params=params) as response:
            # 400 is let through here; presumably such responses carry
            # an "error" payload that is reported below.
            if response.status_code not in (200, 400):
                err = f"There was an error while calling Stitch API: {response.reason}"
                raise StitchAPIFailureException(err)
            try:
                data = response.json()
            except ValueError as exc:
                # JSON decoding errors are ValueError subclasses; report
                # them through the client's own exception type.
                raise StitchAPIFailureException(
                    "Stitch API returned a response that is not valid JSON"
                ) from exc

    if isinstance(data, dict) and "error" in data:
        error = data["error"]
        # Fall back to the raw payload when it is not a mapping with
        # a "message" entry, rather than failing with KeyError/TypeError.
        err = error.get("message", error) if isinstance(error, dict) else error
        raise StitchAPIFailureException(
            f"Stitch API responded with error: {err}"
        )
    return data
|
__get_base_url
Returns Stitch API base url.
Returns:
Type |
Description |
str
|
Stitch base URL. |
Source code in prefect_stitch/stitch_client.py
def __get_base_url(self) -> str:
    """
    Build the versioned root URL of the Stitch API.

    Returns:
        The Stitch API host followed by the API version segment.
    """
    api_root = self.__STITCH_API_URL
    api_version = self.__STITCH_API_VERSION
    return f"{api_root}/{api_version}"
|
__get_replication_job_url
Returns the Replication Job API URL.
Parameters:
Name |
Type |
Description |
Default |
source_id |
int
|
The integer identifier of the source that
will be used in the replication job. |
required
|
Returns:
Type |
Description |
str
|
Replication Job API URL. |
Source code in prefect_stitch/stitch_client.py
41
42
43
44
45
46
47
48
49
50
51
def __get_replication_job_url(self, source_id: int) -> str:
    """
    Build the URL of the Replication Job API for a given source.

    Args:
        source_id: The integer identifier of the source that
            will be used in the replication job.

    Returns:
        The base URL followed by the source's `sync` path.
    """
    base_url = self.__get_base_url()
    return f"{base_url}/sources/{source_id}/sync"
|
__get_session
Returns a requests.Session
object that can be used in subsequent API calls.
Returns:
Type |
Description |
Session
|
Session object configured with the proper headers. |
Source code in prefect_stitch/stitch_client.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def __get_session(self) -> Session:
    """
    Returns a `requests.Session` object that can be used in subsequent API calls.

    Returns:
        Session object configured with the proper headers.
    """
    access_token = self.credentials.access_token.get_secret_value()
    session = Session()
    # Update the existing mapping instead of replacing it, so the
    # session keeps its default headers and case-insensitive lookup.
    session.headers.update(
        {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }
    )
    return session
|
start_replication_job
Start a replication job of the source.
Parameters:
Name |
Type |
Description |
Default |
source_id |
int
|
The integer identifier of the source that
will be used in the replication job. |
required
|
Returns:
Type |
Description |
Dict
|
The replication job API JSON response. |
Source code in prefect_stitch/stitch_client.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def start_replication_job(self, source_id: int) -> Dict:
    """
    Start a replication job of the source.

    Args:
        source_id: The integer identifier of the source that
            will be used in the replication job.

    Raises:
        `StitchAPIFailureException` if `source_id` is `None`.

    Returns:
        The replication job API JSON response.
    """
    if source_id is None:
        # Message corrected: the original repeated the word "the".
        msg = "To start a replication job, please provide the source identifier"
        raise StitchAPIFailureException(msg)

    return self.__call_api(
        api_url=self.__get_replication_job_url(source_id=source_id),
        params=None,
        http_method="POST",
    )
|