Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use GraphQL to make it faster #19

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 194 additions & 73 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@
Protocol,
)
from enum import Enum
from collections import defaultdict
from difflib import SequenceMatcher
from http.client import HTTPSConnection, HTTPResponse
from dataclasses import dataclass
Expand Down Expand Up @@ -659,6 +660,35 @@ def _http_get_json_paginated(self, url: str) -> Iterable[Any]:
f"Got {response.status} from {next_url!r}: {body!r}", response
)

def _http_graphql(
    self, query: str, variables: "dict[str, Any] | None" = None
) -> Any:
    """
    Execute a GraphQL query with a POST to the `/graphql` endpoint.

    Parameters:
      query: The GraphQL document to execute.
      variables: Values for the variables declared by `query`. When omitted,
        an empty object is sent.

    Returns the value of the `data` member of the JSON response.

    Raises `Exception` when the response body contains an `errors` member
    (the errors are attached as the second exception argument), or when the
    HTTP status is not in the 2xx range (the response object is attached as
    the second exception argument).
    """
    # Keep the path in one place, so the error message below reports the
    # same endpoint that the request was sent to.
    endpoint = "/graphql"

    request_body = json.dumps({
        "query": query,
        # Use a fresh dict per call rather than a shared mutable default.
        "variables": {} if variables is None else variables,
    })

    self.connection.request(
        method="POST",
        url=endpoint,
        headers={
            "Authorization": f"token {self.github_token}",
            "Accept": "application/vnd.github.v3+json",
            "User-Agent": "Github Access Manager",
        },
        body=request_body,
    )
    # TODO: see _http_get() regarding unimplemented headers

    with self.connection.getresponse() as response:
        if 200 <= response.status < 300:
            json_response = json.load(response)
            # A 2xx status alone does not mean that the query succeeded;
            # this code treats an `errors` member in the body as a failure.
            if "errors" in json_response:
                errors = json_response["errors"]
                raise Exception("Got GraphQL errors", errors)
            return json_response["data"]

        body = response.read()
        raise Exception(
            f"Got {response.status} from {endpoint!r}: {body!r}", response
        )

def get_organization(self, org: str) -> Organization:
org_data: Dict[str, Any] = self._http_get_json(f"/orgs/{org}")
default_repo_permission: str = org_data["default_repository_permission"]
Expand All @@ -670,29 +700,39 @@ def get_organization(self, org: str) -> Organization:
)

def get_organization_members(self, org: str) -> Iterable[OrganizationMember]:
# Collect the members into a list first, so we can show an accurate
# progress meter later.
members = list(self._http_get_json_paginated(f"/orgs/{org}/members"))
for i, member in enumerate(members):
username: str = member["login"]
print_status_stderr(
f"[{i + 1} / {len(members)}] Retrieving membership: {username}",
)
membership: Dict[str, Any] = self._http_get_json(
f"/orgs/{org}/memberships/{username}"
)
query = """
query($org: String!) {
organization(login: $org) {
membersWithRole(first:100) {
edges {
node {
login
databaseId
}
role
}
pageInfo {
hasNextPage
}
}
}
}
"""
variables = { "org": org }
response = self._http_graphql(query, variables)

members_with_role = response['organization']['membersWithRole']
# TODO: Support more than 100 organization members (paginate membersWithRole)
assert(members_with_role['pageInfo']['hasNextPage'] == False)

for edge in members_with_role['edges']:
node = edge['node']
yield OrganizationMember(
user_name=username,
user_id=member["id"],
role=OrganizationRole(membership["role"]),
user_name=node['login'],
user_id=node['databaseId'],
role=OrganizationRole(edge['role'].lower()),
)

# After the final status update, clear the line again, so the final
# output is not mixed with status updates. (They go separately to stdout
# and stderr anyway, but in a terminal you don’t want interleaved
# output.)
print_status_stderr("")

def get_organization_teams(self, org: str) -> Iterable[Team]:
teams = self._http_get_json_paginated(f"/orgs/{org}/teams")
for team in teams:
Expand All @@ -718,62 +758,143 @@ def get_team_members(self, org: str, team: Team) -> Iterable[TeamMember]:
team_name=team.name,
)

def get_repository_teams(
self, org: str, repo: str
) -> Iterable[TeamRepositoryAccess]:
teams = self._http_get_json_paginated(f"/repos/{org}/{repo}/teams")
for team in teams:
permissions: Dict[str, bool] = team["permissions"]
yield TeamRepositoryAccess(
team_name=team["name"],
role=RepositoryAccessRole.from_permissions_dict(permissions),
)
def get_organization_repo_to_teams_map(self, org: str) -> dict[str, [TeamRepositoryaccess]]:
query = """
query($org: String!, $cursor: String) {
organization(login: $org) {
teams(first: 100) {
nodes {
name
repositories(first: 100, after: $cursor) {
edges {
permission
node {
databaseId
}
}
pageInfo {
hasNextPage
endCursor
}
totalCount
}
}
pageInfo {
hasNextPage
}
}
}
}
"""

