"
+ ]
+ },
+ "execution_count": 4,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "dataset._meta"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "33e35098-1790-4cf4-9543-d2bc5cc94908",
+ "metadata": {},
+ "source": [
+ "You'll notice this dataset has two top level fields:\n",
+ "- `type`\n",
+ "- `scoring`\n",
+ "\n",
+ "Inside of the `scoring` field (or Parquet column) are three subfields (`scoring` is a `Record` in awkward array terminology):\n",
+ "\n",
+ "- `player`\n",
+ "- `basket`\n",
+ "- `distance`\n",
+ "\n",
+ "We can also see that for each element in the top level array, we have exactly one entry for the `type` field, and some variable (showing array raggedness) number of `scoring` entries.\n",
+ "\n",
+ "The data we have here is some made up data about basketball games/matches. Each game is labeled as either a \"friendly\" match or a \"league\" match. Each game has some number of total scores, each score being made by some player as some type of basket at some distance. The raggedness of the array comes from each match having a different total number of scores.\n",
+ "\n",
+ "Since this first section of the tutorial is meant to show the basics of the IO functions, we won't worry too much about the details of the dataset, but we will revisit the structure in the next section!\n",
+ "\n",
+ "Since this tutorial is using a small toy dataset we can easily compute it quickly to see a concrete awkward array:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "id": "6a519e29-2f87-47cd-8490-efa8a19eada5",
+ "metadata": {
+ "scrolled": true
+ },
+ "outputs": [],
+ "source": [
+ "computed_dataset = dataset.compute()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "id": "3b148260-ae3a-4c7c-b666-bd0263d14ccb",
+ "metadata": {
+ "scrolled": true
+ },
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "[{type: 'league', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'friendly', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'league', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'league', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'friendly', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'friendly', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'friendly', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'friendly', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'league', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'league', scoring: [{...}, ..., {...}]},\n",
+ " ...,\n",
+ " {type: 'league', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'friendly', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'league', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'friendly', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'friendly', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'league', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'friendly', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'league', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'league', scoring: [{...}, ..., {...}]}]\n",
+ "--------------------------------------------------\n",
+ "type: 200 * {\n",
+ " type: string,\n",
+ " scoring: var * {\n",
+ " player: string,\n",
+ " basket: string,\n",
+ " distance: float64\n",
+ " }\n",
+ "}
"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "execution_count": 6,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "computed_dataset"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "581deaea-e157-4e08-9cb7-49de574a009c",
+ "metadata": {},
+ "source": [
+ "With parquet, we can restrict our data reading to only grab a specific set of columns from the files. In this toy dataset we're working with, if we only care about the specific players which did some scoring, we can specific that:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "id": "3cab1ce4-d972-4c25-97dd-fe58fa9f77e3",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "dataset = dak.from_parquet(pq_dir, columns=[\"scoring.player\"])"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "id": "25ca01b0-546c-4569-96b5-bed86a003648",
+ "metadata": {
+ "scrolled": true
+ },
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "[...]\n",
+ "----------------------\n",
+ "type: ## * {\n",
+ " scoring: var * {\n",
+ " player: string\n",
+ " }\n",
+ "}
"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "execution_count": 8,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "dataset._meta"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "8ed602dd-b5cc-4535-937e-8d153272bd0d",
+ "metadata": {},
+ "source": [
+ "Notice that when we peek at the metadata now, we see our array is going to contain less information, as expected! If we tied to access one of the fields we didn't request, we'd hit an `AttributeError` (before compute time!). Since we are able to track metadata at graph construction time, we can fail as early as possible"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "id": "63742ac7-c18d-4801-9fda-6030a0c4e164",
+ "metadata": {},
+ "outputs": [
+ {
+ "ename": "AttributeError",
+ "evalue": "distance not in fields.",
+ "output_type": "error",
+ "traceback": [
+ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
+ "\u001b[0;31mAttributeError\u001b[0m Traceback (most recent call last)",
+ "File \u001b[0;32m~/software/repos/dask-awkward/src/dask_awkward/lib/core.py:1508\u001b[0m, in \u001b[0;36mArray.__getattr__\u001b[0;34m(self, attr)\u001b[0m\n\u001b[1;32m 1507\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m-> 1508\u001b[0m cls_method \u001b[38;5;241m=\u001b[39m \u001b[43mgetattr_static\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_meta\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mattr\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1509\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mAttributeError\u001b[39;00m:\n",
+ "File \u001b[0;32m~/.pyenv/versions/3.11.7/lib/python3.11/inspect.py:1853\u001b[0m, in \u001b[0;36mgetattr_static\u001b[0;34m(obj, attr, default)\u001b[0m\n\u001b[1;32m 1852\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m default\n\u001b[0;32m-> 1853\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mAttributeError\u001b[39;00m(attr)\n",
+ "\u001b[0;31mAttributeError\u001b[0m: distance",
+ "\nDuring handling of the above exception, another exception occurred:\n",
+ "\u001b[0;31mAttributeError\u001b[0m Traceback (most recent call last)",
+ "Cell \u001b[0;32mIn[9], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m \u001b[43mdataset\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mscoring\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdistance\u001b[49m\n",
+ "File \u001b[0;32m~/software/repos/dask-awkward/src/dask_awkward/lib/core.py:1510\u001b[0m, in \u001b[0;36mArray.__getattr__\u001b[0;34m(self, attr)\u001b[0m\n\u001b[1;32m 1508\u001b[0m cls_method \u001b[38;5;241m=\u001b[39m getattr_static(\u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_meta, attr)\n\u001b[1;32m 1509\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mAttributeError\u001b[39;00m:\n\u001b[0;32m-> 1510\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mAttributeError\u001b[39;00m(\u001b[38;5;124mf\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;132;01m{\u001b[39;00mattr\u001b[38;5;132;01m}\u001b[39;00m\u001b[38;5;124m not in fields.\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n\u001b[1;32m 1511\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 1512\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mhasattr\u001b[39m(cls_method, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m_dask_get\u001b[39m\u001b[38;5;124m\"\u001b[39m):\n",
+ "\u001b[0;31mAttributeError\u001b[0m: distance not in fields."
+ ]
+ }
+ ],
+ "source": [
+ "dataset.scoring.distance"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "6eab1067-fa0f-46df-b055-27463795d271",
+ "metadata": {},
+ "source": [
+ "Let's go back to the original dataset and save it to JSON after repartitioning the collection:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 14,
+ "id": "63d11eb1-f822-4fc9-ae0b-c10fb6c8ea32",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "dataset = dak.from_parquet(pq_dir)\n",
+ "smaller_partition_dataset = dataset.repartition(15)\n",
+ "dak.to_json(smaller_partition_dataset, os.path.join(\"data\", \"json\"))"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "17805aaa-26c4-4c2d-8664-bbe842d87c56",
+ "metadata": {},
+ "source": [
+ "`dask-awkward`'s `to_*` functions have a bit of special treatmeant compared to other dask-awkward functions. They are the only parts of dask-awkward that are _eagerly_ computed. The `to_*` functions have a `compute=` argument that defaults to `True`. If you'd like to stage a data writing step without compute, you can write:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 15,
+ "id": "ad88a084-6d83-4eb7-a4a8-7befe58543d5",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "write_it = dak.to_json(smaller_partition_dataset, os.path.join(\"data\", \"json2\"), compute=False)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 16,
+ "id": "532776bb-0789-45d0-9bd8-d108d5143f1a",
+ "metadata": {
+ "scrolled": true
+ },
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "dask.awkward"
+ ]
+ },
+ "execution_count": 16,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "write_it"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "dd0b9879-c0d6-43a3-ad91-5f32209617ee",
+ "metadata": {},
+ "source": [
+ "Notice that the `write_it` object is a dask-awkward `Scalar` collection that can be computed."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 17,
+ "id": "fa5d00ee-2ec1-455e-b0e8-4c64f6e8d36a",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "write_it.compute()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "1ee8e243-9b83-4165-8073-9021e835ba09",
+ "metadata": {},
+ "source": [
+ "Now we can reload our data with `dak.from_json`. Realistically, taking data stored in parquet to then save it as JSON to be read later is likely a bad idea! But we're just doing this to show example usage of the dask-awkward API."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 18,
+ "id": "a59ea8ad-8ca6-444c-86cd-a4a4d9fc853d",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "dataset = dak.from_json(os.path.join(\"data\", \"json\"))"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 19,
+ "id": "60ef65b6-793e-40df-b2fa-f8c74b2ee8d0",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "dask.awkward"
+ ]
+ },
+ "execution_count": 19,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "dataset"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "77eddfcb-ab19-44dd-b8e5-f72b73716aad",
+ "metadata": {},
+ "source": [
+ "## II. Column (buffer) optimization"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "dcb43949-40bd-4fdf-bd80-99c0a7b9376e",
+ "metadata": {},
+ "source": [
+ "Dask workflows can be separated into two stages: first is task graph construction, and second is task graph execution. During task graph construction we are able to track metadata about our awkward array collections; with that metadata knowledge we are able, just before execution time, to know which parts of the Array are necessary to complete a computation. This is possible by running the task graph on a metadata only version of the arrays. When we run the metadata task graph, components of the data-less array are \"touched\" by the execution of the graph, and when that happens we know that's a part of the data on disk that needs to be read. \n",
+ "\n",
+ "Let's look at a quick example with Parquet. Recall the dataset from the previou section. We have these columns:\n",
+ "\n",
+ "- `type`\n",
+ "- `scoring.player`\n",
+ "- `scoring.basket`\n",
+ "- `scoring.distance`\n",
+ "\n",
+ "If we want to calculate the average distance of each scored basket during each game, ignoreing all freethrows, we can calculate that like so:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 20,
+ "id": "62fcb593-63d5-4444-9d26-d0e23258f501",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "dataset = dak.from_parquet(pq_dir)\n",
+ "free_throws = dak.str.match_substring(dataset.scoring.basket, \"freethrow\")\n",
+ "distances = dataset.scoring.distance[free_throws == False]\n",
+ "result = dak.mean(distances, axis=1)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "389a0003-f538-4eac-a4d9-15006e6fdc7f",
+ "metadata": {},
+ "source": [
+ "The `result` will be the average distance of each non-free-throw shot. Notice we only used two of the four columns: `scoring.basket` and `scoring.distance`, If we wanted to be explicit about it, we could use the `columns=` argument in the `dak.from_parquet` call. But we can also just rely on dask-awkward to do this for us! The columns/buffer optimization will detect that the graph is only going to need those columns, rewriting the internal `ak.from_parquet` call at the node in the task graph that actually reads the data from disk. We can actually see this logic without running the compute with the `dak.necessary_columns` function:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 21,
+ "id": "840b3ebe-1454-4dca-bee0-50a31f9c0df8",
+ "metadata": {
+ "scrolled": true
+ },
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "{'from-parquet-b7916bd949c3744cf0ec38dea00d0bd6': frozenset({'scoring.basket',\n",
+ " 'scoring.distance'})}"
+ ]
+ },
+ "execution_count": 21,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "dak.necessary_columns(result)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "c6e4e54c-33c1-42a4-9b0f-81bf6348b6d1",
+ "metadata": {},
+ "source": [
+ "We see the name of the input layer, and the names of the columns that are going to be read by that input layer.\n",
+ "\n",
+ "This will also work with JSON. Awkward-Array's `from_json` has a feature that allows users to pass in a JSONSchema that instructs the reader which parts of the JSON dataset should be read. The reader still has to process all of the bytes in the text based file format but with a schema declared, the reader can intelligently skip over different keys in the JSON, saving memory and and time during array building.\n",
+ "\n",
+ "Here's the same computation but starting with a JSON dataset:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 22,
+ "id": "13914bf9-1f45-4860-8dc7-ec8eeb746bc0",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "dataset = dak.from_json(os.path.join(\"data\", \"json\"))\n",
+ "free_throws = dak.str.match_substring(dataset.scoring.basket, \"freethrow\")\n",
+ "distances = dataset.scoring.distance[free_throws == False]\n",
+ "result = dak.mean(distances, axis=1)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 23,
+ "id": "1bc6e94b-ee5e-42d1-b789-6f80859b1d64",
+ "metadata": {
+ "scrolled": true
+ },
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "{'from-json-files-6eebaf87f3a09a08c1234137dd381b61': frozenset({'scoring.basket',\n",
+ " 'scoring.distance'})}"
+ ]
+ },
+ "execution_count": 23,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "dak.necessary_columns(result)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "3f9380f6-c4d4-4a2b-bb99-5e15e1da9039",
+ "metadata": {},
+ "source": [
+ "We see the exact same necessary columns.\n",
+ "\n",
+ "A final little detail. The way that we generate the JSON schema which is then passed to the reading node is with `dak.layout_to_jsonschema`. Once the column/buffer optimization has determined which are the fields will be necessary, we can select those fields from the awkward array form that we start with after the `dak.from_json` call. We then generate an awkward array layout from the sub-form generated by selecting a subset of the columns. Finally, we create a JSONSchema from that layout:\n",
+ "\n",
+ "In our small example case here, we know the columns are `scoring.basket` and `scoring.distance`. We can show this step manually here (starting with the first array collection created with the `dak.from_json call):"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 24,
+ "id": "e444fa35-03ee-4292-8730-490dacd145fb",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# create the subform based on the columns we need:\n",
+ "subform = dataset.form.select_columns([\"scoring.basket\", \"scoring.distance\"])\n",
+ "# create an awkward array layout:\n",
+ "sublayout = subform.length_zero_array(highlevel=False)\n",
+ "# and convert that to JSONSchema:\n",
+ "necessary_schema = dak.layout_to_jsonschema(sublayout)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 25,
+ "id": "146a84b4-26ce-45c5-ad16-9c8967b60214",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "{'title': 'untitled',\n",
+ " 'description': 'Auto generated by dask-awkward',\n",
+ " 'type': 'object',\n",
+ " 'properties': {'scoring': {'type': 'array',\n",
+ " 'items': {'type': 'object',\n",
+ " 'properties': {'basket': {'type': 'string'},\n",
+ " 'distance': {'type': 'number'}}}}}}"
+ ]
+ },
+ "execution_count": 25,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "necessary_schema"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "a0952470-5557-4dca-919b-84970788cfdd",
+ "metadata": {},
+ "source": [
+ "This feature can be turned off when running dask-awkward graphs with the config parameter \"awkward.optimization.enabled\". By default this setting is `True`. We can run the same compute with the feature turned off via:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 26,
+ "id": "f2d75df4-c8a7-4abd-942c-f1e94c124ec7",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import dask\n",
+ "\n",
+ "with dask.config.set({\"awkward.optimization.enabled\": False}):\n",
+ " result.compute()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "97b953ff-8edf-46a6-b069-c2aa6f802485",
+ "metadata": {},
+ "source": [
+ "This could be useful for debugging. If the compute fails with the optimization enabled, but succeeds with the optimization disabled, then there is likely a bug in dask-awkward or awkward-array that should be raised!"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3 (ipykernel)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.11.7"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/docs/examples/io-tutorial/io-01-advanced.ipynb b/docs/examples/io-tutorial/io-01-advanced.ipynb
new file mode 100644
index 00000000..509f37ee
--- /dev/null
+++ b/docs/examples/io-tutorial/io-01-advanced.ipynb
@@ -0,0 +1,355 @@
+{
+ "cells": [
+ {
+ "cell_type": "markdown",
+ "id": "8096427e-e5c5-4b5c-9d74-b172d786caa1",
+ "metadata": {},
+ "source": [
+ "# Advanced Features"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "0ffa55e3-592a-419f-a938-78b11889adc3",
+ "metadata": {},
+ "source": [
+ "*Before reading this notebook we recommend reading [the basic notebook first!](io-00-basic.ipynb)*\n",
+ "\n",
+ "_last updated 2024-01-26_\n",
+ "\n",
+ "All of the high level file format readers in `dask-awkward` are based on a lower level API: the `from_map` function. This function provides an interface that allows any user defined function to be used as a source of awkward arrays at the nodes in a Dask graph.\n",
+ "\n",
+ "A very simple usage of the `from_map` API would be to re-create `from_parquet`:\n",
+ "\n",
+ "```python\n",
+ "dak.from_map(\n",
+ " ak.from_parquet,\n",
+ " [\"/path/to/some/file1.parquet\", \"/path/to/some/file2.parquet\"],\n",
+ " label=\"my-from-parquet\",\n",
+ ")\n",
+ "```\n",
+ "\n",
+ "This will create a `dask-awkward` collection that calls `ak.from_parquet` on those two files, which as stated above, is a simple recreation of `dak.from_parquet` (obviously less flexible/powerful than `from_parquet`! but one should get the idea)\n",
+ "\n",
+ "The power of `from_map` materializes when one would like to take advantage of column optimization or gracefully fail, returning an empty array instead of a program crashing, at some nodes where read issues surface. We can begin to demonstrate these features by defining a function class to be passed in as the first argument to `from_map`.\n",
+ "\n",
+ "Our example will be a special Parquet reader that rejects any file that contains a \"0\" in the filename. For some reason we've found that data to be corrupt, but we want to be able still process the whole directory and not manually skip those files\n",
+ "\n",
+ "We'll write out the class implementation and then explain each of the methods:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "id": "25ade1b2-8988-4147-a53b-f6837aef4f9c",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from __future__ import annotations\n",
+ "\n",
+ "from typing import Any\n",
+ "\n",
+ "import awkward as ak\n",
+ "import dask\n",
+ "import dask_awkward as dak\n",
+ "from dask_awkward.lib.io.columnar import ColumnProjectionMixin\n",
+ "\n",
+ "class Ignore0ParquetReader(ColumnProjectionMixin):\n",
+ " def __init__(\n",
+ " self,\n",
+ " form: Form,\n",
+ " report: bool = False,\n",
+ " allowed_exceptions: tuple[type[BaseException], ...] = (OSError,),\n",
+ " columns: list[str] | None = None,\n",
+ " behavior: dict | None = None,\n",
+ " **kwargs: Any\n",
+ " ):\n",
+ " self.form = form\n",
+ " self.report = report\n",
+ " self.allowed_exceptions = allowed_exceptions\n",
+ " self.columns = columns\n",
+ " self.behavior = behavior\n",
+ " self.kwargs = kwargs\n",
+ "\n",
+ " @property\n",
+ " def return_report(self) -> bool:\n",
+ " return self.report\n",
+ "\n",
+ " @property\n",
+ " def use_optimization(self) -> bool:\n",
+ " return True\n",
+ "\n",
+ " @staticmethod\n",
+ " def report_success(source, columns) -> ak.Array:\n",
+ " return ak.Array([{\"source\": source, \"exception\": None, \"columns\": columns}])\n",
+ "\n",
+ " @staticmethod\n",
+ " def report_failure(source, exception) -> ak.Array:\n",
+ " return ak.Array([{\"source\": source, \"exception\": repr(exception), \"columns\": None}])\n",
+ "\n",
+ " def mock(self) -> ak.Array:\n",
+ " return ak.typetracer.typetracer_from_form(self.form, highlevel=True)\n",
+ "\n",
+ " def mock_empty(self, backend=\"cpu\") -> ak.Array:\n",
+ " return ak.to_backend(self.form.length_one_array(highlevel=False), backend=backend, highlevel=True)\n",
+ "\n",
+ " def read_from_disk(self, source: Any) -> ak.Array:\n",
+ " if \"0\" in source:\n",
+ " raise OSError(\"cannot read files that contain '0' in the name\")\n",
+ " return ak.from_parquet(source, columns=self.columns, **self.kwargs)\n",
+ "\n",
+ " def __call__(self, *args, **kwargs):\n",
+ " source = args[0]\n",
+ " if self.return_report:\n",
+ " try:\n",
+ " array = self.read_from_disk(source)\n",
+ " return array, self.report_success(source, self.columns)\n",
+ " except self.allowed_exceptions as err:\n",
+ " array = self.mock_empty()\n",
+ " return array, self.report_failure(source, err)\n",
+ " else:\n",
+ " return self.read_from_disk(source) \n",
+ "\n",
+ " def project_columns(self, columns):\n",
+ " return Ignore0ParquetReader(\n",
+ " form=self.form.select_columns(columns),\n",
+ " report=self.return_report,\n",
+ " allowed_exceptions=self.allowed_exceptions,\n",
+ " columns=columns,\n",
+ " **self.kwargs,\n",
+ " )\n",
+ "\n",
+ "\n",
+ "def my_read_parquet(path, columns=None, allowed_exceptions=(OSError,)):\n",
+ " pq_files = [os.path.join(path, f) for f in os.listdir(path) if f.endswith(\"parquet\")]\n",
+ " meta_from_pq = ak.metadata_from_parquet(pq_files)\n",
+ " form = meta_from_pq[\"form\"]\n",
+ " fn = Ignore0ParquetReader(form, report=True, allowed_exceptions=allowed_exceptions)\n",
+ " return dak.from_map(fn, pq_files)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "0d176ec9-ab45-425c-a680-2f1adba0b32c",
+ "metadata": {},
+ "source": [
+ "Here's why we have each of the methods!\n",
+ "\n",
+ "- Starting with inheriting the `ColumnProjectionMixin`, inheriting from this mixin makes the class compatible with column optimization.\n",
+ "- `__init__`: of course this is needed. It's going to take the starting form that the array should have, a tuple of exceptions that will be allowed to be raised at compute time that we can gracefully absorb, the columns to read, the awkward-array behavior that should be used, and additional kwargs that should be passed at each node's call of `ak.from_parquet\n",
+ "- `return_report`: a class property that will tell `from_map` whether or not we will also return a report array\n",
+ "- `use_optimization`: a class property that tells the columns optimization that we want this function class to be columns optimizable.\n",
+ "- `report_success`: a static method that will be used to construct an report array when the read is successful at a partition\n",
+ "- `report_failure`: the parter to `report_success`, if one of the allowed exceptions is raised at a partition at array creation time, this method will be called to construct an report array\n",
+ "- `mock`: a method that \"mocks\" the array that would be created, returns a dataless typetracer array\n",
+ "- `mock_empty`: a method that mocks the array but is not a typetracer array, it's an empty concrete awkward array. This is the method that is used at nodes that fail with an allowed exception.\n",
+ "- `read_from_disk`: this is the method that will be called to... read data from disk! What actually matters more is the next method:\n",
+ "- `__call__`: we finally get to the \"function\" part of this class: This method will be called at each partition. You'll notice that we call `read_from_disk` here, but we wrap it in a `try`, `except` block if we want to return the read-report that allows for graceful fails\n",
+ "- `project_columns`: this method is necessary for rewriting the class instructing it to read a new set of columns. This method is part of the optimization interface\n",
+ "\n",
+ "Finally, we write a function that is going to use this function class and call `from_map\n",
+ "\n",
+ "Let's use it to read our parquet dataset and look at both the resulting array and the post-compute report. Notice that the report itself is a lazily evaluated dask-awkward Array collection that should be computed simultaneously with the collection-of-interest."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "id": "a00b299a-dcc3-4106-a314-86475345f363",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "pq_dir = os.path.join(\"data\", \"parquet\")\n",
+ "dataset, report = my_read_parquet(pq_dir)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "id": "64fbcdc5-f1e2-44bf-81a0-ecaf375662be",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "result, computed_report = dask.compute(dataset, report)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "id": "4d32383a-a09d-44bc-9a2c-4b1f28b6eb98",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "[{type: '', scoring: []},\n",
+ " {type: 'friendly', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'league', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'league', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'league', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'league', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'friendly', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'friendly', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'friendly', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'league', scoring: [{...}, ..., {...}]},\n",
+ " ...,\n",
+ " {type: 'friendly', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'league', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'league', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'friendly', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'friendly', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'league', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'friendly', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'league', scoring: [{...}, ..., {...}]},\n",
+ " {type: 'friendly', scoring: [{...}, ..., {...}]}]\n",
+ "--------------------------------------------------\n",
+ "type: 151 * {\n",
+ " type: string,\n",
+ " scoring: var * {\n",
+ " player: string,\n",
+ " basket: string,\n",
+ " distance: float64\n",
+ " }\n",
+ "}
"
+ ],
+ "text/plain": [
+ ""
+ ]
+ },
+ "execution_count": 4,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "result"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "id": "a1202361-6b24-4400-bf7e-632d744e13b7",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[{'source': 'data/parquet/part0.parquet',\n",
+ " 'exception': 'OSError(\"cannot read files that contain \\'0\\' in the name\")',\n",
+ " 'columns': None},\n",
+ " {'source': 'data/parquet/part2.parquet',\n",
+ " 'exception': None,\n",
+ " 'columns': ['type', 'scoring.distance', 'scoring.basket', 'scoring.player']},\n",
+ " {'source': 'data/parquet/part3.parquet',\n",
+ " 'exception': None,\n",
+ " 'columns': ['type', 'scoring.distance', 'scoring.basket', 'scoring.player']},\n",
+ " {'source': 'data/parquet/part1.parquet',\n",
+ " 'exception': None,\n",
+ " 'columns': ['type', 'scoring.distance', 'scoring.basket', 'scoring.player']}]"
+ ]
+ },
+ "execution_count": 5,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "computed_report.tolist()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "id": "3e1c1aa5-72cd-4baa-abbe-a3791d3f3d4a",
+ "metadata": {},
+ "source": [
+ "We can see in the report that the file with a \"0\" in the name indeed failed!\n",
+ "\n",
+ "You'll see that we added the columns that are read to the report as well, so if we perform a compute that will only need a subset of the columns, we can get confirmation from our report array. We get the column optimization by inheriting from the column optimization mixin!"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "id": "e68910a0-b62e-483e-9098-25424850a05c",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "{'<__main__.Ignore0ParquetReader object at 0x7fbc1c5-98de39e045724a64b44ebd0cc521dc4e': frozenset({'scoring.player'})}"
+ ]
+ },
+ "execution_count": 6,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "dak.necessary_columns(dataset.scoring.player)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "id": "eb43bb7a-047b-4cda-b124-40c671448b31",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "result, computed_report= dask.compute(dataset.scoring.player, report)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "id": "17851b18-4ae6-4bd8-a562-3cf816a39a5c",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "[{'source': 'data/parquet/part0.parquet',\n",
+ " 'exception': 'OSError(\"cannot read files that contain \\'0\\' in the name\")',\n",
+ " 'columns': None},\n",
+ " {'source': 'data/parquet/part2.parquet',\n",
+ " 'exception': None,\n",
+ " 'columns': ['scoring.player']},\n",
+ " {'source': 'data/parquet/part3.parquet',\n",
+ " 'exception': None,\n",
+ " 'columns': ['scoring.player']},\n",
+ " {'source': 'data/parquet/part1.parquet',\n",
+ " 'exception': None,\n",
+ " 'columns': ['scoring.player']}]"
+ ]
+ },
+ "execution_count": 8,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "computed_report.tolist()"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3 (ipykernel)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.11.7"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}
diff --git a/docs/how-to/io.rst b/docs/how-to/io.rst
index 2db7cad6..07b63f26 100644
--- a/docs/how-to/io.rst
+++ b/docs/how-to/io.rst
@@ -21,16 +21,23 @@ will be partitioned on a per-file basis
Support for the ROOT file format is provided by the Uproot_ project.
+The dask-awkward repository contains a Jupyter notebook tutorial going
+into more details about IO. You can find that notebook at
+`docs/examples/io-tutorial
+`_.
+
It's also possible to instantiate dask-awkward
:class:`dask_awkward.Array` instances from other Dask collections
(like :class:`dask.array.Array`), or concrete objects like existing
awkward Array instances or Python lists.
-.. _Uproot: https://github.com/scikit-hep/uproot5
See the :ref:`IO API docs` page for more information on the
possible ways to instantiate a new dask-awkward Array.
+
+.. _Uproot: https://github.com/scikit-hep/uproot5
+
.. raw:: html