Skip to content

Commit

Permalink
GH-14932: [Python] Add python bindings for JSON streaming reader (#45084
Browse files Browse the repository at this point in the history
)

### Rationale for this change

The Arrow C++ library has a JSON streaming reader which is not exposed through the Python interface.

### What changes are included in this PR?

This PR is based on #33761. It adds the `open_json` method to open a streaming reader for a JSON file.

### Are these changes tested?

Yes

### Are there any user-facing changes?

Yes. A new `open_json` method has been added to the Python interface, located at `pyarrow.json.open_json`; its parameters are the same as those of `pyarrow.json.read_json`.

* GitHub Issue: #14932

Lead-authored-by: pxc <[email protected]>
Co-authored-by: Akshay Subramanian <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
  • Loading branch information
pan-x-c and asubram12 authored Feb 6, 2025
1 parent deccce1 commit 16c7f1a
Show file tree
Hide file tree
Showing 8 changed files with 396 additions and 38 deletions.
1 change: 1 addition & 0 deletions docs/source/python/api/formats.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ JSON Files

ReadOptions
ParseOptions
open_json
read_json

.. _api.parquet:
Expand Down
12 changes: 12 additions & 0 deletions docs/source/python/json.rst
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,15 @@ and pass it to :func:`read_json`. For example, you can pass an explicit

Similarly, you can choose performance settings by passing a
:class:`ReadOptions` instance to :func:`read_json`.


Incremental reading
-------------------

For memory-constrained environments, it is also possible to read a JSON file
one batch at a time, using :func:`open_json`.