def get_repository_users(
self, org: str, repo: str
) -> Iterable[UserRepositoryAccess]:
# We query with affiliation=direct to get all users that have explicit
# access to the repository (i.e. not those who have implicit access
# through being a member of a group). The default is affiliation=all,
# which also returns users with implicit access.
users = self._http_get_json_paginated(f"/repos/{org}/{repo}/collaborators?affiliation=direct")
for user in users:
permissions: Dict[str, bool] = user["permissions"]
yield UserRepositoryAccess(
user_id=user["id"],
user_name=user["login"],
role=RepositoryAccessRole.from_permissions_dict(permissions),
)
repo_to_teams: defaultdict[str, [TeamRepositoryaccess]] = defaultdict(list)

cursor = None
while True:
variables = { "org": org, "cursor": cursor }
response = self._http_graphql(query, variables)

teams = response['organization']['teams']
# Assume we have fewer than 100 teams and skip pagination
assert(teams['pageInfo']['hasNextPage'] == False)

has_next_page = False
next_cursors = []

for team in teams['nodes']:
for repo in team['repositories']['edges']:
repo_to_teams[repo['node']['databaseId']].append(TeamRepositoryAccess(
team_name=team['name'],
role=RepositoryAccessRole(repo['permission'].lower())
))

team_has_next_page = team['repositories']['pageInfo']['hasNextPage']
has_next_page |= team_has_next_page
if team_has_next_page:
next_cursors.append(team['repositories']['pageInfo']['endCursor'])

if not has_next_page:
break

[cursor] = set(next_cursors) # Asserts that all next cursors are the same

print(json.dumps({ key: [team.team_name for team in teams] for key, teams in repo_to_teams.items()}))
return dict(repo_to_teams)

def get_organization_repositories(self, org: str) -> Iterable[Repository]:
# Listing repositories is a slow endpoint, and paginated as well, print
# some progress. Technically from the pagination headers we could
# extract more precise progress, but I am not going to bother.
print_status_stderr("[1 / ??] Listing organization repositories")
repos = []
for i, more_repos in enumerate(
self._http_get_json_paginated(f"/orgs/{org}/repos?per_page=100")
):
repos.append(more_repos)
print_status_stderr(
f"[{len(repos)} / ??] Listing organization repositories"
)
# Materialize to a list so we know the total so we can show a progress
# counter.
n = len(repos)
for i, repo in enumerate(repos):
name = repo["name"]
print_status_stderr(f"[{i + 1} / {n}] Getting access on {name}")
user_access = tuple(sorted(self.get_repository_users(org, name)))
team_access = tuple(sorted(self.get_repository_teams(org, name)))
yield Repository(
repo_id=repo["id"],
name=name,
visibility=RepositoryVisibility(repo["visibility"]),
user_access=user_access,
team_access=team_access,
)
print_status_stderr("")
query = """
query($org: String!, $cursor: String) {
organization(login: $org) {
repositories(first:100, after: $cursor) {
nodes {
databaseId
name
visibility
# We query with affiliation=direct to get all users that have explicit
# access to the repository (i.e. not those who have implicit access
# through being a member of a group). The default is affiliation=all,
# which also returns users with implicit access.
collaborators(affiliation: DIRECT, first: 100) {
edges {
node {
databaseId
login
}
permission
}
pageInfo {
hasNextPage
}
}
}
pageInfo {
hasNextPage
endCursor
}
totalCount
}
}
}
"""

repo_to_teams = self.get_organization_repo_to_teams_map(org)

cursor = None
while True:
variables = { "org": org, "cursor": cursor }
print(f"shooting repositories query for cursor {cursor}")
response = self._http_graphql(query, variables)

repos = response['organization']['repositories']

for repo in repos['nodes']:
repo_id = repo['databaseId']

collaborators = repo['collaborators']
# Assume we have fewer than 100 direct collaborators on any repo and skip pagination
assert(collaborators['pageInfo']['hasNextPage'] == False)
user_access = tuple(sorted(UserRepositoryAccess(
user_id=collaborator['node']['databaseId'],
user_name=collaborator['node']['login'],
role=RepositoryAccessRole(collaborator['permission'].lower()),
) for collaborator in collaborators['edges']))

if repo_id == 733475299:
print(f"BLEHBLEH {repo['name']}")
team_access = tuple(sorted(repo_to_teams.get(repo_id, [])))

yield Repository(
repo_id=repo_id,
name=repo['name'],
visibility=RepositoryVisibility(repo["visibility"].lower()),
user_access=user_access,
team_access=team_access,
)

page_info = repos['pageInfo']
if not page_info['hasNextPage']:
break
cursor = page_info['endCursor']


def print_indented(lines: str) -> None:
Expand Down