Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Python Models #32

Open
WillemLiang opened this issue Dec 30, 2024 · 2 comments
Open

Support Python Models #32

WillemLiang opened this issue Dec 30, 2024 · 2 comments

Comments

@WillemLiang
Copy link

WillemLiang commented Dec 30, 2024

Is there anyone who succeed run Python models with this adapter?

I noticed in the macros file table.sql, there's a build process that calls create_table_as.sql, which throws an exception saying "Python models are not supported in Fabric Spark" if the target is a Python model.

Does Fabric Spark now support Python models? I need to migrate my project from CDH to Fabric and want to replace Java Hive UDFs with Python ones with minimal changes to the existing code. Is there a way to create these UDFs and call them in SparkSQL?

Any advice would be appreciated.

@WillemLiang
Copy link
Author

Update the new findings:

For now, there is still a long way to enable Python models. There's too much work that needs to be done:

  1. Needs a spark connection to submit Python code. Currently, the LivyConnection is a mock-up pyodbc connection which allows only SQL statements, for Python codes should post a spark request (post JSON data {"kind":"pyspark"}).
  2. Implement the customer submission_methods & python_submission_helpers. As the class dbt.adapter.base.BaseAdapter requires, just like the codes in python_submissions.py functional for the adapter dbt-spark, which currently supports Databricks Python models.
  3. Model-level configs. See also databricks-configs.

Let me clarify my use case and environment before diving deep into source codes:

Use Cases

  • SQL models: Migrate Hive SQL to Fabric Lakehouse with minimum changes.
  • Spark models: Migrate the models that have Hive UDFs or Pyspark RDD APIs to Fabric Lakehouse with the original coding styles/patterns (keep As-Is).

Environment

  • OS: Darwin Kernel Version 23.6.0: Mon Jul 29 21:13:00 PDT 2024; root:xnu-10063.141.2~1/RELEASE_X86_64 x86_64
  • Python: Python 3.12.7
  • dbt:
    dbt-core==1.9.1
    dbt-fabric==1.9.0
    dbt-spark==1.9.0
    dbt-fabricspark-custom==1.9.1 (customized based on current version 1.7.2rc2, replace the legacy dbt-core APIs)

Current Behavior

ERROR caused: default_python_submission_method is not specified.

Expected Behavior

Implement a customer submission_methods & python_submission_helpers and make a spark-type livy connection to enable submitting Python jobs.

Steps To Reproduce

  1. Upgrade the legacy dbt-core APIs to align with dbt-fabric for Fabric DWH project. Reference this issue comment.
  2. Build Python models by modifying the materialization macros: table.sql.

Original code:

  -- build model
  {%- call statement('main', language=language) -%}
      {{ create_table_as(False, target_relation, compiled_code, language) }}
  {%- endcall -%}

Apparently, the code should be as such:

  -- build model
  {%- call statement('main', language=language) -%}
    {%- if language == 'sql' -%}
      {{ create_table_as(False, target_relation, compiled_code, language) }}
    {%- elif language == 'python' -%}
      {{ py_write_table(compiled_code, target_relation) }}
    {% endif %}
  {%- endcall -%}
  1. Configure submission_method in your model schema (schema.yml), which is not implemented yet, just for detailed logging.
models:
  - name: <your_python_model_name>
    config:
      submission_method: workflow_job
  1. Add a Python model following dbt docs.
  2. Run dbt run -s <your_python_model_name>.

Relevant Log Output

�15:51:47.412436 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'start', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10978ee70>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10980cf50>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10980d490>]}


============================== 15:51:47.420084 | 83e965fe-c298-419e-8e02-73302a13b50e ==============================
�15:51:47.420084 [info ] [MainThread]: Running with dbt=1.9.1
�15:51:47.420941 [debug] [MainThread]: running dbt with arguments {'printer_width': '80', 'indirect_selection': 'eager', 'log_cache_events': 'False', 'write_json': 'True', 'partial_parse': 'True', 'cache_selected_only': 'False', 'warn_error': 'None', 'debug': 'False', 'log_file_timezone': 'Asia/Shanghai', 'log_path': '/Users/g2/Dev/dc-lakehouse/projects/covid_data/logs', 'version_check': 'True', 'fail_fast': 'False', 'profiles_dir': '/Users/g2/Dev/dc-lakehouse/projects', 'log_stdout_timezone': 'Asia/Shanghai', 'use_colors': 'True', 'use_experimental_parser': 'False', 'empty': 'False', 'quiet': 'False', 'no_print': 'None', 'warn_error_options': 'WarnErrorOptions(include=[], exclude=[])', 'invocation_command': 'dbt run -s dwd_prop_city_dist', 'static_parser': 'True', 'log_format': 'default', 'target_path': 'None', 'introspect': 'True', 'send_anonymous_usage_stats': 'True'}
�15:51:47.743514 [debug] [MainThread]: Microsoft Fabric-Spark adapter: Setting fabricspark.connector to DEBUG
�15:51:47.744350 [debug] [MainThread]: Microsoft Fabric-Spark adapter: Setting botocore to DEBUG
�15:51:47.745051 [debug] [MainThread]: Microsoft Fabric-Spark adapter: Setting boto3 to DEBUG
�15:51:47.745750 [debug] [MainThread]: Microsoft Fabric-Spark adapter: Setting Microsoft Fabric-Spark.connector to DEBUG
�15:51:47.982876 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'project_id', 'label': '83e965fe-c298-419e-8e02-73302a13b50e', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10978ee70>]}
�15:51:48.068333 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'adapter_info', 'label': '83e965fe-c298-419e-8e02-73302a13b50e', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x109545640>]}
�15:51:48.070706 [info ] [MainThread]: Registered adapter: fabricspark=1.9.1-rc1
�15:51:48.230914 [debug] [MainThread]: checksum: 5e8d1596cf4eae33c11286bbb248a722d21b9f00d8a7ced8137c642517055418, vars: {}, profile: , target: , version: 1.9.1
�15:51:48.415026 [debug] [MainThread]: Partial parsing enabled: 0 files deleted, 0 files added, 0 files changed.
�15:51:48.415528 [debug] [MainThread]: Partial parsing enabled, no changes found, skipping parsing
�15:51:48.453047 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'load_project', 'label': '83e965fe-c298-419e-8e02-73302a13b50e', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10a2034a0>]}
�15:51:48.529315 [debug] [MainThread]: Wrote artifact WritableManifest to /Users/g2/Dev/dc-lakehouse/projects/covid_data/target/manifest.json
�15:51:48.534126 [debug] [MainThread]: Wrote artifact SemanticManifest to /Users/g2/Dev/dc-lakehouse/projects/covid_data/target/semantic_manifest.json
�15:51:48.568434 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'resource_counts', 'label': '83e965fe-c298-419e-8e02-73302a13b50e', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x109a71820>]}
�15:51:48.569067 [info ] [MainThread]: Found 5 models, 473 macros
�15:51:48.569652 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': '83e965fe-c298-419e-8e02-73302a13b50e', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10843a6c0>]}
�15:51:48.571529 [info ] [MainThread]: 
�15:51:48.572017 [info ] [MainThread]: Concurrency: 1 threads (target='fabricspark-dev')
�15:51:48.572435 [info ] [MainThread]: 
�15:51:48.573602 [debug] [MainThread]: Acquiring new fabricspark connection 'master'
�15:51:48.574941 [debug] [ThreadPool]: Acquiring new fabricspark connection 'list_schemas'
�15:51:48.589496 [debug] [ThreadPool]: Using fabricspark connection "list_schemas"
�15:51:48.590178 [debug] [ThreadPool]: On list_schemas: /* {"app": "dbt", "dbt_version": "1.9.1", "profile_name": "covid_data", "target_name": "fabricspark-dev", "connection_name": "list_schemas"} */

    show databases
  
