
Unable to query file on Kubernetes on AWS EKS, for remote-sql.rs example #1180

Open · Noah-FetchRewards opened this issue on Feb 12, 2025 · 5 comments
Labels: bug (Something isn't working)

@Noah-FetchRewards

Describe the bug
I'm deploying the Ballista cluster on Kubernetes on AWS EKS using the documentation/YAML files at: https://datafusion.apache.org/ballista/user-guide/deployment/kubernetes.html

I'm trying to run the "remote-sql.rs" example to ensure it works, and I can't seem to get it working.
I uploaded the aggregate_test_100.csv file to the /mnt directory on the Ballista scheduler, but I repeatedly get the error:

Error: ObjectStore(NotFound { path: "/mnt/aggregate_test_100.csv", source: Os { code: 2, kind: NotFound, message: "No such file or directory" } })

I can confirm the scheduler has the file loaded onto it, because I can `sh` into the scheduler pod and view the file with `ls`.

Here is an example of the code:

```rust
use ballista::extension::{SessionConfigExt, SessionContextExt};
use datafusion::error::Result;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let config = SessionConfig::new_with_ballista()
        .with_target_partitions(4)
        .with_ballista_job_name("Remote SQL Example");

    let state = SessionStateBuilder::new()
        .with_config(config)
        .with_default_features()
        .build();

    let ctx = SessionContext::remote_with_state("df://external_ip:50050", state).await?;

    ctx.register_csv("test", "/mnt/aggregate_test_100.csv", CsvReadOptions::new())
        .await?;

    let df = ctx
        .sql(
            "SELECT c1, MIN(c12), MAX(c12)
             FROM test
             WHERE c11 > 0.1 AND c11 < 0.9
             GROUP BY c1",
        )
        .await?;

    // Print the query results
    df.show().await?;

    Ok(())
}
```

I've also tried using the original example, where it references the file locally, which obviously didn't work.

```rust
ctx.register_csv(
    "test",
    &format!("{test_data}/aggregate_test_100.csv"),
    CsvReadOptions::new(),
)
.await?;
```

What I really want to do is have it reference a file in an S3 bucket, so I initially tried:

```rust
use std::sync::Arc;

use ballista::extension::{SessionConfigExt, SessionContextExt};
use datafusion::error::Result;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
use url::Url;

#[tokio::main]
async fn main() -> Result<()> {
    let s3_store = object_store::aws::AmazonS3Builder::new()
        .with_bucket_name("ballista-noah-2")
        .with_access_key_id("my key id")
        .with_secret_access_key("my key")
        .with_token("my token")
        .with_region("us-east-1")
        .build()?;

    let runtime_env = RuntimeEnvBuilder::new().build()?;

    let s3_url = Url::parse("s3://ballista-noah-2")
        .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
    runtime_env.register_object_store(&s3_url, Arc::new(s3_store));

    let session_config = SessionConfig::new_with_ballista()
        .with_target_partitions(4)
        .with_ballista_job_name("Remote SQL Example");

    let state = SessionStateBuilder::new()
        .with_config(session_config)
        .with_runtime_env(Arc::new(runtime_env))
        .with_default_features()
        .build();

    let ctx = SessionContext::remote_with_state("df://127.0.0.1:50050", state).await?;

    ctx.register_csv(
        "test",
        "s3://ballista-noah-2/aggregate_test_100.csv",
        CsvReadOptions::new(),
    )
    .await?;

    let df = ctx
        .sql(
            "SELECT c1, MIN(c12), MAX(c12)
             FROM test
             WHERE c11 > 0.1 AND c11 < 0.9
             GROUP BY c1",
        )
        .await?;

    df.show().await?;

    Ok(())
}
```

This results in the error:

Error: ArrowError(ExternalError(Execution("Job LyVMWvI failed: Error planning job LyVMWvI: DataFusionError(Internal("No suitable object store found for s3://ballista-noah-2/aggregate_test_100.csv. See RuntimeEnv::register_object_store"))")), None)

I've been trying many different variations here.
I'm definitely doing something wrong, and I'm hoping someone can point me in the right direction.

Noah-FetchRewards added the bug label on Feb 12, 2025
@milenkovicm
Contributor

> I can confirm the scheduler has the file loaded onto it, because I can `sh` into the scheduler pod and view the file with `ls`.

The client needs to list the files and the executors need to read them, so having the file only on the scheduler does you no good.

Ballista comes with a local file object store by default; no other stores are built in. If you wish to use a different object store, you have to register it. Please have a look at examples/examples/custom-client.rs, examples/examples/custom-scheduler.rs, and examples/examples/custom-executor.rs for how to set up your own object store.
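Roughly, the registration each of those examples performs looks like the sketch below. This is a minimal illustration, assuming DataFusion's `RuntimeEnvBuilder` and the `object_store` crate's `AmazonS3Builder` (both already used in the snippet above); `runtime_env_with_s3` is just an illustrative helper name, and the exact wiring into the scheduler and executor processes is shown in the linked examples:

```rust
use std::sync::Arc;

use datafusion::error::{DataFusionError, Result};
use datafusion::execution::runtime_env::{RuntimeEnv, RuntimeEnvBuilder};
use object_store::aws::AmazonS3Builder;
use url::Url;

/// Hypothetical helper: build a RuntimeEnv with an S3 store registered
/// for the given bucket. Every process that touches the plan or the
/// data (client, scheduler, each executor) needs an equivalent
/// registration on its own RuntimeEnv.
fn runtime_env_with_s3(bucket: &str) -> Result<RuntimeEnv> {
    // from_env() picks up AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
    // AWS_SESSION_TOKEN, AWS_REGION, ... so credentials don't have to
    // be hardcoded into each process.
    let s3_store = AmazonS3Builder::from_env()
        .with_bucket_name(bucket)
        .build()?;

    let runtime_env = RuntimeEnvBuilder::new().build()?;
    let s3_url = Url::parse(&format!("s3://{bucket}"))
        .map_err(|e| DataFusionError::External(Box::new(e)))?;
    runtime_env.register_object_store(&s3_url, Arc::new(s3_store));
    Ok(runtime_env)
}
```

Registering the store only in the client, as in the snippet above, is exactly why the scheduler reports "No suitable object store found" while planning the job.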

@Noah-FetchRewards
Author

What do you mean by "the client needs to list the files"? I'm running the Rust code locally, so I'm assuming that's the client in question.

Does the file need to be uploaded to the /mnt directory of each executor?

@milenkovicm
Contributor

The client needs access to the files during the logical planning phase in order to set up the appropriate table scan.
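Concretely (a sketch of the mechanism, not Ballista-specific): with default `CsvReadOptions`, `register_csv` infers the table schema by reading the file from the client process, so the path has to resolve on the client too:

```rust
// This call runs in the client process. With no explicit schema,
// DataFusion opens /mnt/aggregate_test_100.csv locally to infer the
// schema before any plan is sent to the scheduler. If the file only
// exists inside the scheduler pod, the client fails here with the
// ObjectStore(NotFound) error quoted above.
ctx.register_csv("test", "/mnt/aggregate_test_100.csv", CsvReadOptions::new())
    .await?;
```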

@Noah-FetchRewards
Author

Sorry @milenkovicm, I am unsure what you mean. My "client" is my local computer running the example Ballista code from the repo at https://github.com/apache/datafusion-ballista/blob/main/examples/examples/remote-sql.rs.

I have the files locally available as well, so I am unsure what that means.

Do I need to have the data loaded onto the /mnt of all executor workers as well? I feel like that's not what you mean.

@milenkovicm
Contributor

@Noah-FetchRewards executors are the ones that process the data, so yes, they need access to the data. The client, the one running on your local computer, in the default configuration needs access to the data in order to get the logical plan ready.

Hope that helps.
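To tie the thread together, here is a sketch of the client side under those constraints. It reuses the hypothetical `runtime_env_with_s3` helper from the earlier sketch, and `external_ip` is a placeholder; the scheduler and executors must register the same store on their side, as in custom-scheduler.rs and custom-executor.rs:

```rust
use std::sync::Arc;

use ballista::extension::{SessionConfigExt, SessionContextExt};
use datafusion::error::Result;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Client-side registration covers listing/schema inference during
    // logical planning (runtime_env_with_s3 is the sketch from above).
    let runtime_env = runtime_env_with_s3("ballista-noah-2")?;

    let state = SessionStateBuilder::new()
        .with_config(SessionConfig::new_with_ballista())
        .with_runtime_env(Arc::new(runtime_env))
        .with_default_features()
        .build();

    // The scheduler and executors must have registered the same store
    // on their side (custom-scheduler.rs / custom-executor.rs), or
    // distributed planning/execution of the s3:// scan fails there.
    let ctx = SessionContext::remote_with_state("df://external_ip:50050", state).await?;

    ctx.register_csv(
        "test",
        "s3://ballista-noah-2/aggregate_test_100.csv",
        CsvReadOptions::new(),
    )
    .await?;

    ctx.sql("SELECT c1, MIN(c12), MAX(c12) FROM test GROUP BY c1")
        .await?
        .show()
        .await
}
```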
