Cluster - Client library API migration changes #177
base: main
@@ -22,7 +22,7 @@
 from dataproc_jupyter_plugin.services import dataproc


-class ClusterListController(APIHandler):
+class RuntimeController(APIHandler):
     @tornado.web.authenticated
     async def get(self):
         try:
@@ -32,25 +32,74 @@ async def get(self):
             client = dataproc.Client(
                 await credentials.get_cached(), self.log, client_session
             )
-            cluster_list = await client.list_clusters(page_size, page_token)
-            self.finish(json.dumps(cluster_list))
+            runtime_list = await client.list_runtime(page_size, page_token)
+            self.finish(json.dumps(runtime_list))
         except Exception as e:
-            self.log.exception("Error fetching cluster list")
+            self.log.exception(f"Error fetching runtime template list: {str(e)}")
             self.finish({"error": str(e)})


-class RuntimeController(APIHandler):
+class ClusterListController(APIHandler):
     @tornado.web.authenticated
     async def get(self):
         try:
             page_token = self.get_argument("pageToken")
             page_size = self.get_argument("pageSize")
-            async with aiohttp.ClientSession() as client_session:
-                client = dataproc.Client(
-                    await credentials.get_cached(), self.log, client_session
-                )
-                runtime_list = await client.list_runtime(page_size, page_token)
-                self.finish(json.dumps(runtime_list))
+            client = dataproc.Client(await credentials.get_cached(), self.log)
+            cluster_list = await client.list_clusters(page_size, page_token)
+            self.finish(json.dumps(cluster_list))
         except Exception as e:
-            self.log.exception(f"Error fetching runtime template list: {str(e)}")
+            self.log.exception(f"Error fetching cluster list")
             self.finish({"error": str(e)})
+
+
+class ClusterDetailController(APIHandler):
+    @tornado.web.authenticated
+    async def get(self):
+        try:
+            cluster = self.get_argument("cluster")
+            client = dataproc.Client(await credentials.get_cached(), self.log)
+            get_cluster = await client.get_cluster_detail(cluster)
+            self.finish(json.dumps(get_cluster))
+        except Exception as e:
+            self.log.exception(f"Error fetching get cluster")
Review comment: Include the error in the log message, e.g. something like the following.
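A sketch of that fix (the "cluster detail" wording is an assumption, not from the source):

    self.log.exception(f"Error fetching cluster detail: {str(e)}")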
self.finish({"error": str(e)}) | ||
|
||
|
||
class StopClusterController(APIHandler): | ||
@tornado.web.authenticated | ||
async def post(self): | ||
try: | ||
cluster = self.get_argument("cluster") | ||
client = dataproc.Client(await credentials.get_cached(), self.log) | ||
stop_cluster = await client.stop_cluster(cluster) | ||
self.finish(json.dumps(stop_cluster)) | ||
except Exception as e: | ||
self.log.exception(f"Error fetching stop cluster") | ||
Review comment: There's no fetch happening here, and the error must be included in the log message.
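For instance (a sketch; the "stopping" wording is assumed):

    self.log.exception(f"Error stopping cluster: {str(e)}")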
self.finish({"error": str(e)}) | ||
|
||
|
||
class StartClusterController(APIHandler): | ||
@tornado.web.authenticated | ||
async def post(self): | ||
try: | ||
cluster = self.get_argument("cluster") | ||
client = dataproc.Client(await credentials.get_cached(), self.log) | ||
start_cluster = await client.start_cluster(cluster) | ||
self.finish(json.dumps(start_cluster)) | ||
except Exception as e: | ||
self.log.exception(f"Error fetching start cluster") | ||
Review comment: Again, there is no fetch and the error must be included in the log message.
self.finish({"error": str(e)}) | ||
|
||
|
||
class DeleteClusterController(APIHandler): | ||
@tornado.web.authenticated | ||
async def delete(self): | ||
try: | ||
cluster = self.get_argument("cluster") | ||
client = dataproc.Client(await credentials.get_cached(), self.log) | ||
delete_cluster = await client.delete_cluster(cluster) | ||
self.finish(json.dumps(delete_cluster)) | ||
except Exception as e: | ||
self.log.exception(f"Error deleting cluster") | ||
Review comment: Include the error in the log message.
self.finish({"error": str(e)}) |
@@ -18,9 +18,15 @@
     DATAPROC_SERVICE_NAME,
 )

+from google.cloud import dataproc_v1 as dataproc
+import proto
+import json
Review comment: This import order is wrong. Standard library imports (e.g. `json`) should come first, in their own section before third-party imports. Within each section, the imports should be in alphabetical order unless the imports have side effects that must be performed in a specific order.
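One ordering that would satisfy this rule, as a sketch (the section comments are added for illustration):

    # Standard library
    import json

    # Third-party, alphabetized
    from google.cloud import dataproc_v1 as dataproc
    import google.oauth2.credentials as oauth2
    from google.protobuf.empty_pb2 import Empty
    import proto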
+import google.oauth2.credentials as oauth2
+from google.protobuf.empty_pb2 import Empty


 class Client:
-    def __init__(self, credentials, log, client_session):
+    def __init__(self, credentials, log, client_session=None):
         self.log = log
         if not (
             ("access_token" in credentials)
@@ -40,10 +46,10 @@ def create_headers(self):
"Authorization": f"Bearer {self._access_token}", | ||
} | ||
|
||
async def list_clusters(self, page_size, page_token): | ||
async def list_runtime(self, page_size, page_token): | ||
try: | ||
dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME) | ||
api_endpoint = f"{dataproc_url}/v1/projects/{self.project_id}/regions/{self.region_id}/clusters?pageSize={page_size}&pageToken={page_token}" | ||
api_endpoint = f"{dataproc_url}/v1/projects/{self.project_id}/locations/{self.region_id}/sessionTemplates?pageSize={page_size}&pageToken={page_token}" | ||
async with self.client_session.get( | ||
api_endpoint, headers=self.create_headers() | ||
) as response: | ||
|
@@ -52,27 +58,146 @@ async def list_clusters(self, page_size, page_token):
                     return resp
                 else:
                     return {
-                        "error": f"Failed to fetch clusters: {response.status} {await response.text()}"
+                        "error": f"Failed to fetch runtimes: {response.status} {await response.text()}"
                     }
         except Exception as e:
             self.log.exception(f"Error fetching runtime list: {str(e)}")
             return {"error": str(e)}

+    async def list_clusters(self, page_size, page_token):
+        try:
+            # Create a client
+            client = dataproc.ClusterControllerAsyncClient(
+                client_options={
+                    "api_endpoint": f"us-central1-dataproc.googleapis.com:443"
Review comment: This is clearly wrong on multiple levels... First of all, we can't hardcode the API endpoint to a single region. In fact, I don't see why we would specify a region at all... although a region can be specified as part of this, the default used by the client library if it is not specified does not have a region in it. Further, we have to use the API endpoint override for Dataproc if it was configured by the user, e.g. we could detect if the user configured this using `urls.gcp_service_url`. Finally, this logic needs to move into the […].
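One way the override-aware endpoint could be derived, as a sketch (assumes the module's existing `urls.gcp_service_url` helper reflects any user-configured endpoint override, and that the `:443` suffix and `urllib.parse` handling shown here are appropriate):

    import urllib.parse

    # Resolve the (possibly user-overridden) Dataproc base URL instead of
    # hardcoding a regional endpoint.
    dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME)
    host = urllib.parse.urlparse(dataproc_url).hostname
    client = dataproc.ClusterControllerAsyncClient(
        client_options={"api_endpoint": f"{host}:443"},
        credentials=oauth2.Credentials(self._access_token),
    )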
+                },
+                credentials=oauth2.Credentials(self._access_token),
+            )
+
+            # Initialize request argument(s)
+            request = dataproc.ListClustersRequest(
+                project_id=self.project_id,
+                page_size=int(page_size),
+                page_token=page_token,
+                region=self.region_id,
+            )
+
+            # Make the request
+            page_result = await client.list_clusters(request=request)
+            clusters_list = []
+
+            # Handle the response
+            async for response in page_result:
+                clusters_list.append(json.loads(proto.Message.to_json(response)))
Review comment: You're traversing the message, serializing it as a string representing a JSON object, and then parsing that string to get back a Python dictionary. All the while, there is a corresponding method, `proto.Message.to_dict`, that just directly generates a dictionary without having to write to a string first. Further, you are taking these resulting dictionaries, which use integers for enum values, and manually converting those integers to the corresponding enum value names. However, I see that there is a keyword parameter on these methods that will use the enum value names to begin with if it is set to `False`. Please change all of the calls to `proto.Message.to_json` over to `proto.Message.to_dict` with `use_integers_for_enums=False`.
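As a sketch, the loop above could then become (assuming the installed proto-plus version supports the `use_integers_for_enums` keyword on `Message.to_dict`):

    # Build dicts directly, keeping enum value names rather than integers.
    async for response in page_result:
        clusters_list.append(
            proto.Message.to_dict(response, use_integers_for_enums=False)
        )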
+
+            return clusters_list
         except Exception as e:
-            self.log.exception("Error fetching cluster list")
+            self.log.exception(f"Error fetching cluster list")
Review comment: Everywhere in this file where we log an exception, include the actual exception in the log message.
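Applied to the line above, that might look like (a sketch; exact wording assumed):

    self.log.exception(f"Error fetching cluster list: {str(e)}")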
return {"error": str(e)} | ||
|
||
async def list_runtime(self, page_size, page_token): | ||
async def get_cluster_detail(self, cluster): | ||
try: | ||
dataproc_url = await urls.gcp_service_url(DATAPROC_SERVICE_NAME) | ||
api_endpoint = f"{dataproc_url}/v1/projects/{self.project_id}/locations/{self.region_id}/sessionTemplates?pageSize={page_size}&pageToken={page_token}" | ||
async with self.client_session.get( | ||
api_endpoint, headers=self.create_headers() | ||
) as response: | ||
if response.status == 200: | ||
resp = await response.json() | ||
return resp | ||
else: | ||
return { | ||
"error": f"Failed to fetch runtimes: {response.status} {await response.text()}" | ||
} | ||
# Create a client | ||
client = dataproc.ClusterControllerAsyncClient( | ||
client_options={ | ||
"api_endpoint": f"us-central1-dataproc.googleapis.com:443" | ||
}, | ||
credentials=oauth2.Credentials(self._access_token), | ||
) | ||
|
||
# Initialize request argument(s) | ||
request = dataproc.GetClusterRequest( | ||
project_id=self.project_id, | ||
region=self.region_id, | ||
cluster_name=cluster, | ||
) | ||
|
||
# Make the request | ||
response = await client.get_cluster(request=request) | ||
|
||
# Handle the response | ||
return json.loads(proto.Message.to_json(response)) | ||
except Exception as e: | ||
self.log.exception(f"Error fetching runtime list: {str(e)}") | ||
self.log.exception(f"Error fetching cluster detail") | ||
return {"error": str(e)} | ||
+
+    async def stop_cluster(self, cluster):
+        try:
+            # Create a client
+            client = dataproc.ClusterControllerAsyncClient(
+                client_options={
+                    "api_endpoint": f"us-central1-dataproc.googleapis.com:443"
+                },
+                credentials=oauth2.Credentials(self._access_token),
+            )
+
+            # Initialize request argument(s)
+            request = dataproc.StopClusterRequest(
+                project_id=self.project_id,
+                region=self.region_id,
+                cluster_name=cluster,
+            )
+
+            operation = await client.stop_cluster(request=request)
+
+            response = await operation.result()
+            # Handle the response
+            return json.loads(proto.Message.to_json(response))
+        except Exception as e:
+            self.log.exception(f"Error fetching stop cluster")
+            return {"error": str(e)}
+
+    async def start_cluster(self, cluster):
+        try:
+            # Create a client
+            client = dataproc.ClusterControllerAsyncClient(
+                client_options={
+                    "api_endpoint": f"us-central1-dataproc.googleapis.com:443"
+                },
+                credentials=oauth2.Credentials(self._access_token),
+            )
+
+            # Initialize request argument(s)
+            request = dataproc.StartClusterRequest(
+                project_id=self.project_id,
+                region=self.region_id,
+                cluster_name=cluster,
+            )
+
+            operation = await client.start_cluster(request=request)
+
+            response = await operation.result()
+            # Handle the response
+            return json.loads(proto.Message.to_json(response))
+        except Exception as e:
+            self.log.exception(f"Error fetching start cluster")
+            return {"error": str(e)}
+
+    async def delete_cluster(self, cluster):
+        try:
+            # Create a client
+            client = dataproc.ClusterControllerAsyncClient(
+                client_options={
+                    "api_endpoint": f"us-central1-dataproc.googleapis.com:443"
+                },
+                credentials=oauth2.Credentials(self._access_token),
+            )
+
+            # Initialize request argument(s)
+            request = dataproc.DeleteClusterRequest(
+                project_id=self.project_id,
+                region=self.region_id,
+                cluster_name=cluster,
+            )
+
+            operation = await client.delete_cluster(request=request)
+
+            response = await operation.result()
+            # Handle the response
+            if isinstance(response, Empty):
+                return "Deleted successfully"
+            else:
+                return json.loads(proto.Message.to_json(response))
+        except Exception as e:
+            self.log.exception(f"Error deleting cluster")
+            return {"error": str(e)}
@@ -29,7 +29,8 @@ dependencies = [
"pendulum>=3.0.0", | ||
"pydantic~=1.10.0", | ||
Review comment: Why are we pinning the minor versions in these packages? I.e., why `~=...` instead of `>=...`?
"bigframes~=0.22.0", | ||
"aiohttp~=3.9.5" | ||
"aiohttp~=3.9.5", | ||
"google-cloud-dataproc~=5.10.2" | ||
] | ||
dynamic = ["version", "description", "authors", "urls", "keywords"] | ||
|