�15:51:48.590775 [debug] [ThreadPool]: Opening a new connection, currently in state init
�15:51:48.591270 [debug] [ThreadPool]: Microsoft Fabric-Spark adapter: Using CLI auth
�15:51:49.453686 [debug] [ThreadPool]: Microsoft Fabric-Spark adapter: CLI - Fetched Access Token
�15:51:49.454740 [debug] [ThreadPool]: Microsoft Fabric-Spark adapter: accessToken:eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsIng1dCI6InoxcnNZSEhKOS04bWdndDRIc1p1OEJLa0JQdyIsImtpZCI6InoxcnNZSEhKOS04bWdndDRIc1p1OEJLa0JQdyJ9...
�15:51:49.455570 [debug] [ThreadPool]: Microsoft Fabric-Spark adapter: accessToken:eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiIsIng1dCI6InoxcnNZSEhKOS04bWdndDRIc1p1OEJLa0JQdyIsImtpZCI6InoxcnNZSEhKOS04bWdndDRIc1p1OEJLa0JQdyJ9...
�15:54:20.964824 [debug] [ThreadPool]: Microsoft Fabric-Spark adapter: New livy session id is: <your-livy-session-id>, {'livyInfo': {'startingAt': '2024-12-31T07:53:09.713199+00:00', 'idleAt': '2024-12-31T07:53:42.788348+00:00', 'currentState': 'idle', 'jobCreationRequest': {'name': 'dl_playground_<your-livy-session-id>', 'conf': {'spark.livy.synapse.sql.displayFormatter.enabled': 'true', 'spark.dynamicAllocation.enabled': 'true', 'spark.dynamicAllocation.minExecutors': '1', 'spark.dynamicAllocation.maxExecutors': '3', 'spark.gluten.enabled': 'true', 'spark.shuffle.manager': 'org.apache.spark.shuffle.sort.ColumnarShuffleManager', 'spark.native.enabled': 'false', 'spark.hadoop.fs.defaultFS': 'abfss://<your-workspace-id>@onelake.dfs.fabric.microsoft.com/', 'spark.hadoop.fs.homeDir': '/<your-lakehouse-id>', 'spark.sql.externalCatalogImplementation': 'com.microsoft.fabric.spark.catalog.InstrumentedExternalCatalog', 'spark.livy.synapse.session-warmup.enabled': 'false'}, 'driverMemory': '56g', 'driverCores': 8, 'executorMemory': '28g', 'executorCores': 4, 'numExecutors': 1}}, 'name': 'dl_playground_<your-livy-session-id>', 'id': '<your-livy-session-id>', 'appId': 'application_1735631586562_0001', 'appInfo': {'sparkUiUrl': 'http://vm-2bd75722:8088/proxy/application_1735631586562_0001/', 'impulseErrorCode': None, 'isStreamingQueryExists': None, 'impulseClassification': None, 'impulseTsg': None, 'isSessionTimedOut': 'false', 'driverLogUrl': 'http://vm-2bd75722:8042/node/containerlogs/container_1735631586562_0001_01_000001/trusted-service-user'}, 'artifactId': '<your-lakehouse-id>', 'jobType': 'SparkSession', 'submitterId': 'e7b84249-ee0d-4dd4-a86f-9d1499667dad', 'submitterName': '[email protected]', 'log': ['stdout: ', 'Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/mnt/tmp', 'Picked up _JAVA_OPTIONS: -Djava.io.tmpdir=/mnt/tmp', 'Warning: Ignoring non-Spark config property: livy.rsc.synapse.error-classification.enabled', '\nstderr: ', '\nYARN Diagnostics: '], 'pluginInfo': {'preparationStartedAt': '2024-12-31T07:52:03.3523598+00:00', 'resourceAcquisitionStartedAt': '2024-12-31T07:52:05.3829394+00:00', 'submissionStartedAt': '2024-12-31T07:53:07.7606702+00:00', 'monitoringStartedAt': '2024-12-31T07:53:11.7452416+00:00', 'currentState': 'Monitoring'}, 'schedulerInfo': {'submittedAt': '2024-12-31T07:51:55.7356631+00:00', 'queuedAt': '2024-12-31T07:51:56.1887112+00:00', 'scheduledAt': '2024-12-31T07:52:01.319414+00:00', 'currentState': 'Scheduled'}, 'state': 'idle', 'tags': {}, 'result': 'Uncertain'}
�15:54:20.972781 [debug] [ThreadPool]: Microsoft Fabric-Spark adapter: Submitted: {'code': 'show databases', 'kind': 'sql'} https://api.fabric.microsoft.com/v1/workspaces/<your-workspace-id>/lakehouses/<your-lakehouse-id>/livyapi/versions/2023-12-01/sessions/<your-livy-session-id>/statements
�15:54:34.735629 [debug] [ThreadPool]: Microsoft Fabric-Spark adapter: {'id': 1, 'code': 'show databases', 'state': 'available', 'output': {'status': 'ok', 'execution_count': 1, 'data': {'application/json': {'schema': {'type': 'struct', 'fields': [{'name': 'namespace', 'type': 'string', 'nullable': False, 'metadata': {}}]}, 'data': [['`BigData-DW-Demo.dl_playground.REV_CVAI_MODEL`'], ['`BigData-DW-Demo.dl_playground.REV_VAT_MODEL`'], ['`BigData-DW-Demo.dl_playground.TB_GPM_AS`'], ['`BigData-DW-Demo.dl_playground.crm`'], ['`BigData-DW-Demo.dl_playground.dbo`'], ['`BigData-DW-Demo.dl_playground.tb_gpm_common`'], ['`BigData-DW-Demo.dl_playground.uws`']]}}}}
�15:54:34.737913 [debug] [ThreadPool]: SQL status: OK in 166.150 seconds
�15:54:34.750692 [debug] [ThreadPool]: Re-using an available connection from the pool (formerly list_schemas, now create__dl_playground.dbo)
�15:54:34.752698 [debug] [ThreadPool]: Creating schema "schema: "dl_playground.dbo"
"
�15:54:34.768041 [debug] [ThreadPool]: Microsoft Fabric-Spark adapter: NotImplemented: add_begin_query
�15:54:34.768570 [debug] [ThreadPool]: Using fabricspark connection "create__dl_playground.dbo"
�15:54:34.769032 [debug] [ThreadPool]: On create__dl_playground.dbo: /* {"app": "dbt", "dbt_version": "1.9.1", "profile_name": "covid_data", "target_name": "fabricspark-dev", "connection_name": "create__dl_playground.dbo"} */

    select 1
  