In this case, type inference is done on the first block and types are frozen afterwards.
To make sure the right data types are inferred, either set
:attr:`ReadOptions.block_size` to a large enough value, or use
:attr:`ParseOptions.explicit_schema` to set the desired data types explicitly.
2 changes: 1 addition & 1 deletion python/pyarrow/_csv.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1295,7 +1295,7 @@ def open_csv(input_file, read_options=None, parse_options=None,
Options for converting CSV data
(see pyarrow.csv.ConvertOptions constructor for defaults)
memory_pool : MemoryPool, optional
Pool to allocate Table memory from
Pool to allocate RecordBatch memory from
Returns
-------
Expand Down
78 changes: 77 additions & 1 deletion python/pyarrow/_json.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport (_Weakrefable, MemoryPool,

from pyarrow.lib cimport (_Weakrefable, Schema,
RecordBatchReader, MemoryPool,
maybe_unbox_memory_pool,
get_input_stream, pyarrow_wrap_table,
pyarrow_wrap_schema, pyarrow_unwrap_schema)
Expand Down Expand Up @@ -266,6 +268,38 @@ cdef _get_parse_options(ParseOptions parse_options, CJSONParseOptions* out):
out[0] = parse_options.options


cdef class JSONStreamingReader(RecordBatchReader):
    """An object that reads record batches incrementally from a JSON file.

    Should not be instantiated directly by user code; use
    ``pyarrow.json.open_json()`` instead, which constructs the instance
    via ``__new__`` and then calls ``_open()``.
    """
    cdef readonly:
        # Arrow schema of the produced record batches.  Captured once when
        # the reader is opened (per the docs, types are inferred from the
        # first block and frozen afterwards) and exposed read-only.
        Schema schema

    def __init__(self):
        # Direct construction is forbidden: a usable instance needs the
        # C++ reader wired up by _open(), which only open_json() does.
        raise TypeError(f"Do not call {self.__class__.__name__}'s "
                        "constructor directly, "
                        "use pyarrow.json.open_json() instead.")

    cdef _open(self, shared_ptr[CInputStream] stream,
               CJSONReadOptions c_read_options,
               CJSONParseOptions c_parse_options,
               MemoryPool memory_pool):
        """Create the underlying C++ StreamingReader and cache its schema.

        `stream` is the (possibly decompressing) input stream; the read and
        parse options are moved into the C++ reader; `memory_pool` (may be
        None) supplies the allocator via an IOContext.
        """
        cdef:
            shared_ptr[CSchema] c_schema
            CIOContext io_context

        io_context = CIOContext(maybe_unbox_memory_pool(memory_pool))

        # Release the GIL while the C++ reader is constructed: Make() may
        # perform blocking I/O to read the first block for type inference.
        with nogil:
            self.reader = <shared_ptr[CRecordBatchReader]> GetResultValue(
                CJSONStreamingReader.Make(stream, move(c_read_options),
                                          move(c_parse_options), io_context))
            c_schema = self.reader.get().schema()

        # Wrapping into a Python object requires the GIL again.
        self.schema = pyarrow_wrap_schema(c_schema)


def read_json(input_file, read_options=None, parse_options=None,
MemoryPool memory_pool=None):
"""
Expand Down Expand Up @@ -308,3 +342,45 @@ def read_json(input_file, read_options=None, parse_options=None,
table = GetResultValue(reader.get().Read())

return pyarrow_wrap_table(table)


def open_json(input_file, read_options=None, parse_options=None,
              MemoryPool memory_pool=None):
    """
    Open a streaming reader of JSON data.

    Reading using this function is always single-threaded.

    Parameters
    ----------
    input_file : string, path or file-like object
        The location of JSON data. If a string or path, and if it ends
        with a recognized compressed file extension (e.g. ".gz" or ".bz2"),
        the data is automatically decompressed when reading.
    read_options : pyarrow.json.ReadOptions, optional
        Options for the JSON reader (see pyarrow.json.ReadOptions
        constructor for defaults)
    parse_options : pyarrow.json.ParseOptions, optional
        Options for the JSON parser
        (see pyarrow.json.ParseOptions constructor for defaults)
    memory_pool : MemoryPool, optional
        Pool to allocate RecordBatch memory from

    Returns
    -------
    :class:`pyarrow.json.JSONStreamingReader`
    """
    cdef:
        JSONStreamingReader result
        shared_ptr[CInputStream] c_stream
        CJSONReadOptions c_read_opts
        CJSONParseOptions c_parse_opts

    # Translate the Python-level arguments into their C++ counterparts,
    # filling in defaults where an options object was not supplied.
    _get_reader(input_file, &c_stream)
    _get_read_options(read_options, &c_read_opts)
    _get_parse_options(parse_options, &c_parse_opts)

    # Bypass __init__ (which deliberately raises) and wire up the
    # underlying C++ streaming reader directly.
    result = JSONStreamingReader.__new__(JSONStreamingReader)
    result._open(c_stream, move(c_read_opts), move(c_parse_opts),
                 memory_pool)
    return result
7 changes: 7 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -2176,6 +2176,13 @@ cdef extern from "arrow/json/reader.h" namespace "arrow::json" nogil:

CResult[shared_ptr[CTable]] Read()

# Cython declaration of arrow::json::StreamingReader, the C++ incremental
# JSON reader.  It derives from RecordBatchReader so the wrapped object can
# be consumed batch-by-batch on the Python side.
cdef cppclass CJSONStreamingReader" arrow::json::StreamingReader"(
        CRecordBatchReader):
    # Factory: builds a reader over the given input stream with the given
    # read/parse options and I/O context; returns a Result that must be
    # unwrapped (e.g. with GetResultValue).
    @staticmethod
    CResult[shared_ptr[CJSONStreamingReader]] Make(
        shared_ptr[CInputStream],
        CJSONReadOptions, CJSONParseOptions, CIOContext)


cdef extern from "arrow/util/thread_pool.h" namespace "arrow::internal" nogil:

Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@
# under the License.


from pyarrow._json import ReadOptions, ParseOptions, read_json # noqa
from pyarrow._json import ReadOptions, ParseOptions, read_json, open_json # noqa
2 changes: 1 addition & 1 deletion python/pyarrow/tests/test_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ def read_bytes(self, b, **kwargs):
"""
:param b: bytes to be parsed
:param kwargs: arguments passed on to open the csv file
:return: b parsed as a single RecordBatch
:return: b parsed as a single Table
"""
raise NotImplementedError

Expand Down
Loading

0 comments on commit 16c7f1a

Please sign in to comment.