Skip to content

Commit

Permalink
work on Arrow Table translation
Browse files Browse the repository at this point in the history
  • Loading branch information
ddecrulle committed Jan 20, 2025
1 parent 6b7fb46 commit 895fe8d
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 93 deletions.
1 change: 1 addition & 0 deletions web/src/core/adapters/sqlOlap/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from "./sqlOlap";
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { assert } from "tsafe/assert";
import memoize from "memoizee";
import { same } from "evt/tools/inDepth/same";
import type { ReturnType } from "tsafe";
import { arrowTableToRowsAndColumns } from "./utils/arrowTableToRowsAndColumns";

export const createDuckDbSqlOlap = (params: {
getS3Config: () => Promise<
Expand Down Expand Up @@ -105,36 +106,7 @@ export const createDuckDbSqlOlap = (params: {
return db;
};
})(),
/**
 * Lists the columns of the file located at `sourceUrl`.
 *
 * Runs `DESCRIBE SELECT * FROM <reader>(...)`, where the reader
 * (`read_csv`, `read_parquet` or `read_json`) is picked from `fileType`,
 * and maps each result row to `{ name: column_name, type: column_type }`.
 *
 * The connection is always closed, even when preparing or running the
 * query fails.
 */
getColumns: async ({ sourceUrl, fileType }) => {
    const db = await sqlOlap.getConfiguredAsyncDuckDb();

    const conn = await db.connect();

    try {
        // Double every single quote so that the URL cannot terminate the
        // SQL string literal it is embedded in.
        const escapedSourceUrl = sourceUrl.replace(/'/g, "''");

        const sqlQuery = `DESCRIBE SELECT * FROM ${(() => {
            switch (fileType) {
                case "csv":
                    return `read_csv('${escapedSourceUrl}')`;
                case "parquet":
                    return `read_parquet('${escapedSourceUrl}')`;
                case "json":
                    return `read_json('${escapedSourceUrl}')`;
            }
        })()}`;

        const stmt = await conn.prepare(sqlQuery);

        const res = await stmt.query();

        return res.toArray().map(row => ({
            name: row.column_name,
            type: row.column_type
        }));
    } finally {
        // Release the connection whether or not the query succeeded.
        await conn.close();
    }
},
getRows: async ({ sourceUrl, fileType, rowsPerPage, page }) => {
getRowsAndColumns: async ({ sourceUrl, fileType, rowsPerPage, page }) => {
const db = await sqlOlap.getConfiguredAsyncDuckDb();

const conn = await db.connect();
Expand All @@ -154,25 +126,13 @@ export const createDuckDbSqlOlap = (params: {

const res = await stmt.query();

const rows = JSON.parse(
JSON.stringify(res.toArray(), (_key, value) => {
if (typeof value === "bigint") {
return value.toString();
}

if (value instanceof Uint8Array) {
return Array.from(value)
.map(byte => byte.toString(16).padStart(2, "0"))
.join("");
}

return value;
})
);
const { rows, columns } = await arrowTableToRowsAndColumns({
table: res
});

await conn.close();

return rows;
return { rows, columns };
},
getRowCount: memoize(
async ({ sourceUrl, fileType }) => {
Expand Down
121 changes: 121 additions & 0 deletions web/src/core/adapters/sqlOlap/utils/arrowTableToRowsAndColumns.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import type { Table, DataType, Vector } from "apache-arrow";
import { Column } from "core/ports/SqlOlap";
import { assert } from "tsafe/assert";

/**
 * Maps an Arrow `DataType` to the simplified `Column["type"]` tag.
 *
 * `apache-arrow` is loaded through a dynamic import, so the function is async.
 *
 * @throws Error when the Arrow type has no mapping.
 */
const getColumnType = async (type: DataType): Promise<Column["type"]> => {
    const arrow = await import("apache-arrow");
    const { Type } = arrow;
    const typeId = type.typeId;

    if (typeId === Type.Int) {
        assert(type instanceof arrow.Int);
        // 64-bit integers get their own tag; every other width is a plain number.
        return type.bitWidth === 64 ? "bigint" : "number";
    }

    if (typeId === Type.Float) {
        return "number";
    }

    if (typeId === Type.Utf8 || typeId === Type.LargeUtf8) {
        return "string";
    }

    if (typeId === Type.Bool) {
        return "boolean";
    }

    if (typeId === Type.Time || typeId === Type.Timestamp) {
        return "dateTime";
    }

    if (typeId === Type.Date) {
        return "date";
    }

    if (
        typeId === Type.Binary ||
        typeId === Type.LargeBinary ||
        typeId === Type.FixedSizeBinary
    ) {
        return "binary";
    }

    // Nested values are exposed as strings.
    if (typeId === Type.Struct || typeId === Type.List) {
        return "string";
    }

    throw new Error(
        `Unsupported Arrow DataType: ${Type[type.typeId] || "Unknown"} (${type.typeId})`
    );
};

/**
 * Turns an Arrow table into plain row objects plus column descriptors.
 *
 * For every field of the table schema, the matching child vector is looked
 * up by name, its `Column["type"]` is resolved with `getColumnType`, and its
 * values are converted with `convertVector` before being written into the
 * row objects under the field name.
 *
 * @returns `rows` (one object per table row, keyed by field name) and
 *          `columns` (one `{ name, type }` entry per schema field, in order).
 */
export const arrowTableToRowsAndColumns = async (params: { table: Table<any> }) => {
    const { table } = params;

    const columns: Column[] = [];
    const rows: Record<string, any>[] = [];

    // Pre-allocate one empty object per row; columns are filled in below.
    for (let i = 0; i < table.numRows; i++) {
        rows.push({});
    }

    for (const field of table.schema.fields) {
        const fieldName = field.name;

        const vector = table.getChild(fieldName);
        assert(vector !== null, `Column for field "${fieldName}" not found.`);

        const type = await getColumnType(field.type);
        columns.push({ name: fieldName, type });

        const values = convertVector({ vector, expectedType: type });

        rows.forEach((row, rowIndex) => {
            row[fieldName] = values[rowIndex];
        });
    }

    return { rows, columns };
};

/**
 * Converts a raw temporal value expressed in `unit` to milliseconds.
 *
 * `unit` is compared against the numeric values of Arrow's `TimeUnit` enum
 * (SECOND = 0, MILLISECOND = 1, MICROSECOND = 2, NANOSECOND = 3). Any other
 * value, including a missing unit, is treated as nanoseconds.
 */
const toMilliseconds = (value: bigint, unit: unknown): bigint => {
    switch (unit) {
        case 0: // SECOND
            return value * 1_000n;
        case 1: // MILLISECOND
            return value;
        case 2: // MICROSECOND
            return value / 1_000n;
        default: // NANOSECOND
            return value / 1_000_000n;
    }
};

/**
 * Converts the values of an Arrow vector into plain JavaScript values
 * matching `expectedType`.
 *
 * - "boolean" / "number": each value goes through `Boolean` / `Number`.
 * - "string" / "bigint": each value goes through `String`.
 * - "date": non-null values must be numbers and are wrapped in a `Date`.
 * - "dateTime": non-null values (bigint or number) are scaled to milliseconds
 *   according to the unit carried by the vector's type, then wrapped in a `Date`.
 * - "binary": `Uint8Array` values become lowercase hex strings; anything else
 *   is passed through untouched.
 *
 * NOTE(review): `String`, `Boolean` and `Number` also coerce `null` entries
 * ("null", false, 0) — confirm whether nulls should be preserved instead.
 *
 * @throws Error when `expectedType` is not one of the supported tags.
 */
const convertVector = (params: { vector: Vector<any>; expectedType: Column["type"] }) => {
    const { vector, expectedType } = params;

    switch (expectedType) {
        case "boolean":
            return Array.from(vector.toArray()).map(Boolean);
        case "string":
        case "bigint":
            return Array.from(vector.toArray()).map(String);
        case "number":
            return Array.from(vector.toArray()).map(Number);
        case "date":
            return Array.from(vector.toArray()).map(value => {
                if (value === null) {
                    return null;
                }
                assert(typeof value === "number");
                return new Date(value);
            });
        case "dateTime": {
            // The unit (s / ms / µs / ns) is a property of the Arrow type,
            // it must not be assumed to always be nanoseconds.
            const unit: unknown = vector.type?.unit;

            return Array.from(vector.toArray()).map(value => {
                if (value === null) {
                    return null;
                }
                assert(typeof value === "bigint" || typeof value === "number");
                const rawValue = typeof value === "bigint" ? value : BigInt(value);
                return new Date(Number(toMilliseconds(rawValue, unit)));
            });
        }
        case "binary":
            return Array.from(vector.toArray()).map(value => {
                if (value instanceof Uint8Array) {
                    return Array.from(value)
                        .map(byte => byte.toString(16).padStart(2, "0"))
                        .join("");
                }
                return value;
            });
        default:
            // Fail loudly instead of silently returning undefined.
            throw new Error(`Unsupported column type: ${String(expectedType)}`);
    }
};
13 changes: 7 additions & 6 deletions web/src/core/ports/SqlOlap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ export type SqlOlap = {
sourceUrl: string;
fileType: "parquet" | "csv" | "json";
}) => Promise<number | undefined>;
getRows: (params: {
getRowsAndColumns: (params: {
sourceUrl: string;
fileType: "parquet" | "csv" | "json";
rowsPerPage: number;
page: number;
}) => Promise<any[]>;
getColumns: (params: {
sourceUrl: string;
fileType: "parquet" | "csv" | "json";
}) => Promise<{ name: string; type: any }[]>;
}) => Promise<{ rows: unknown[]; columns: Column[] }>;
};

/** Describes one column of the result returned by `getRowsAndColumns`. */
export type Column = {
/** Name of the column. */
name: string;
/** Simplified type tag describing the values held by the column. */
type: "string" | "number" | "bigint" | "boolean" | "date" | "dateTime" | "binary";
};
Empty file.
36 changes: 27 additions & 9 deletions web/src/core/usecases/dataExplorer/selectors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,31 @@ import type { GridColDef } from "@mui/x-data-grid";

const state = (rootState: RootState) => rootState[name];

const main = createSelector(state, state => {
/**
 * Grid column definitions derived from the loaded data.
 * Yields `undefined` as long as the data is not in the "loaded" state.
 */
const columns = createSelector(
    createSelector(state, state => state.data),
    data => {
        if (!(data !== undefined && data.state === "loaded")) {
            return undefined;
        }

        return data.columns.map(({ name, type }) => {
            // "bigint" and "binary" columns are declared as plain strings to the grid.
            const gridType = type === "bigint" || type === "binary" ? "string" : type;

            return {
                field: name,
                sortable: false,
                type: gridType
            } satisfies GridColDef;
        });
    }
);

const main = createSelector(state, columns, (state, columns) => {
const { isQuerying, queryParams, errorMessage, data, extraRestorableStates } = state;

if (errorMessage !== undefined) {
Expand All @@ -24,6 +48,7 @@ const main = createSelector(state, state => {
assert(queryParams.rowsPerPage !== undefined);
assert(queryParams.page !== undefined);
assert(extraRestorableStates !== undefined);
assert(columns !== undefined);

const { rowsPerPage, page } = queryParams;
return {
Expand All @@ -36,14 +61,7 @@ const main = createSelector(state, state => {
queryParams,
extraRestorableStates,
fileDownloadUrl: data.fileDownloadUrl,
columns: data.columns.map(
column =>
({
field: column.name,
sortable: false,
type: "string"
}) satisfies GridColDef
)
columns
};
}
}
Expand Down
5 changes: 3 additions & 2 deletions web/src/core/usecases/dataExplorer/state.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { createUsecaseActions } from "clean-architecture";
import type { Column } from "core/ports/SqlOlap";
import { assert } from "tsafe/assert";
import { id } from "tsafe/id";

Expand All @@ -24,7 +25,7 @@ export type State = {
| {
state: "loaded";
rows: any[];
columns: { name: string; type: string }[];
columns: Column[];
rowCount: number | undefined;
fileDownloadUrl: string;
fileType: "parquet" | "csv" | "json";
Expand Down Expand Up @@ -105,7 +106,7 @@ export const { actions, reducer } = createUsecaseActions({
}: {
payload: {
rows: any[];
columns: { name: string; type: string }[];
columns: Column[];
rowCount: number | undefined;
fileDownloadUrl: string;
fileType: "parquet" | "csv" | "json";
Expand Down
17 changes: 6 additions & 11 deletions web/src/core/usecases/dataExplorer/thunks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,6 @@ const privateThunks = {
})
));

const columns = await (async () => {
if (!isSourceUrlChanged) {
assert(data.state === "loaded");
return data.columns;
}
return sqlOlap.getColumns({ sourceUrl, fileType });
})();

const rowCountOrErrorMessage = await (async () => {
if (!isSourceUrlChanged) {
assert(data.state === "loaded");
Expand Down Expand Up @@ -143,20 +135,23 @@ const privateThunks = {
}

const rowsOrErrorMessage = await sqlOlap
.getRows({
.getRowsAndColumns({
sourceUrl,
rowsPerPage: rowsPerPage + 1,
page,
fileType
})
.catch(error => String(error));
.catch(error => {
console.error(error);
return String(error);
});

if (typeof rowsOrErrorMessage === "string") {
dispatch(actions.queryFailed({ errorMessage: rowsOrErrorMessage }));
return;
}

const rows = rowsOrErrorMessage;
const { columns, rows } = rowsOrErrorMessage;
const hasMore = rows.length === rowsPerPage + 1;

dispatch(
Expand Down
44 changes: 25 additions & 19 deletions web/src/ui/shared/Datagrid/CustomDataGrid.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,31 @@ export const CustomDataGrid = <R extends GridValidRowModel = any>(
const modifiedColumns = useMemo(
() =>
shouldAddCopyToClipboardInCell
? columns.map(
column =>
({
...column,
renderCell: ({ value, hasFocus }) => (
<>
<div style={{ width: "100%" }}>{value}</div>
<CopyToClipboardIconButton
textToCopy={value}
className={css({
visibility: hasFocus ? "visible" : "hidden", //This ensure to preserve space for the icon when cell are auto resized
right: 0
})}
/>
</>
),
display: "flex"
}) satisfies GridColDef
)
? columns.map(column => {
const originalRenderCell = column.renderCell;
return {
...column,
renderCell: params => (
<>
{originalRenderCell ? (
originalRenderCell(params)
) : (
<div>{params.value.toString()}</div>
)}
<CopyToClipboardIconButton
textToCopy={params.value}
className={css({
visibility: params.hasFocus
? "visible"
: "hidden", // Ensure space is preserved for the icon
right: 0
})}
/>
</>
),
display: "flex"
} satisfies GridColDef;
})
: columns,
[columns, shouldAddCopyToClipboardInCell]
);
Expand Down

0 comments on commit 895fe8d

Please sign in to comment.