Skip to content

Commit

Permalink
Code review
Browse files Browse the repository at this point in the history
  • Loading branch information
garronej committed Jan 20, 2025
1 parent e2ac1a1 commit ef001af
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 246 deletions.
4 changes: 2 additions & 2 deletions web/src/core/adapters/s3Client/s3Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ export function createS3Client(
return downloadUrl;
},

getFileMetadata: async ({ path }) => {
getFileContentType: async ({ path }) => {
const { bucketName, objectName } = bucketNameAndObjectNameFromS3Path(path);

const { getAwsS3Client } = await prApi;
Expand All @@ -661,7 +661,7 @@ export function createS3Client(
})
);

return { contentType: head.ContentType, metadata: head.Metadata };
return head.ContentType;
}

// "getPresignedUploadUrl": async ({ path, validityDurationSecond }) => {
Expand Down
68 changes: 26 additions & 42 deletions web/src/core/adapters/sqlOlap/sqlOlap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { assert } from "tsafe/assert";
import memoize from "memoizee";
import { same } from "evt/tools/inDepth/same";
import type { ReturnType } from "tsafe";
import { arrowTableToColumns, arrowTableToRows } from "./utils/arrowTable";
import { createArrowTableApi } from "./utils/arrowTable";

export const createDuckDbSqlOlap = (params: {
getS3Config: () => Promise<
Expand All @@ -34,6 +34,8 @@ export const createDuckDbSqlOlap = (params: {
}): SqlOlap => {
const { getS3Config } = params;

const prArrowTableApi = createArrowTableApi();

const sqlOlap: SqlOlap = {
getConfiguredAsyncDuckDb: (() => {
let hasCustomExtensionRepositoryBeenSetup = false;
Expand Down Expand Up @@ -106,7 +108,7 @@ export const createDuckDbSqlOlap = (params: {
return db;
};
})(),
getRowsAndColumns: async ({ sourceUrl, fileType, rowsPerPage, page }) => {
getRows: async ({ sourceUrl, fileType, rowsPerPage, page }) => {
const db = await sqlOlap.getConfiguredAsyncDuckDb();
const conn = await db.connect();

Expand All @@ -124,57 +126,39 @@ export const createDuckDbSqlOlap = (params: {
const stmt = await conn.prepare(sqlQuery);
const res = await stmt.query();

const columns = await arrowTableToColumns({ table: res });
const { arrowTableToColumns, arrowTableToRows } = await prArrowTableApi;

const columns = arrowTableToColumns({ table: res });
const rows = arrowTableToRows({ table: res, columns });

await conn.close();

return { rows, columns };
},
getRows: async ({ sourceUrl, fileType, rowsPerPage, page, columns }) => {
const db = await sqlOlap.getConfiguredAsyncDuckDb();
const conn = await db.connect();

const sqlQuery = `SELECT * FROM ${(() => {
switch (fileType) {
case "csv":
return `read_csv('${sourceUrl}')`;
case "parquet":
return `read_parquet('${sourceUrl}')`;
case "json":
return `read_json('${sourceUrl}')`;
}
})()} LIMIT ${rowsPerPage} OFFSET ${rowsPerPage * (page - 1)}`;

const stmt = await conn.prepare(sqlQuery);
const res = await stmt.query();

const rows = arrowTableToRows({ table: res, columns });

await conn.close();
getRowCount: (() => {
const getRowCount_memo = memoize(
async (sourceUrl: string, fileType: "parquet" | "json" | "csv") => {
if (fileType !== "parquet") {
return undefined;
}

return { rows };
},
getRowCount: memoize(
async ({ sourceUrl, fileType }) => {
if (fileType !== "parquet") {
return undefined;
}
const db = await sqlOlap.getConfiguredAsyncDuckDb();

const db = await sqlOlap.getConfiguredAsyncDuckDb();
const conn = await db.connect();

const conn = await db.connect();
const query = `SELECT count(*)::INTEGER as v FROM read_parquet("${sourceUrl}");`;

const query = `SELECT count(*)::INTEGER as v FROM read_parquet("${sourceUrl}");`;
return conn
.prepare(query)
.then(stmt => stmt.query())
.then(res => res.toArray()[0]["v"])
.finally(() => conn.close());
},
{ promise: true, max: 1 }
);

return conn
.prepare(query)
.then(stmt => stmt.query())
.then(res => res.toArray()[0]["v"])
.finally(() => conn.close());
},
{ promise: true, max: 1 }
)
return ({ sourceUrl, fileType }) => getRowCount_memo(sourceUrl, fileType);
})()
};

return sqlOlap;
Expand Down
239 changes: 123 additions & 116 deletions web/src/core/adapters/sqlOlap/utils/arrowTable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,139 +2,146 @@ import type { Table, DataType, Vector } from "apache-arrow";
import { Column } from "core/ports/SqlOlap";
import { assert } from "tsafe/assert";

// Helper function to map Arrow DataType to a user-friendly Column.type
const getColumnType = async (type: DataType): Promise<Column["type"]> => {
export async function createArrowTableApi() {
const { Type, Int } = await import("apache-arrow");
switch (type.typeId) {
case Type.Int: {
assert(type instanceof Int);
if (type.bitWidth === 64) {
return "bigint";

// Helper function to map Arrow DataType to a user-friendly Column.type
const getColumnType = (type: DataType): Column["type"] => {
switch (type.typeId) {
case Type.Int: {
assert(type instanceof Int);
if (type.bitWidth === 64) {
return "bigint";
}
return "number";
}
return "number";
}

case Type.Decimal:
case Type.Float:
return "number";

case Type.Utf8:
case Type.LargeUtf8:
return "string";

case Type.Bool:
return "boolean";

case Type.Time:
case Type.Timestamp:
return "dateTime";

case Type.Date:
return "date";

case Type.Binary:
case Type.LargeBinary:
case Type.FixedSizeBinary:
return "binary";

case Type.Duration: //Not supported in Parquet yet
case Type.FixedSizeList:
case Type.Map:
case Type.Union:
case Type.Struct:
case Type.Interval:
case Type.List:
return "string";

default:
throw new Error(
`Unsupported Arrow DataType: ${Type[type.typeId] || "Unknown"} (${type.typeId})`
);
}
};
case Type.Decimal:
case Type.Float:
return "number";

case Type.Utf8:
case Type.LargeUtf8:
return "string";

case Type.Bool:
return "boolean";

case Type.Time:
case Type.Timestamp:
return "dateTime";

case Type.Date:
return "date";

case Type.Binary:
case Type.LargeBinary:
case Type.FixedSizeBinary:
return "binary";

case Type.Duration: //Not supported in Parquet yet
case Type.FixedSizeList:
case Type.Map:
case Type.Union:
case Type.Struct:
case Type.Interval:
case Type.List:
return "string";

default:
throw new Error(
`Unsupported Arrow DataType: ${Type[type.typeId] || "Unknown"} (${type.typeId})`
);
}
};

export const arrowTableToColumns = async (params: { table: Table<any> }) => {
const { table } = params;
function arrowTableToColumns(params: { table: Table<any> }) {
const { table } = params;

const columns = await Promise.all(
table.schema.fields.map(async field => {
const columnType = await getColumnType(field.type);
return table.schema.fields.map(field => {
const columnType = getColumnType(field.type);
return {
name: field.name,
type: columnType,
rowType: field.type.toString()
};
})
);

return columns;
};
});
}

export const arrowTableToRows = (params: { table: Table<any>; columns: Column[] }) => {
const { table, columns } = params;
const arrowTableToRows = (params: { table: Table<any>; columns: Column[] }) => {
const { table, columns } = params;

const rows: Record<string, any>[] = Array.from({ length: table.numRows }, () => ({}));
const rows: Record<string, any>[] = Array.from(
{ length: table.numRows },
() => ({})
);

for (const column of columns) {
const field = table.schema.fields.find(field => field.name === column.name);
assert(field !== undefined, `Field "${column.name}" not found in schema.`);
for (const column of columns) {
const field = table.schema.fields.find(field => field.name === column.name);
assert(field !== undefined, `Field "${column.name}" not found in schema.`);

const vector = table.getChild(column.name);
assert(vector !== null, `Column vector for "${column.name}" not found.`);
const vector = table.getChild(column.name);
assert(vector !== null, `Column vector for "${column.name}" not found.`);

const transformedColumn = convertVector({
vector,
expectedType: column.type
});
const transformedColumn = convertVector({
vector,
expectedType: column.type
});

for (let rowIndex = 0; rowIndex < table.numRows; rowIndex++) {
rows[rowIndex][column.name] = transformedColumn[rowIndex];
for (let rowIndex = 0; rowIndex < table.numRows; rowIndex++) {
rows[rowIndex][column.name] = transformedColumn[rowIndex];
}
}
}

return rows;
};

const convertVector = (params: { vector: Vector<any>; expectedType: Column["type"] }) => {
const { vector, expectedType } = params;

switch (expectedType) {
case "boolean":
return Array.from(vector.toArray()).map(Boolean);
case "string":
return Array.from(vector.toArray()).map(String);
case "date":
return Array.from(vector.toArray()).map(value => {
if (value === null) {
return null;
}
assert(typeof value === "number");
return new Date(value);
});
case "dateTime": {
return Array.from(vector.toArray()).map(value => {
if (value === null) {
return null;
}
assert(typeof value === "bigint");
const milliseconds = value / 1_000_000n; //Timestamps are in nanoseconds
return new Date(Number(milliseconds));
});
return rows;
};

const convertVector = (params: {
vector: Vector<any>;
expectedType: Column["type"];
}) => {
const { vector, expectedType } = params;

switch (expectedType) {
case "boolean":
return Array.from(vector.toArray()).map(Boolean);
case "string":
return Array.from(vector.toArray()).map(String);
case "date":
return Array.from(vector.toArray()).map(value => {
if (value === null) {
return null;
}
assert(typeof value === "number");
return new Date(value);
});
case "dateTime": {
return Array.from(vector.toArray()).map(value => {
if (value === null) {
return null;
}
assert(typeof value === "bigint");
const milliseconds = value / 1_000_000n; //Timestamps are in nanoseconds
return new Date(Number(milliseconds));
});
}

case "number":
return Array.from(vector.toArray()).map(Number);
case "bigint":
//return Array.from(vector.toArray()).map(value => BigInt(value as bigint)); #waiting for https://github.com/microsoft/TypeScript/issues/46395
return Array.from(vector.toArray()).map(String);
case "binary":
return Array.from(vector.toArray()).map(value => {
if (value instanceof Uint8Array) {
return Array.from(value)
.map(byte => byte.toString(16).padStart(2, "0"))
.join("");
}
return value;
});
}
};

case "number":
return Array.from(vector.toArray()).map(Number);
case "bigint":
//return Array.from(vector.toArray()).map(value => BigInt(value as bigint)); #waiting for https://github.com/microsoft/TypeScript/issues/46395
return Array.from(vector.toArray()).map(String);
case "binary":
return Array.from(vector.toArray()).map(value => {
if (value instanceof Uint8Array) {
return Array.from(value)
.map(byte => byte.toString(16).padStart(2, "0"))
.join("");
}
return value;
});
}
};
return { arrowTableToRows, arrowTableToColumns };
}
5 changes: 1 addition & 4 deletions web/src/core/ports/S3Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,7 @@ export type S3Client = {
validityDurationSecond: number;
}) => Promise<string>;

getFileMetadata: (params: { path: string }) => Promise<{
contentType: string | undefined;
metadata: Record<string, string> | undefined;
}>;
getFileContentType: (params: { path: string }) => Promise<string | undefined>;

// getPresignedUploadUrl: (params: {
// path: string;
Expand Down
Loading

0 comments on commit ef001af

Please sign in to comment.