�15:54:34.769505 [debug] [ThreadPool]: Microsoft Fabric-Spark adapter: Submitted: {'code': 'select 1', 'kind': 'sql'} https://api.fabric.microsoft.com/v1/workspaces/<your-workspace-id>/lakehouses/<your-lakehouse-id>/livyapi/versions/2023-12-01/sessions/<your-livy-session-id>/statements
�15:54:40.711614 [debug] [ThreadPool]: Microsoft Fabric-Spark adapter: {'id': 2, 'code': 'select 1', 'state': 'available', 'output': {'status': 'ok', 'execution_count': 2, 'data': {'application/json': {'schema': {'type': 'struct', 'fields': [{'name': '1', 'type': 'integer', 'nullable': False, 'metadata': {}}]}, 'data': [[1]]}}}}
�15:54:40.716901 [debug] [ThreadPool]: SQL status: OK in 5.940 seconds
�15:54:40.722274 [debug] [ThreadPool]: Microsoft Fabric-Spark adapter: NotImplemented: commit
�15:54:40.795503 [debug] [ThreadPool]: Re-using an available connection from the pool (formerly create__dl_playground.dbo, now list_None_dl_playground.dbo)
�15:54:40.807617 [debug] [ThreadPool]: Using fabricspark connection "list_None_dl_playground.dbo"
�15:54:40.808435 [debug] [ThreadPool]: On list_None_dl_playground.dbo: /* {"app": "dbt", "dbt_version": "1.9.1", "profile_name": "covid_data", "target_name": "fabricspark-dev", "connection_name": "list_None_dl_playground.dbo"} */
show table extended in dl_playground.dbo like '*'
  
