-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstreams.py
72 lines (59 loc) · 2.04 KB
/
streams.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
"""Stream type classes for tap-msal."""
from __future__ import annotations
import sys
import typing as t
from singer_sdk import typing as th # JSON Schema typing helpers
from tap_msal.client import msalStream
if sys.version_info >= (3, 9):
import importlib.resources as importlib_resources
else:
import importlib_resources
# TODO: Delete this is if not using json files for schema definition
SCHEMAS_DIR = importlib_resources.files(__package__) / "schemas"
# TODO: - Override `UsersStream` and `GroupsStream` with your own stream definition.
# - Copy-paste as many times as needed to create multiple stream types.
class UsersStream(msalStream):
"""Define custom stream."""
name = "users"
path = "/users"
primary_keys: t.ClassVar[list[str]] = ["id"]
replication_key = None
# Optionally, you may also use `schema_filepath` in place of `schema`:
# schema_filepath = SCHEMAS_DIR / "users.json" # noqa: ERA001
schema = th.PropertiesList(
th.Property("name", th.StringType),
th.Property(
"id",
th.StringType,
description="The user's system ID",
),
th.Property(
"age",
th.IntegerType,
description="The user's age in years",
),
th.Property(
"email",
th.StringType,
description="The user's email address",
),
th.Property("street", th.StringType),
th.Property("city", th.StringType),
th.Property(
"state",
th.StringType,
description="State name in ISO 3166-2 format",
),
th.Property("zip", th.StringType),
).to_dict()
class GroupsStream(msalStream):
"""Define custom stream."""
name = "groups"
path = "/groups"
primary_keys: t.ClassVar[list[str]] = ["id"]
replication_key = "modified"
schema = th.PropertiesList(
th.Property("name", th.StringType),
th.Property("id", th.StringType),
th.Property("modified", th.DateTimeType),
).to_dict()