-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathregistry.py
166 lines (152 loc) · 6.64 KB
/
registry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# bug-report: [email protected]
""" api for docker registry """
import urllib2
import urllib
import json
import base64
class RegistryException(Exception):
""" registry api related exception """
pass
class RegistryApi(object):
""" interact with docker registry and harbor """
def __init__(self, username, password, registry_endpoint):
self.username = username
self.password = password
self.basic_token = base64.encodestring("%s:%s" % (str(username), str(password)))[0:-1]
self.registry_endpoint = registry_endpoint.rstrip('/')
#print("%s/v2/_catalog" % (self.registry_endpoint,))
auth = self.pingRegistry("%s/v2/_catalog" % (self.registry_endpoint,))
if auth is None:
raise RegistryException("get token realm and service failed")
self.token_endpoint = auth[0]
self.service = auth[1]
def pingRegistry(self, registry_endpoint):
""" ping v2 registry and get realm and service """
headers = dict()
try:
res = urllib2.urlopen(registry_endpoint)
except urllib2.HTTPError as e:
headers = e.hdrs.dict
try:
(realm, service, _) = headers['www-authenticate'].split(',')
return (realm[14:-1:], service[9:-1])
except Exception as e:
return None
def getBearerTokenForScope(self, scope):
""" get bearer token from harbor """
payload = urllib.urlencode({'service': self.service, 'scope': scope})
url = "%s?%s" % (self.token_endpoint, payload)
req = urllib2.Request(url)
req.add_header('Authorization', 'Basic %s' % (self.basic_token,))
try:
response = urllib2.urlopen(req)
return json.loads(response.read())["token"]
except Exception as e:
return None
def getRepositoryList(self, n=None):
""" get repository list """
scope = "registry:catalog:*"
bear_token = self.getBearerTokenForScope(scope)
if bear_token is None:
return None
url = "%s/v2/_catalog" % (self.registry_endpoint,)
if n is not None:
url = "%s?n=%s" % (url, str(n))
req = urllib2.Request(url)
req.add_header('Authorization', r'Bearer %s' % (bear_token,))
try:
response = urllib2.urlopen(req)
return json.loads(response.read())
except Exception as e:
return None
def getTagList(self, repository):
""" get tag list for repository """
scope = "repository:%s:pull" % (repository,)
bear_token = self.getBearerTokenForScope(scope)
if bear_token is None:
return None
url = "%s/v2/%s/tags/list" % (self.registry_endpoint, repository)
req = urllib2.Request(url)
req.add_header('Authorization', r'Bearer %s' % (bear_token,))
try:
response = urllib2.urlopen(req)
return json.loads(response.read())
except Exception as e:
return None
def getManifest(self, repository, reference="latest", v1=False):
""" get manifest for tag or digest """
scope = "repository:%s:pull" % (repository,)
bear_token = self.getBearerTokenForScope(scope)
if bear_token is None:
return None
url = "%s/v2/%s/manifests/%s" % (self.registry_endpoint, repository, reference)
req = urllib2.Request(url)
req.get_method = lambda: 'GET'
req.add_header('Authorization', r'Bearer %s' % (bear_token,))
req.add_header('Accept', 'application/vnd.docker.distribution.manifest.v2+json')
if v1:
req.add_header('Accept', 'application/vnd.docker.distribution.manifest.v1+json')
try:
response = urllib2.urlopen(req)
return json.loads(response.read())
except Exception as e:
return None
def existManifest(self, repository, reference, v1=False):
""" check to see it manifest exist """
scope = "repository:%s:pull" % (repository,)
bear_token = self.getBearerTokenForScope(scope)
if bear_token is None:
raise RegistryException("manifestExist failed due to token error")
url = "%s/v2/%s/manifests/%s" % (self.registry_endpoint, repository, reference)
req = urllib2.Request(url)
req.get_method = lambda: 'HEAD'
req.add_header('Authorization', r'Bearer %s' % (bear_token,))
req.add_header('Accept', 'application/vnd.docker.distribution.manifest.v2+json')
if v1:
req.add_header('Accept', 'application/vnd.docker.distribution.manifest.v1+json')
try:
response = urllib2.urlopen(req)
return (True, response.headers.dict["docker-content-digest"])
except Exception as e:
return (False, None)
def deleteManifest(self, repository, reference):
""" delete manifest by tag """
(is_exist, digest) = self.existManifest(repository, reference)
if not is_exist:
raise RegistryException("manifest not exist")
scope = "repository:%s:pull,push" % (repository,)
bear_token = self.getBearerTokenForScope(scope)
if bear_token is None:
raise RegistryException("delete manifest failed due to token error")
url = "%s/v2/%s/manifests/%s" % (self.registry_endpoint, repository, digest)
req = urllib2.Request(url)
req.get_method = lambda: 'DELETE'
req.add_header('Authorization', r'Bearer %s' % (bear_token,))
try:
urllib2.urlopen(req)
except Exception as e:
return False
return True
def getManifestWithConf(self, repository, reference="latest"):
""" get manifest for tag or digest """
manifest = self.getManifest(repository, reference)
if manifest is None:
raise RegistryException("manifest for %s %s not exist" % (repository, reference))
config_digest = manifest["config"]["digest"]
scope = "repository:%s:pull" % (repository,)
bear_token = self.getBearerTokenForScope(scope)
if bear_token is None:
return None
url = "%s/v2/%s/blobs/%s" % (self.registry_endpoint, repository, config_digest)
req = urllib2.Request(url)
req.get_method = lambda: 'GET'
req.add_header('Authorization', r'Bearer %s' % (bear_token,))
req.add_header('Accept', 'application/vnd.docker.distribution.manifest.v2+json')
try:
response = urllib2.urlopen(req)
manifest["configContent"] = json.loads(response.read())
return manifest
except Exception as e:
return None