�15:54:40.809246 [debug] [ThreadPool]: Microsoft Fabric-Spark adapter: Submitted: {'code': "show table extended in dl_playground.dbo like '*'", 'kind': 'sql'} https://api.fabric.microsoft.com/v1/workspaces/<your-workspace-id>/lakehouses/<your-lakehouse-id>/livyapi/versions/2023-12-01/sessions/<your-livy-session-id>/statements
�15:54:44.517800 [debug] [ThreadPool]: Microsoft Fabric-Spark adapter: {'id': 3, 'code': "show table extended in dl_playground.dbo like '*'", 'state': 'available', 'output': {'status': 'ok', 'execution_count': 3, 'data': {'application/json': {'schema': {'type': 'struct', 'fields': [{'name': 'namespace', 'type': 'string', 'nullable': False, 'metadata': {}}, {'name': 'tableName', 'type': 'string', 'nullable': False, 'metadata': {}}, {'name': 'isTemporary', 'type': 'boolean', 'nullable': False, 'metadata': {}}, {'name': 'information', 'type': 'string', 'nullable': False, 'metadata': {}}]}, 'data': [['89kmeh31ehgiqh2n5l26arbf4li6onrgdhgnipridtqmsp15chh6u', 'dwd_cust_parent_account', False, 'Catalog: spark_catalog\nDatabase: 89kmeh31ehgiqh2n5l26arbf4li6onrgdhgnipridtqmsp15chh6u\nTable: dwd_cust_parent_account\nCreated Time: Wed Jan 21 02:00:08 UTC 1970\nLast Access: UNKNOWN\nCreated By: Spark \nType: MANAGED\nProvider: delta\nComment: Delta table auto-discovered from Onelake: <your-lakehouse-id>/Tables/dbo/dwd_cust_parent_account/_delta_log\nTable Properties: [eTag=0x8DD259652342679, lastModifiedTime=2024-12-26T10:16:11Z, trident.autodiscovered.table=true, trident.autodiscovered.table.recorded=true, xCatalogMetadataVersion=202405]\nLocation: abfss://<your-workspace-id>@onelake.dfs.fabric.microsoft.com/<your-lakehouse-id>/Tables/dbo/dwd_cust_parent_account\n'], ['89kmeh31ehgiqh2n5l26arbf4li6onrgdhgnipridtqmsp15chh6u', 'dwd_cust_tenant', False, 'Catalog: spark_catalog\nDatabase: 89kmeh31ehgiqh2n5l26arbf4li6onrgdhgnipridtqmsp15chh6u\nTable: dwd_cust_tenant\nCreated Time: Wed Jan 21 02:00:08 UTC 1970\nLast Access: UNKNOWN\nCreated By: Spark \nType: MANAGED\nProvider: delta\nComment: Delta table auto-discovered from Onelake: <your-lakehouse-id>/Tables/dbo/dwd_cust_tenant/_delta_log\nTable Properties: [eTag=0x8DD25966B9EC345, lastModifiedTime=2024-12-26T10:16:53Z, trident.autodiscovered.table=true, trident.autodiscovered.table.recorded=true, xCatalogMetadataVersion=202405]\nLocation: abfss://<your-workspace-id>@onelake.dfs.fabric.microsoft.com/<your-lakehouse-id>/Tables/dbo/dwd_cust_tenant\n'], ['89kmeh31ehgiqh2n5l26arbf4li6onrgdhgnipridtqmsp15chh6u', 'stg_prop_city_dist', False, 'Catalog: spark_catalog\nDatabase: 89kmeh31ehgiqh2n5l26arbf4li6onrgdhgnipridtqmsp15chh6u\nTable: stg_prop_city_dist\nCreated Time: Wed Jan 21 02:05:52 UTC 1970\nLast Access: UNKNOWN\nCreated By: Spark \nType: MANAGED\nProvider: delta\nComment: Delta table auto-discovered from Onelake: <your-lakehouse-id>/Tables/dbo/stg_prop_city_dist/_delta_log\nTable Properties: [eTag=0x8DD28B6E01D16F1, lastModifiedTime=2024-12-30T09:46:46Z, trident.autodiscovered.table=true, trident.autodiscovered.table.recorded=true, xCatalogMetadataVersion=202405]\nLocation: abfss://<your-workspace-id>@onelake.dfs.fabric.microsoft.com/<your-lakehouse-id>/Tables/dbo/stg_prop_city_dist\n'], ['89kmeh31ehgiqh2n5l26arbf4li6onrgdhgnipridtqmsp15chh6u', 'tb_gpm_head', False, 'Catalog: spark_catalog\nDatabase: 89kmeh31ehgiqh2n5l26arbf4li6onrgdhgnipridtqmsp15chh6u\nTable: tb_gpm_head\nCreated Time: Wed Jan 21 01:47:02 UTC 1970\nLast Access: UNKNOWN\nCreated By: Spark \nType: MANAGED\nProvider: delta\nComment: Delta table auto-discovered from Onelake: <your-lakehouse-id>/Tables/dbo/tb_gpm_head/_delta_log\nTable Properties: [trident.autodiscovered.table=true]\nLocation: abfss://<your-workspace-id>@onelake.dfs.fabric.microsoft.com/<your-lakehouse-id>/Tables/dbo/tb_gpm_head\n']]}}}}
�15:54:44.519472 [debug] [ThreadPool]: SQL status: OK in 3.710 seconds
�15:54:44.530274 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'runnable_timing', 'label': '83e965fe-c298-419e-8e02-73302a13b50e', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x1092b3620>]}
�15:54:44.534610 [debug] [MainThread]: Microsoft Fabric-Spark adapter: NotImplemented: add_begin_query
�15:54:44.535568 [debug] [MainThread]: Microsoft Fabric-Spark adapter: NotImplemented: commit
�15:54:44.548409 [debug] [Thread-1 (]: Began running node model.covid_data.dwd_prop_city_dist
�15:54:44.549970 [info ] [Thread-1 (]: 1 of 1 START python table model dl_playground.dbo.dwd_prop_city_dist ........... [RUN]
�15:54:44.551195 [debug] [Thread-1 (]: Re-using an available connection from the pool (formerly list_None_dl_playground.dbo, now model.covid_data.dwd_prop_city_dist)
�15:54:44.552173 [debug] [Thread-1 (]: Began compiling node model.covid_data.dwd_prop_city_dist
�15:54:44.596511 [debug] [Thread-1 (]: Writing injected SQL for node "model.covid_data.dwd_prop_city_dist"
�15:54:44.599867 [debug] [Thread-1 (]: Began executing node model.covid_data.dwd_prop_city_dist
�15:54:44.631966 [debug] [Thread-1 (]: Using fabricspark connection "model.covid_data.dwd_prop_city_dist"
�15:54:44.632456 [debug] [Thread-1 (]: On model.covid_data.dwd_prop_city_dist: /* {"app": "dbt", "dbt_version": "1.9.1", "profile_name": "covid_data", "target_name": "fabricspark-dev", "node_id": "model.covid_data.dwd_prop_city_dist"} */
drop table if exists dl_playground.dbo.dwd_prop_city_dist
�15:54:44.632995 [debug] [Thread-1 (]: Microsoft Fabric-Spark adapter: Submitted: {'code': 'drop table if exists dl_playground.dbo.dwd_prop_city_dist', 'kind': 'sql'} https://api.fabric.microsoft.com/v1/workspaces/<your-workspace-id>/lakehouses/<your-lakehouse-id>/livyapi/versions/2023-12-01/sessions/<your-livy-session-id>/statements
�15:54:49.236758 [debug] [Thread-1 (]: Microsoft Fabric-Spark adapter: {'id': 4, 'code': 'drop table if exists dl_playground.dbo.dwd_prop_city_dist', 'state': 'available', 'output': {'status': 'ok', 'execution_count': 4, 'data': {'application/json': {'schema': {'type': 'struct', 'fields': []}, 'data': []}}}}
�15:54:49.238277 [debug] [Thread-1 (]: SQL status: OK in 4.610 seconds
�15:54:49.245130 [debug] [Thread-1 (]: Writing runtime python for node "model.covid_data.dwd_prop_city_dist"
�15:54:49.248144 [debug] [Thread-1 (]: On model.covid_data.dwd_prop_city_dist: 
import pandas as pd
from pyspark.sql import DataFrame
from pyspark.sql.types import ArrayType, StringType
from io import StringIO

def generate_data(sparkSession)->DataFrame:
    tree = '31:上海市#$01:市辖区#%01:黄浦区#|002:南京东路街道#|013:外滩街道#|015:半淞园路街道#|017:小东门街道#|018:豫园街道#|019:老西门街道#|020:五里桥街道#|021:打浦桥街道#|022:淮海中路街道#|023:瑞金二路街道#%04:徐汇区#|003:天平路街道#|004:湖南路街道#|007:斜土路街道#|008:枫林路街道#|010:长桥街道#|011:田林街道#|012:虹梅路街道#|013:康健新村街道#|014:徐家汇街道#|015:凌云路街道#|016:龙华街道#|017:漕河泾街道#|103:华泾镇#|501:漕河泾新兴技术开发区#%05:长宁区#|001:华阳路街道#|002:江苏路街道#|004:新华路街道#|005:周家桥街道#|006:天山路街道#|008:仙霞新村街道#|009:虹桥街道#|010:程家桥街道#|011:北新泾街道#|102:新泾镇#%06:静安区#|006:江宁路街道#|011:石门二路街道#|012:南京西路街道#|013:静安寺街道#|014:曹家渡街道#|015:天目西路街道#|016:北站街道#|017:宝山路街道#|018:共和新路街道#|019:大宁路街道#|020:彭浦新村街道#|021:临汾路街道#|022:芷江西路街道#|101:彭浦镇#%07:普陀区#|005:曹杨新村街道#|014:长风新村街道#|015:长寿路街道#|016:甘泉路街道#|017:石泉路街道#|020:宜川路街道#|021:万里街道#|022:真如镇街道#|102:长征镇#|103:桃浦镇#%09:虹口区#|009:欧阳路街道#|010:曲阳路街道#|011:广中路街道#|014:嘉兴路街道#|016:凉城新村街道#|017:四川北路街道#|018:北外滩街道#|019:江湾镇街道#%10:杨浦区#|001:定海路街道#|006:平凉路街道#|008:江浦路街道#|009:四平路街道#|012:控江路街道#|013:长白新村街道#|015:延吉新村街道#|016:殷行街道#|018:大桥街道#|019:五角场街道#|020:新江湾城街道#|021:长海路街道#%12:闵行区#|001:江川路街道#|006:古美街道#|008:新虹街道#|009:浦锦街道#|101:莘庄镇#|102:七宝镇#|103:颛桥镇#|106:华漕镇#|107:虹桥镇#|108:梅陇镇#|110:吴泾镇#|112:马桥镇#|114:浦江镇#|501:莘庄工业区#%13:宝山区#|003:友谊路街道#|007:吴淞街道#|008:张庙街道#|101:罗店镇#|102:大场镇#|103:杨行镇#|104:月浦镇#|106:罗泾镇#|109:顾村镇#|111:高境镇#|112:庙行镇#|113:淞南镇#|501:宝山工业园区#%14:嘉定区#|001:新成路街道#|002:真新街道#|004:嘉定镇街道#|102:南翔镇#|103:安亭镇#|106:马陆镇#|109:徐行镇#|111:华亭镇#|114:外冈镇#|118:江桥镇#|401:菊园新区#|501:嘉定工业区#%15:浦东新区#|004:潍坊新村街道#|005:陆家嘴街道#|007:周家渡街道#|008:塘桥街道#|009:上钢新村街道#|010:南码头路街道#|011:沪东新村街道#|012:金杨新村街道#|013:洋泾街道#|014:浦兴路街道#|015:东明路街道#|016:花木街道#|103:川沙新镇#|104:高桥镇#|105:北蔡镇#|110:合庆镇#|114:唐镇#|117:曹路镇#|120:金桥镇#|121:高行镇#|123:高东镇#|125:张江镇#|130:三林镇#|131:惠南镇#|132:周浦镇#|133:新场镇#|134:大团镇#|136:康桥镇#|137:航头镇#|139:祝桥镇#|140:泥城镇#|141:宣桥镇#|142:书院镇#|143:万祥镇#|144:老港镇#|145:南汇新城镇#|401:芦潮港农场#|402:东海农场#|403:朝阳农场#|501:中国(上海)自由贸易试验区(保税片区)#|502:金桥经济技术开发区#|503:张江高科技园区#%16:金山区#|001:石化街道#|101:朱泾镇#|102:枫泾镇#|103:张堰镇#|104:亭林镇#|105:吕巷镇#|107:廊下镇#|109:金山卫镇#|112:漕泾镇#|113:山阳镇#|503:上海湾区高新技术产业开发区#%17:松江区#|001:岳阳街道#|002:永丰街道#|003:方松街道#|004:中山街道#|005:广富林街道#|006:九里亭街道#|102:泗泾镇#|103:佘山镇#|104:车墩镇#|105:新桥镇#|106:洞泾镇#|107:九亭镇#|109:泖港镇#|116:石湖荡镇#|117:新浜镇#|120:叶榭镇#|121:小昆山镇#|501:松江工业区#|504:佘山度假区#|507:上海松江出口加工区#%18:青浦区#|001:夏阳街道#|002:盈浦街道#|003:香花桥街道#|102:朱家角镇#|103:练塘镇#|104:金泽镇#|105:赵巷镇#|106:徐泾镇#|107:华新镇#|109:重固镇#|110:白鹤镇#%20:奉贤区#|001:西渡街道#|002:奉浦街道#|003:金海街道#|101:南桥镇#|102:奉城镇#|104:庄行镇#|106:金汇镇#|109:四团镇#|111:青村镇#|118:柘林镇#|123:海湾镇#|503:海湾旅游区#%51:崇明区#|101:城桥镇#|102:堡镇#|103:新河镇#|104:庙镇#|105:竖新镇#|106:向化镇#|107:三星镇#|108:港沿镇#|109:中兴镇#|110:陈家镇#|111:绿华镇#|112:港西镇#|113:建设镇#|114:新海镇#|115:东平镇#|116:长兴镇#|201:新村乡#|202:横沙乡#|401:前卫农场#|402:东平林场#|501:上实现代农业园区'
    data=['code,name,parent']
    p=['']*4
    for node in tree.split('#'):
        i = '$%|'.find(node[0])+1
        [k,v] = node.strip('$%|').split(':')
        p[i]=k
        data.append(''.join(p[:i+1])+f",{v},"+''.join(p[:i]))
    
    csv_content = '\n'.join(data)
    pandas_df = pd.read_csv(StringIO(csv_content),dtype=str)
    spark_df = sparkSession.createDataFrame(pandas_df)
    return spark_df

def find_hierarchy_path(linked_list:ArrayType, target_node:StringType)->ArrayType(StringType()):
    result = []
    visited = set()
    next_node = target_node

    map_dict = {}
    for item in linked_list:
        map_dict.update(item)

    while next_node in map_dict and next_node not in visited:
        result.append(next_node)
        visited.add(next_node)
        next_node = map_dict.get(next_node)

    return result

def reverse_array(input)->ArrayType(StringType()):
    return input[::-1]

def model(dbt, session):
    # my_sql_model_df = dbt.ref("my_sql_model")

    spark_df = generate_data(session)
    spark_df.show()

    stg_table="stg_prop_city_dist"

    lakehouseName="dl_playground"
    # stgSinkTableName = f"{lakehouseName}.dbo.{stg_table}"
    # print(stgSinkTableName)
    # spark_df.write.mode("overwrite").format("delta").saveAsTable(stgSinkTableName)

    spark_df.createOrReplaceTempView(stg_table)

    session.udf.register("find_hierarchy_path", find_hierarchy_path, ArrayType(StringType()))
    session.udf.register("reverse_array", reverse_array, ArrayType(StringType()))

    sql_stmt = f"""
    select
        code
        ,name
        ,parent
        ,regexp_replace(concat_ws(' > ', reverse_array(find_hierarchy_path(nodes, code_name))),'[0-9]*::', '') as hierarchy_path
    from (
        select
            cur.code
            ,cur.name
            ,concat(cur.code, '::', cur.name) as code_name
            ,cur.parent
            ,collect_list(
                    map(
                        concat(cur.code, '::', cur.name)
                        ,case when parent.code is null or cur.parent is null or lower(trim(cur.parent)) = 'null'
                                then concat(cur.code, '::', cur.name)
                                else concat(parent.code, '::', parent.name)
                        end)
                    ) OVER ()                 as nodes
        from {stg_table} as cur
        left join {stg_table} as parent
            on cur.parent = parent.code
        ) tbl
    """

    final_df = session.sql(sql_stmt)
    # session.catalog.dropTable(stgSinkTableName, True, False)
    return final_df


# This part is user provided model code
# you will need to copy the next section to run the code
# COMMAND ----------
# this part is dbt logic for get ref work, do not modify

def ref(*args, **kwargs):
    refs = {}
    key = '.'.join(args)
    version = kwargs.get("v") or kwargs.get("version")
    if version:
        key += f".v{version}"
    dbt_load_df_function = kwargs.get("dbt_load_df_function")
    return dbt_load_df_function(refs[key])


def source(*args, dbt_load_df_function):
    sources = {}
    key = '.'.join(args)
    return dbt_load_df_function(sources[key])


config_dict = {}


class config:
    def __init__(self, *args, **kwargs):
        pass

    @staticmethod
    def get(key, default=None):
        return config_dict.get(key, default)

class this:
    """dbt.this() or dbt.this.identifier"""
    database = "None"
    schema = "dl_playground.dbo"
    identifier = "dwd_prop_city_dist"
    
    def __repr__(self):
        return 'dl_playground.dbo.dwd_prop_city_dist'


class dbtObj:
    def __init__(self, load_df_function) -> None:
        self.source = lambda *args: source(*args, dbt_load_df_function=load_df_function)
        self.ref = lambda *args, **kwargs: ref(*args, **kwargs, dbt_load_df_function=load_df_function)
        self.config = config
        self.this = this()
        self.is_incremental = False

# COMMAND ----------

# how to execute python model in notebook
# dbt = dbtObj(spark.table)
# df = model(dbt, spark)


# --- Autogenerated dbt materialization code. --- #
dbt = dbtObj(spark.table)
df = model(dbt, spark)

# make sure pyspark exists in the namepace, for 7.3.x-scala2.12 it does not exist
import pyspark
# make sure pandas exists before using it
try:
  import pandas
  pandas_available = True
except ImportError:
  pandas_available = False

# make sure pyspark.pandas exists before using it
try:
  import pyspark.pandas
  pyspark_pandas_api_available = True
except ImportError:
  pyspark_pandas_api_available = False

# make sure databricks.koalas exists before using it
try:
  import databricks.koalas
  koalas_available = True
except ImportError:
  koalas_available = False

# preferentially convert pandas DataFrames to pandas-on-Spark or Koalas DataFrames first
# since they know how to convert pandas DataFrames better than `spark.createDataFrame(df)`
# and converting from pandas-on-Spark to Spark DataFrame has no overhead
if pyspark_pandas_api_available and pandas_available and isinstance(df, pandas.core.frame.DataFrame):
  df = pyspark.pandas.frame.DataFrame(df)
elif koalas_available and pandas_available and isinstance(df, pandas.core.frame.DataFrame):
  df = databricks.koalas.frame.DataFrame(df)

# convert to pyspark.sql.dataframe.DataFrame
if isinstance(df, pyspark.sql.dataframe.DataFrame):
  pass  # since it is already a Spark DataFrame
elif pyspark_pandas_api_available and isinstance(df, pyspark.pandas.frame.DataFrame):
  df = df.to_spark()
elif koalas_available and isinstance(df, databricks.koalas.frame.DataFrame):
  df = df.to_spark()
elif pandas_available and isinstance(df, pandas.core.frame.DataFrame):
  df = spark.createDataFrame(df)
else:
  msg = f"{type(df)} is not a supported type for dbt Python materialization"
  raise Exception(msg)

df.write.mode("overwrite").format("delta").option("overwriteSchema", "true").saveAsTable("dl_playground.dbo.dwd_prop_city_dist")
    
�15:54:49.250734 [error] [Thread-1 (]: �Unhandled error while executing target/run/covid_data/models/dwd/prop/dwd_prop_city_dist.py�
ERROR: default_python_submission_method is not specified
�15:54:49.266814 [debug] [Thread-1 (]: Traceback (most recent call last):
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/dbt/task/base.py", line 361, in safe_run
    result = self.compile_and_execute(manifest, ctx)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/dbt/task/base.py", line 308, in compile_and_execute
    result = self.run(ctx.node, manifest)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/dbt/task/base.py", line 408, in run
    return self.execute(compiled_node, manifest)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/dbt/task/run.py", line 332, in execute
    return self._execute_model(hook_ctx, context_config, model, context, materialization_macro)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/dbt/task/run.py", line 292, in _execute_model
    result = MacroGenerator(
             ^^^^^^^^^^^^^^^
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/dbt/clients/jinja.py", line 82, in __call__
    return self.call_macro(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/dbt_common/clients/jinja.py", line 323, in call_macro
    return macro(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/jinja2/runtime.py", line 770, in __call__
    return self._invoke(arguments, autoescape)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/jinja2/runtime.py", line 784, in _invoke
    rv = self._func(*arguments)
         ^^^^^^^^^^^^^^^^^^^^^^
  File "<template>", line 124, in macro
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/jinja2/sandbox.py", line 401, in call
    return __context.call(__obj, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/jinja2/runtime.py", line 303, in call
    return __obj(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/dbt/clients/jinja.py", line 82, in __call__
    return self.call_macro(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/dbt_common/clients/jinja.py", line 323, in call_macro
    return macro(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/jinja2/runtime.py", line 770, in __call__
    return self._invoke(arguments, autoescape)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/jinja2/runtime.py", line 784, in _invoke
    rv = self._func(*arguments)
         ^^^^^^^^^^^^^^^^^^^^^^
  File "<template>", line 55, in macro
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/jinja2/sandbox.py", line 401, in call
    return __context.call(__obj, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/jinja2/runtime.py", line 303, in call
    return __obj(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/dbt/context/providers.py", line 1535, in submit_python_job
    return self.adapter.submit_python_job(parsed_model, compiled_code)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/dbt/adapters/base/impl.py", line 185, in execution_with_log
    response = code_execution_function(*args)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/dbt/adapters/base/impl.py", line 1592, in submit_python_job
    "submission_method", self.default_python_submission_method
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/g2/Dev/dc-lakehouse/.dbt-fabric/lib/python3.12/site-packages/dbt/adapters/base/impl.py", line 1584, in default_python_submission_method
    raise NotImplementedError("default_python_submission_method is not specified")
dbt_common.exceptions.base.NotImplementedError: ERROR: default_python_submission_method is not specified

�15:54:49.313431 [debug] [Thread-1 (]: Spark adapter: Setting pyhive.hive logging to ERROR
�15:54:49.314772 [debug] [Thread-1 (]: Spark adapter: Setting thrift.transport logging to ERROR
�15:54:49.315322 [debug] [Thread-1 (]: Spark adapter: Setting thrift.protocol logging to ERROR
�15:54:49.321542 [debug] [Thread-1 (]: Sending event: {'category': 'dbt', 'action': 'run_model', 'label': '83e965fe-c298-419e-8e02-73302a13b50e', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x108c21f40>]}
�15:54:49.322611 [error] [Thread-1 (]: 1 of 1 ERROR creating python table model dl_playground.dbo.dwd_prop_city_dist .. [�ERROR� in 4.72s]
�15:54:49.323633 [debug] [Thread-1 (]: Finished running node model.covid_data.dwd_prop_city_dist
�15:54:49.324434 [debug] [Thread-4 (]: Marking all children of 'model.covid_data.dwd_prop_city_dist' to be skipped because of status 'error'.  Reason: ERROR: default_python_submission_method is not specified.
�15:54:49.327375 [debug] [MainThread]: On master: ROLLBACK
�15:54:49.327837 [debug] [MainThread]: Opening a new connection, currently in state init
�15:54:50.773071 [debug] [MainThread]: Microsoft Fabric-Spark adapter: Reusing session: <your-livy-session-id>
�15:54:50.774569 [debug] [MainThread]: Microsoft Fabric-Spark adapter: NotImplemented: rollback
�15:54:50.775703 [debug] [MainThread]: Microsoft Fabric-Spark adapter: NotImplemented: add_begin_query
�15:54:50.776543 [debug] [MainThread]: Microsoft Fabric-Spark adapter: NotImplemented: commit
�15:54:53.584773 [debug] [MainThread]: Microsoft Fabric-Spark adapter: Closing the livy session: <your-livy-session-id>
�15:54:56.384373 [debug] [MainThread]: Microsoft Fabric-Spark adapter: Closed the livy session: <your-livy-session-id>
�15:54:57.831837 [debug] [MainThread]: Microsoft Fabric-Spark adapter: Closing the livy session: <your-livy-session-id>
�15:55:00.567005 [debug] [MainThread]: Microsoft Fabric-Spark adapter: Closed the livy session: <your-livy-session-id>
�15:55:00.568020 [info ] [MainThread]: 
�15:55:00.568734 [info ] [MainThread]: Finished running 1 table model in 0 hours 3 minutes and 11.99 seconds (191.99s).
�15:55:00.570720 [debug] [MainThread]: Command end result
�15:55:00.599213 [debug] [MainThread]: Wrote artifact WritableManifest to /Users/g2/Dev/dc-lakehouse/projects/covid_data/target/manifest.json
�15:55:00.604055 [debug] [MainThread]: Wrote artifact SemanticManifest to /Users/g2/Dev/dc-lakehouse/projects/covid_data/target/semantic_manifest.json
�15:55:00.613047 [debug] [MainThread]: Wrote artifact RunExecutionResult to /Users/g2/Dev/dc-lakehouse/projects/covid_data/target/run_results.json
�15:55:00.613496 [info ] [MainThread]: 
�15:55:00.614264 [info ] [MainThread]: �Completed with 1 error, 0 partial successes, and 0 warnings:�
�15:55:00.614720 [info ] [MainThread]: 
�15:55:00.615539 [error] [MainThread]:   ERROR: default_python_submission_method is not specified
�15:55:00.616138 [info ] [MainThread]: 
�15:55:00.616653 [info ] [MainThread]: Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1
�15:55:00.623979 [debug] [MainThread]: Resource report: {"command_name": "run", "command_success": false, "command_wall_clock_time": 193.34938, "process_in_blocks": "0", "process_kernel_time": 0.820225, "process_mem_max_rss": "115167232", "process_out_blocks": "0", "process_user_time": 2.860301}
�15:55:00.624993 [debug] [MainThread]: Command `dbt run` failed at 07:55:00.624821 after 193.35 seconds
�15:55:00.625896 [debug] [MainThread]: Sending event: {'category': 'dbt', 'action': 'invocation', 'label': 'end', 'context': [<snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10980d490>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10980d250>, <snowplow_tracker.self_describing_json.SelfDescribingJson object at 0x10980d0a0>]}
�15:55:00.626457 [debug] [MainThread]: Flushing usage events
�15:55:01.456516 [debug] [MainThread]: An error was encountered while trying to flush usage events

@WillemLiang
Copy link
Author

I've worked out a quick fix for experimental attempts. Rather than maintaining a dedicated livy session for Pyspark submission, reusing the existing SQL type livy session and adding an option for workloads' job type seems more reasonable.

  1. Add a Livy output at the end of your Python script. The Livy magic command %json result will return a JSON string for your particular model data. The compiled code looks like this:
# df.write.mode("overwrite").format("delta").option("overwriteSchema", "true").saveAsTable("dl_playground.dbo.dwd_prop_city_dist")
# Add json output after the dbt model materialization, i.e. write to a lakehouse table
result = {
    'data': [list(row) for row in df.collect()],
    'schema': df.schema.jsonValue()
}
%json result

You can modify the macros file tables.sql as below:

{% macro py_write_table(compiled_code, target_relation) %}
# ----the origin codes----
df.write.mode("overwrite").format("{{ config.get('file_format', 'delta') }}").option("overwriteSchema", "true").saveAsTable("{{ target_relation }}")
# add livy json output
result = {
    'data': [list(row) for row in df.collect()],
    'schema': df.schema.jsonValue()
}
%json result
{%- endmacro -%}
  1. Add the executePython function in livysession.py for the test. It can modify the original _submitLivyCode() and execute() methods, but I am still working on testing and data validations.
import textwrap

Class LivyCursor:

    def _submitPythonLivyCode(self, code) -> Response:
        if self.livy_session.is_new_session_required:
            LivySessionManager.connect(self.credential)
            self.session_id = self.livy_session.session_id

        code2 = code.replace("'{LIVY_SESSION_ID}'", f"'{self.session_id}'")    # add livy session id for testing
        # Submit code
        data = {"code": code2, "kind": "pyspark"}
        logger.debug(
            f"Submitted: {data} {self.connect_url + '/sessions/' + self.session_id + '/statements'}"
        )

        res = requests.post(
            self.connect_url + "/sessions/" + self.session_id + "/statements",
            data=json.dumps(data),
            headers=get_headers(self.credential, False),
        )
        return res

    def executePython(self, python: str, *parameters: Any) -> None:
        
        res = self._getLivyResult(self._submitPythonLivyCode(textwrap.dedent(python)))
        logger.debug(res)
        
        if res["output"]["status"] == "ok":
            values = res["output"]["data"]["application/json"]
            if len(values) >= 1:
                self._rows = values["data"]
                self._schema = values["schema"]["fields"]
            else:
                self._rows = []
                self._schema = []
        else:
            self._rows = None
            self._schema = None

            raise dbt.exceptions.DbtDatabaseError(
                "Error while executing query: " + res["output"]["evalue"]
            )

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant