[SPARK-46641][SS] Add maxBytesPerTrigger threshold
### What changes were proposed in this pull request?
This PR adds a new option, `maxBytesPerTrigger`, to the file-based [Input Streaming Sources](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#input-sources) for limiting the total size of the input files in a streaming batch. The semantics of `maxBytesPerTrigger` are very close to those of the existing `maxFilesPerTrigger` option.
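For illustration, a minimal usage sketch (the path, schema, and byte value are made up; as implemented in this PR, the option value is parsed as a plain number of bytes):

```scala
// Hypothetical example: cap each micro-batch at roughly 1 GiB of new input files.
val stream = spark.readStream
  .format("parquet")
  .schema(inputSchema)                        // file streams need an explicit schema
  .option("maxBytesPerTrigger", "1073741824") // must be a positive Long, in bytes
  .load("s3a://bucket/events/")
```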

#### How was the feature implemented?
Because `maxBytesPerTrigger` is semantically close to `maxFilesPerTrigger`, I used every `maxFilesPerTrigger` usage in the repository as a potential place requiring changes. That includes:
- Option parameter definition
- Option-related logic
- Option-related ScalaDoc and MD files
- Option-related tests

I went over all usages of `maxFilesPerTrigger` in `FileStreamSourceSuite` and implemented `maxBytesPerTrigger` in the same fashion, since the two options are very close in nature. From the structure and elements of `ReadLimit` I concluded that the current design implies only one simple rule per `ReadLimit`, so setting both `maxFilesPerTrigger` and `maxBytesPerTrigger` at the same time is explicitly prohibited.
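For example, a configuration like the following (illustrative) one is rejected with an `IllegalArgumentException` when the file source parses its options:

```scala
// Hypothetical combination that the new validation rejects.
spark.readStream
  .format("json")
  .schema(inputSchema)
  .option("maxFilesPerTrigger", "100")
  .option("maxBytesPerTrigger", "52428800") // 50 MiB; together with maxFilesPerTrigger this fails
  .load("/data/incoming/")
```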

### Why are the changes needed?
This feature is useful for our team and our sister teams, and we expect it will find broad acceptance among Spark users. Several of the Spark pipelines we support use the available-now trigger for periodic processing with Spark Streaming. We currently rely on the `maxFilesPerTrigger` threshold, but this is not ideal: input file sizes change over time, which requires periodic adjustment of `maxFilesPerTrigger`. The computational complexity of a job depends on the event count / total size of the input, and `maxBytesPerTrigger` is a better predictor of that than `maxFilesPerTrigger`.
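A rough sketch of that pattern (paths, sizes, and the sink are illustrative, not part of this PR):

```scala
import org.apache.spark.sql.streaming.Trigger

// Periodic available-now job bounded by input size rather than file count.
spark.readStream
  .format("parquet")
  .schema(eventSchema)
  .option("maxBytesPerTrigger", "2147483648")   // ~2 GiB of new files per micro-batch
  .load("/warehouse/raw_events/")
  .writeStream
  .format("parquet")
  .option("path", "/warehouse/clean_events/")
  .option("checkpointLocation", "/checkpoints/clean_events/")
  .trigger(Trigger.AvailableNow())              // drain all available data in size-bounded batches
  .start()
```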

### Does this PR introduce _any_ user-facing change?
Yes. A new source option, `maxBytesPerTrigger`, is added for the file streaming source.

### How was this patch tested?
New unit tests were added and existing `maxFilesPerTrigger` tests were extended. I searched for `maxFilesPerTrigger`-related tests and added new tests or extended existing ones, trying to minimize and simplify the changes.
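The option-validation behaviour, for instance, is exercised roughly like this (a sketch, not the exact suite code; the `FileStreamOptions` constructor is taken from the diff below):

```scala
import scala.util.Try
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.streaming.FileStreamOptions

// Setting both limits at once must fail with IllegalArgumentException.
val result = Try(new FileStreamOptions(CaseInsensitiveMap(
  Map("maxFilesPerTrigger" -> "10", "maxBytesPerTrigger" -> "1000"))))
assert(result.isFailure)
```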

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#44636 from MaxNevermind/streaming-add-maxBytesPerTrigger-option.

Lead-authored-by: maxim_konstantinov <[email protected]>
Co-authored-by: Max Konstantinov <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
2 people authored and dongjoon-hyun committed Feb 9, 2024
1 parent 8603ed5 commit 3f5faaa
Showing 8 changed files with 279 additions and 61 deletions.
@@ -159,7 +159,9 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
* schema in advance, use the version that specifies the schema to avoid the extra scan.
*
* You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
* sets the maximum number of new files to be considered in every trigger.</li> </ul>
* sets the maximum number of new files to be considered in every trigger.</li>
* <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
* be considered in every trigger.</li> </ul>
*
* You can find the JSON-specific options for reading JSON file stream in <a
* href="https://spark.apache.org/docs/latest/sql-data-sources-json.html#data-source-option">
@@ -179,7 +181,9 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
* specify the schema explicitly using `schema`.
*
* You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
* sets the maximum number of new files to be considered in every trigger.</li> </ul>
* sets the maximum number of new files to be considered in every trigger.</li>
* <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
* be considered in every trigger.</li> </ul>
*
* You can find the CSV-specific options for reading CSV file stream in <a
* href="https://spark.apache.org/docs/latest/sql-data-sources-csv.html#data-source-option">
@@ -197,7 +201,9 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
* specify the schema explicitly using `schema`.
*
* You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
* sets the maximum number of new files to be considered in every trigger.</li> </ul>
* sets the maximum number of new files to be considered in every trigger.</li>
* <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
* be considered in every trigger.</li> </ul>
*
* You can find the XML-specific options for reading XML file stream in <a
* href="https://spark.apache.org/docs/latest/sql-data-sources-xml.html#data-source-option">
@@ -211,7 +217,9 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
* Loads a ORC file stream, returning the result as a `DataFrame`.
*
* You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
* sets the maximum number of new files to be considered in every trigger.</li> </ul>
* sets the maximum number of new files to be considered in every trigger.</li>
* <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
* be considered in every trigger.</li> </ul>
*
* ORC-specific option(s) for reading ORC file stream can be found in <a href=
* "https://spark.apache.org/docs/latest/sql-data-sources-orc.html#data-source-option"> Data
@@ -225,7 +233,9 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
* Loads a Parquet file stream, returning the result as a `DataFrame`.
*
* You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
* sets the maximum number of new files to be considered in every trigger.</li> </ul>
* sets the maximum number of new files to be considered in every trigger.</li>
* <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
* be considered in every trigger.</li> </ul>
*
* Parquet-specific option(s) for reading Parquet file stream can be found in <a href=
* "https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#data-source-option"> Data
@@ -268,7 +278,9 @@ final class DataStreamReader private[sql] (sparkSession: SparkSession) extends L
* }}}
*
* You can set the following option(s): <ul> <li>`maxFilesPerTrigger` (default: no max limit):
* sets the maximum number of new files to be considered in every trigger.</li> </ul>
* sets the maximum number of new files to be considered in every trigger.</li>
* <li>`maxBytesPerTrigger` (default: no max limit): sets the maximum total size of new files to
* be considered in every trigger.</li> </ul>
*
* You can find the text-specific options for reading text files in <a
* href="https://spark.apache.org/docs/latest/sql-data-sources-text.html#data-source-option">
8 changes: 5 additions & 3 deletions docs/structured-streaming-programming-guide.md
@@ -561,6 +561,8 @@ Here are the details of all the sources in Spark.
<br/>
<code>maxFilesPerTrigger</code>: maximum number of new files to be considered in every trigger (default: no max)
<br/>
<code>maxBytesPerTrigger</code>: maximum total size of new files to be considered in every trigger (default: no max). <code>maxBytesPerTrigger</code> and <code>maxFilesPerTrigger</code> can't both be set at the same time; only one of the two can be chosen. Note that a stream always reads at least one file so it can make progress and not get stuck on a file larger than the given maximum.
<br/>
<code>latestFirst</code>: whether to process the latest new files first, useful when there is a large backlog of files (default: false)
<br/>
<code>fileNameOnly</code>: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same:
@@ -570,7 +572,7 @@ Here are the details of all the sources in Spark.
"s3n://a/b/dataset.txt"<br/>
"s3a://a/b/c/dataset.txt"
<br/>
<code>maxFileAge</code>: Maximum age of a file that can be found in this directory, before it is ignored. For the first batch all files will be considered valid. If <code>latestFirst</code> is set to `true` and <code>maxFilesPerTrigger</code> is set, then this parameter will be ignored, because old files that are valid, and should be processed, may be ignored. The max age is specified with respect to the timestamp of the latest file, and not the timestamp of the current system.(default: 1 week)
<code>maxFileAge</code>: Maximum age of a file that can be found in this directory, before it is ignored. For the first batch all files will be considered valid. If <code>latestFirst</code> is set to `true` and <code>maxFilesPerTrigger</code> or <code>maxBytesPerTrigger</code> is set, then this parameter will be ignored, because old files that are valid, and should be processed, may be ignored. The max age is specified with respect to the timestamp of the latest file, and not the timestamp of the current system.(default: 1 week)
<br/>
<code>cleanSource</code>: option to clean up completed files after processing.<br/>
Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".<br/>
@@ -3272,8 +3274,8 @@ Here are the different kinds of triggers that are supported.
<td>
Similar to queries one-time micro-batch trigger, the query will process all the available data and then
stop on its own. The difference is that, it will process the data in (possibly) multiple micro-batches
based on the source options (e.g. <code>maxFilesPerTrigger</code> for file source), which will result
in better query scalability.
based on the source options (e.g. <code>maxFilesPerTrigger</code> or <code>maxBytesPerTrigger</code> for file
source), which will result in better query scalability.
<ul>
<li>This trigger provides a strong guarantee of processing: regardless of how many batches were
left over in previous run, it ensures all available data at the time of execution gets
@@ -39,6 +39,8 @@ static ReadLimit minRows(long rows, long maxTriggerDelayMs) {

static ReadLimit maxFiles(int files) { return new ReadMaxFiles(files); }

static ReadLimit maxBytes(long bytes) { return new ReadMaxBytes(bytes); }

static ReadLimit allAvailable() { return ReadAllAvailable.INSTANCE; }

static ReadLimit compositeLimit(ReadLimit[] readLimits) {
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector.read.streaming;

import org.apache.spark.annotation.Evolving;

/**
* Represents a {@link ReadLimit} where the {@link MicroBatchStream} should scan files whose total
* size doesn't exceed a given maximum. Always reads at least one file so a stream
* can make progress and not get stuck on a file larger than the given maximum.
*
* @see SupportsAdmissionControl#latestOffset(Offset, ReadLimit)
* @since 4.0.0
*/
@Evolving
public class ReadMaxBytes implements ReadLimit {
private long bytes;

ReadMaxBytes(long bytes) {
this.bytes = bytes;
}

/** Maximum total size of files to scan. */
public long maxBytes() { return this.bytes; }

@Override
public String toString() {
return "MaxBytes: " + maxBytes();
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ReadMaxBytes other = (ReadMaxBytes) o;
return other.maxBytes() == maxBytes();
}

@Override
public int hashCode() { return Long.hashCode(bytes); }
}
@@ -50,11 +50,25 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
}
}

val maxBytesPerTrigger: Option[Long] = parameters.get("maxBytesPerTrigger").map { str =>
Try(str.toLong).toOption.filter(_ > 0).map(op =>
if (maxFilesPerTrigger.nonEmpty) {
throw new IllegalArgumentException(
"Options 'maxFilesPerTrigger' and 'maxBytesPerTrigger' " +
"can't be both set at the same time")
} else op
).getOrElse {
throw new IllegalArgumentException(
s"Invalid value '$str' for option 'maxBytesPerTrigger', must be a positive integer")
}
}

/**
* Maximum age of a file that can be found in this directory, before it is ignored. For the
* first batch all files will be considered valid. If `latestFirst` is set to `true` and
* `maxFilesPerTrigger` is set, then this parameter will be ignored, because old files that are
* valid, and should be processed, may be ignored. Please refer to SPARK-19813 for details.
* `maxFilesPerTrigger` or `maxBytesPerTrigger` is set, then this parameter will be ignored,
* because old files that are valid, and should be processed, may be ignored. Please refer to
* SPARK-19813 for details.
*
* The max age is specified with respect to the timestamp of the latest file, and not the
* timestamp of the current system. That this means if the last file has timestamp 1000, and the
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.streaming
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit._

import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
@@ -31,7 +32,7 @@ import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.connector.read.streaming
import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxFiles, SupportsAdmissionControl, SupportsTriggerAvailableNow}
import org.apache.spark.sql.connector.read.streaming.{ReadAllAvailable, ReadLimit, ReadMaxBytes, ReadMaxFiles, SupportsAdmissionControl, SupportsTriggerAvailableNow}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.internal.SQLConf
@@ -87,6 +88,9 @@ class FileStreamSource(
/** Maximum number of new files to be considered in each batch */
private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger

/** Maximum number of new bytes to be considered in each batch */
private val maxBytesPerBatch = sourceOptions.maxBytesPerTrigger

private val fileSortOrder = if (sourceOptions.latestFirst) {
logWarning(
"""'latestFirst' is true. New files will be processed first, which may affect the watermark
@@ -96,7 +100,8 @@ class FileStreamSource(
implicitly[Ordering[Long]]
}

private val maxFileAgeMs: Long = if (sourceOptions.latestFirst && maxFilesPerBatch.isDefined) {
private val maxFileAgeMs: Long = if (sourceOptions.latestFirst &&
(maxFilesPerBatch.isDefined || maxBytesPerBatch.isDefined)) {
Long.MaxValue
} else {
sourceOptions.maxFileAgeMs
@@ -113,16 +118,42 @@ class FileStreamSource(
// Visible for testing and debugging in production.
val seenFiles = new SeenFilesMap(maxFileAgeMs, fileNameOnly)

private var allFilesForTriggerAvailableNow: Seq[(SparkPath, Long)] = _
private var allFilesForTriggerAvailableNow: Seq[NewFileEntry] = _

metadataLog.restore().foreach { entry =>
seenFiles.add(entry.sparkPath, entry.timestamp)
}
seenFiles.purge()

logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")
logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, " +
s"maxBytesPerBatch = $maxBytesPerBatch, maxFileAgeMs = $maxFileAgeMs")

private var unreadFiles: Seq[NewFileEntry] = _

private var unreadFiles: Seq[(SparkPath, Long)] = _
/**
* Splits files into a selected/unselected pair according to a total size threshold.
* Always puts the first element into the left split and keeps adding files to the left split
* until it reaches the specified threshold or [[Long.MaxValue]].
*/
private def takeFilesUntilMax(files: Seq[NewFileEntry], maxSize: Long)
: (FilesSplit, FilesSplit) = {
var lSize = BigInt(0)
var rSize = BigInt(0)
val lFiles = ArrayBuffer[NewFileEntry]()
val rFiles = ArrayBuffer[NewFileEntry]()
for (i <- files.indices) {
val file = files(i)
val newSize = lSize + file.size
if (i == 0 || rFiles.isEmpty && newSize <= Long.MaxValue && newSize <= maxSize) {
lSize += file.size
lFiles += file
} else {
rSize += file.size
rFiles += file
}
}
(FilesSplit(lFiles.toSeq, lSize), FilesSplit(rFiles.toSeq, rSize))
}

/**
* Returns the maximum offset that can be retrieved from the source.
@@ -143,7 +174,7 @@ class FileStreamSource(
fetchAllFiles()
}
allFiles.filter {
case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
case NewFileEntry(path, _, timestamp) => seenFiles.isNewFile(path, timestamp)
}
}

@@ -152,7 +183,7 @@ class FileStreamSource(
case files: ReadMaxFiles if !sourceOptions.latestFirst =>
// we can cache and reuse remaining fetched list of files in further batches
val (bFiles, usFiles) = newFiles.splitAt(files.maxFiles())
if (usFiles.size < files.maxFiles() * DISCARD_UNSEEN_FILES_RATIO) {
if (usFiles.size < files.maxFiles() * DISCARD_UNSEEN_INPUT_RATIO) {
// Discard unselected files if the number of files are smaller than threshold.
// This is to avoid the case when the next batch would have too few files to read
// whereas there're new files available.
@@ -166,6 +197,25 @@ class FileStreamSource(
// implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
(newFiles.take(files.maxFiles()), null)

case files: ReadMaxBytes if !sourceOptions.latestFirst =>
// we can cache and reuse remaining fetched list of files in further batches
val (FilesSplit(bFiles, _), FilesSplit(usFiles, rSize)) =
takeFilesUntilMax(newFiles, files.maxBytes())
if (rSize.toDouble < (files.maxBytes() * DISCARD_UNSEEN_INPUT_RATIO)) {
// Discard unselected files if the total size of files is smaller than threshold.
// This is to avoid the case when the next batch would have too small of a size of
// files to read whereas there're new files available.
logTrace(s"Discarding ${usFiles.length} unread files as it's smaller than threshold.")
(bFiles, null)
} else {
(bFiles, usFiles)
}

case files: ReadMaxBytes =>
val (FilesSplit(bFiles, _), _) = takeFilesUntilMax(newFiles, files.maxBytes())
// implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
(bFiles, null)

case _: ReadAllAvailable => (newFiles, null)
}

@@ -178,9 +228,9 @@ class FileStreamSource(
logTrace(s"No unread file is available for further batches.")
}

batchFiles.foreach { file =>
seenFiles.add(file._1, file._2)
logDebug(s"New file: $file")
batchFiles.foreach { case NewFileEntry(p, _, timestamp) =>
seenFiles.add(p, timestamp)
logDebug(s"New file: $p")
}
val numPurged = seenFiles.purge()

@@ -196,7 +246,7 @@ class FileStreamSource(
if (batchFiles.nonEmpty) {
metadataLogCurrentOffset += 1

val fileEntries = batchFiles.map { case (p, timestamp) =>
val fileEntries = batchFiles.map { case NewFileEntry(p, _, timestamp) =>
FileEntry(path = p.urlEncoded, timestamp = timestamp, batchId = metadataLogCurrentOffset)
}.toArray
if (metadataLog.add(metadataLogCurrentOffset, fileEntries)) {
@@ -215,7 +265,9 @@ class FileStreamSource(
}

override def getDefaultReadLimit: ReadLimit = {
maxFilesPerBatch.map(ReadLimit.maxFiles).getOrElse(super.getDefaultReadLimit)
maxFilesPerBatch.map(ReadLimit.maxFiles).getOrElse(
maxBytesPerBatch.map(ReadLimit.maxBytes).getOrElse(super.getDefaultReadLimit)
)
}

/**
@@ -290,7 +342,7 @@ class FileStreamSource(
/**
* Returns a list of files found, sorted by their timestamp.
*/
private def fetchAllFiles(): Seq[(SparkPath, Long)] = {
private def fetchAllFiles(): Seq[NewFileEntry] = {
val startTime = System.nanoTime

var allFiles: Seq[FileStatus] = null
@@ -322,7 +374,7 @@ class FileStreamSource(
}

val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status =>
(SparkPath.fromFileStatus(status), status.getModificationTime)
NewFileEntry(SparkPath.fromFileStatus(status), status.getLen, status.getModificationTime)
}
val endTime = System.nanoTime
val listingTimeMs = NANOSECONDS.toMillis(endTime - startTime)
@@ -369,7 +421,7 @@ object FileStreamSource {
/** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
type Timestamp = Long

val DISCARD_UNSEEN_FILES_RATIO = 0.2
val DISCARD_UNSEEN_INPUT_RATIO = 0.2
val MAX_CACHED_UNSEEN_FILES = 10000

case class FileEntry(
Expand All @@ -379,6 +431,11 @@ object FileStreamSource {
def sparkPath: SparkPath = SparkPath.fromUrlString(path)
}

/** Newly fetched files metadata holder. */
private case class NewFileEntry(path: SparkPath, size: Long, timestamp: Long)

private case class FilesSplit(files: Seq[NewFileEntry], size: BigInt)

/**
* A custom hash map used to track the list of files seen. This map is not thread-safe.
*
