From cc5d11f2b10d22e1c6485823c38c198c93ad8a04 Mon Sep 17 00:00:00 2001 From: Niitsuma Takuro Date: Thu, 21 Sep 2023 14:22:08 +0900 Subject: [PATCH 01/20] develop DeduplicateDocuments filter --- .../pipeline/all_duplicate_paragraphs.conf | 1 + .../lib/filters/DeduplicateDocuments.scala | 39 +++++++++++++++++++ 2 files changed, 40 insertions(+) create mode 100644 lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala diff --git a/lib/src/main/resources/pipeline/all_duplicate_paragraphs.conf b/lib/src/main/resources/pipeline/all_duplicate_paragraphs.conf index 4d8aa67..6a0d1f3 100644 --- a/lib/src/main/resources/pipeline/all_duplicate_paragraphs.conf +++ b/lib/src/main/resources/pipeline/all_duplicate_paragraphs.conf @@ -1,3 +1,4 @@ filters: [ + {"class": "DuplicateDocuments"}, {"class": "DuplicateParagraphs", "limit": 2} ] \ No newline at end of file diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala new file mode 100644 index 0000000..85f68ad --- /dev/null +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala @@ -0,0 +1,39 @@ +package com.worksap.nlp.uzushio.lib.filters + +import com.worksap.nlp.uzushio.lib.cleaning.Document +import com.worksap.nlp.uzushio.lib.filters.base.HighLowDocFilter +import com.worksap.nlp.uzushio.lib.utils.MathUtil + + +class DeduplicateDocuments( + override val low: Float = 0.0f, + override val high: Float = 1.0f +) extends HighLowDocFilter { + + override def checkDocument(doc: Document): Document = { + val iter = doc.aliveParagraphs + + var lengthNearFreqOverOne = 0 + var totalLength = 0 + + while (iter.hasNext) { + val paragraph = iter.next() + val text = paragraph.text + val textLength = text.length() + val nearFreq = paragraph.nearFreq + + totalLength += textLength + + if (nearFreq > 1) { + lengthNearFreqOverOne += textLength + } + } + + val nearDuplicateTextRatio = MathUtil.ratio(lengthNearFreqOverOne, totalLength) + + val thresholdProb = doc.randomDouble + if (nearDuplicateTextRatio >= thresholdProb) { + doc.copy(remove = this) + } else doc + } +} \ No newline at end of file From ef71563eb086ccdff0c224bce363a6b2a5ceffac Mon Sep 17 00:00:00 2001 From: Niitsuma Takuro Date: Thu, 21 Sep 2023 19:11:36 +0900 Subject: [PATCH 02/20] Refactoring DedupicateDocuments for test --- .../lib/filters/DeduplicateDocuments.scala | 41 ++++++++++++++----- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala index 85f68ad..f77b21b 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala @@ -1,17 +1,31 @@ package com.worksap.nlp.uzushio.lib.filters +import com.worksap.nlp.uzushio.lib.stats.NgramHashExtractor import com.worksap.nlp.uzushio.lib.cleaning.Document -import com.worksap.nlp.uzushio.lib.filters.base.HighLowDocFilter +import com.worksap.nlp.uzushio.lib.filters.base.DocFilter import com.worksap.nlp.uzushio.lib.utils.MathUtil +trait DocumentRandomGeneratorBase { + def randomSeed(docId: String): Long + + def randomDouble(docId: String): Double +} + + +class DocumentRandomGenerator extends DocumentRandomGeneratorBase { + def randomSeed(docId: String): Long = NgramHashExtractor.hashString(docId) + + def randomDouble(docId: String): Double = MathUtil.asRandomDouble(randomSeed(docId)) +} + + class DeduplicateDocuments( - override val low: Float = 0.0f, - override val high: Float = 1.0f -) extends HighLowDocFilter { + val randomGenerator: DocumentRandomGeneratorBase = new DocumentRandomGenerator +) extends DocFilter { - override def checkDocument(doc: Document): Document = { - val iter = doc.aliveParagraphs + def computeNearDuplicateTextRatio(doc: Document): Float = { + val iter = doc.aliveParagraphs var lengthNearFreqOverOne = 0 var totalLength = 0 @@ -29,10 +43,17 @@ class DeduplicateDocuments( } } - val nearDuplicateTextRatio = MathUtil.ratio(lengthNearFreqOverOne, totalLength) - - val thresholdProb = doc.randomDouble - if (nearDuplicateTextRatio >= thresholdProb) { + MathUtil.ratio(lengthNearFreqOverOne, totalLength) + } + + def shouldRemoveDocument(doc: Document) = { + val nearDuplicateTextRatio = computeNearDuplicateTextRatio(doc) + val thresholdProb = randomGenerator.randomDouble(doc.docId) + nearDuplicateTextRatio >= thresholdProb + } + + override def checkDocument(doc: Document): Document = { + if (shouldRemoveDocument(doc)) { doc.copy(remove = this) } else doc } From 3fe58d25142b6d7e76a43b524f6b1c804bce1735 Mon Sep 17 00:00:00 2001 From: Niitsuma Takuro Date: Fri, 22 Sep 2023 13:56:13 +0900 Subject: [PATCH 03/20] Update logic computing duplication ratio --- .../lib/filters/DeduplicateDocuments.scala | 16 ++++++++-------- .../worksap/nlp/uzushio/lib/utils/MathUtil.java | 7 +++++++ 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala index f77b21b..f85ada4 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala @@ -4,6 +4,7 @@ import com.worksap.nlp.uzushio.lib.stats.NgramHashExtractor import com.worksap.nlp.uzushio.lib.cleaning.Document import com.worksap.nlp.uzushio.lib.filters.base.DocFilter import com.worksap.nlp.uzushio.lib.utils.MathUtil +import scala.math._ trait DocumentRandomGeneratorBase { @@ -21,29 +22,28 @@ class DocumentRandomGenerator extends DocumentRandomGeneratorBase { class DeduplicateDocuments( + val baseNumFreq: Int = 100, val randomGenerator: DocumentRandomGeneratorBase = new DocumentRandomGenerator ) extends DocFilter { def computeNearDuplicateTextRatio(doc: Document): Float = { val iter = doc.aliveParagraphs - var lengthNearFreqOverOne = 0 - var totalLength = 0 + var totalLengthWeightedNearFreq = 0.0 + var totalLength = 0.0 while (iter.hasNext) { val paragraph = iter.next() val text = paragraph.text val textLength = text.length() - val nearFreq = paragraph.nearFreq + val nearFreq = if (paragraph.nearFreq < baseNumFreq) paragraph.nearFreq else baseNumFreq + val weight = log(nearFreq) / log(baseNumFreq) totalLength += textLength - - if (nearFreq > 1) { - lengthNearFreqOverOne += textLength - } + totalLengthWeightedNearFreq += (textLength * weight) } - MathUtil.ratio(lengthNearFreqOverOne, totalLength) + MathUtil.ratio(totalLengthWeightedNearFreq.toFloat, totalLength.toFloat) } def shouldRemoveDocument(doc: Document) = { diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/utils/MathUtil.java b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/utils/MathUtil.java index 9037831..5f261a6 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/utils/MathUtil.java +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/utils/MathUtil.java @@ -95,6 +95,13 @@ public static int matchingBits(long x, long y) { return Long.bitCount(~(x ^ y)); } + public static float ratio(float sum, float total) { + if (sum == 0 || total == 0) { + return 0.0f; + } + return sum / total; + } + public static float ratio(int count, int total) { if (count == 0 || total == 0) { return 0.0f; From b91ffb5d317603c0b2740a14c5cd11fac490b521 Mon Sep 17 00:00:00 2001 From: Niitsuma Takuro Date: Fri, 22 Sep 2023 22:33:57 +0900 Subject: [PATCH 04/20] Apply scalafmt --- .../uzushio/lib/filters/DeduplicateDocuments.scala | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala index f85ada4..d064f41 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala @@ -6,28 +6,25 @@ import com.worksap.nlp.uzushio.lib.filters.base.DocFilter import com.worksap.nlp.uzushio.lib.utils.MathUtil import scala.math._ - trait DocumentRandomGeneratorBase { def randomSeed(docId: String): Long def randomDouble(docId: String): Double } - class DocumentRandomGenerator extends DocumentRandomGeneratorBase { def randomSeed(docId: String): Long = NgramHashExtractor.hashString(docId) def randomDouble(docId: String): Double = MathUtil.asRandomDouble(randomSeed(docId)) } - class DeduplicateDocuments( - val baseNumFreq: Int = 100, - val randomGenerator: DocumentRandomGeneratorBase = new DocumentRandomGenerator + val baseNumFreq: Int = 100, + val randomGenerator: DocumentRandomGeneratorBase = new DocumentRandomGenerator ) extends DocFilter { def computeNearDuplicateTextRatio(doc: Document): Float = { - val iter = doc.aliveParagraphs + val iter = doc.aliveParagraphs var totalLengthWeightedNearFreq = 0.0 var totalLength = 0.0 @@ -57,4 +54,4 @@ class DeduplicateDocuments( doc.copy(remove = this) } else doc } -} \ No newline at end of file +} From 4500c66d7aead9bf691f23b58918f6740113d457 Mon Sep 17 00:00:00 2001 From: Niitsuma Takuro Date: Sat, 23 Sep 2023 22:22:46 +0900 Subject: [PATCH 05/20] modify oneline implementation --- .../uzushio/lib/filters/DeduplicateDocuments.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala index d064f41..2c78883 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala @@ -6,13 +6,14 @@ import com.worksap.nlp.uzushio.lib.filters.base.DocFilter import com.worksap.nlp.uzushio.lib.utils.MathUtil import scala.math._ -trait DocumentRandomGeneratorBase { +trait RandomGeneratorFromStringBase { def randomSeed(docId: String): Long def randomDouble(docId: String): Double } -class DocumentRandomGenerator extends DocumentRandomGeneratorBase { +// An object in arguments of DocFilter on Spark needs to mixin Serializable. +object RandomGeneratorFromString extends RandomGeneratorFromStringBase with Serializable { def randomSeed(docId: String): Long = NgramHashExtractor.hashString(docId) def randomDouble(docId: String): Double = MathUtil.asRandomDouble(randomSeed(docId)) @@ -20,7 +21,7 @@ class DocumentRandomGenerator extends DocumentRandomGeneratorBase { class DeduplicateDocuments( val baseNumFreq: Int = 100, - val randomGenerator: DocumentRandomGeneratorBase = new DocumentRandomGenerator + val randomGenerator: RandomGeneratorFromStringBase = RandomGeneratorFromString ) extends DocFilter { def computeNearDuplicateTextRatio(doc: Document): Float = { @@ -45,13 +46,12 @@ class DeduplicateDocuments( def shouldRemoveDocument(doc: Document) = { val nearDuplicateTextRatio = computeNearDuplicateTextRatio(doc) + val thresholdProb = randomGenerator.randomDouble(doc.docId) nearDuplicateTextRatio >= thresholdProb } override def checkDocument(doc: Document): Document = { - if (shouldRemoveDocument(doc)) { - doc.copy(remove = this) - } else doc + doc.removeWhen(shouldRemoveDocument(doc), this) } } From 283c2f0b22eb84f01f7c0eb2da244cc6c49480b4 Mon Sep 17 00:00:00 2001 From: Niitsuma Takuro Date: Sat, 23 Sep 2023 22:26:44 +0900 Subject: [PATCH 06/20] modify typo and operators --- .../com/worksap/nlp/uzushio/lib/cleaning/Pipeline.scala | 8 ++++---- .../nlp/uzushio/lib/runners/DeduplicateParagraphs.scala | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/cleaning/Pipeline.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/cleaning/Pipeline.scala index dda6bd0..0594668 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/cleaning/Pipeline.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/cleaning/Pipeline.scala @@ -88,13 +88,13 @@ class PerParagraphFilter(val filter: ParagraphFilter) extends DocFilter { .copy(paragraphs = doc.paragraphs.map(filter.checkParagraph)) } -final class Pipeline(filers: Array[DocFilter]) extends Serializable { +final class Pipeline(filters: Array[DocFilter]) extends Serializable { def applyFilters(doc: Document): Document = { var i = 0 - val len = filers.length + val len = filters.length var state = doc - while (i < len && state.remove != null) { - val f = filers(i) + while (i < len && state.remove == null) { + val f = filters(i) state = f.checkDocument(state) i += 1 } diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala index ac027c8..efd5470 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala @@ -694,7 +694,8 @@ object DeduplicateParagraphs { ): String = { val doc = Document(parts) val filtered = args.pipeline.applyFilters(doc) - filtered.copy(paragraphs = doc.paragraphs.filter(_.remove != null)).render() + println(filtered.paragraphs.map(_.remove)) + filtered.copy(paragraphs = doc.paragraphs.filter(_.remove == null)).render() } // noinspection TypeAnnotation,ScalaWeakerAccess From 9f7a7933aabd842d1b604d59b7644451b5e36ecb Mon Sep 17 00:00:00 2001 From: Niitsuma Takuro Date: Sat, 23 Sep 2023 23:49:12 +0900 Subject: [PATCH 07/20] add filtering document logic in processDocumentParts --- .../nlp/uzushio/lib/runners/DeduplicateParagraphs.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala index efd5470..69ab027 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala @@ -694,7 +694,9 @@ object DeduplicateParagraphs { ): String = { val doc = Document(parts) val filtered = args.pipeline.applyFilters(doc) - println(filtered.paragraphs.map(_.remove)) + if (filtered.remove == null) { + return filtered.copy(IndexedSeq()).render() + } filtered.copy(paragraphs = doc.paragraphs.filter(_.remove == null)).render() } From c96e5d681d9734b8670db3feb216d34fce7c1364 Mon Sep 17 00:00:00 2001 From: Niitsuma Takuro Date: Mon, 25 Sep 2023 14:59:32 +0900 Subject: [PATCH 08/20] fix operator and variable --- .../nlp/uzushio/lib/filters/DeduplicateDocuments.scala | 1 - .../nlp/uzushio/lib/runners/DeduplicateParagraphs.scala | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala index 2c78883..ccd2685 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala @@ -46,7 +46,6 @@ class DeduplicateDocuments( def shouldRemoveDocument(doc: Document) = { val nearDuplicateTextRatio = computeNearDuplicateTextRatio(doc) - val thresholdProb = randomGenerator.randomDouble(doc.docId) nearDuplicateTextRatio >= thresholdProb } diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala index 69ab027..e9ca6b2 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala @@ -694,10 +694,10 @@ object DeduplicateParagraphs { ): String = { val doc = Document(parts) val filtered = args.pipeline.applyFilters(doc) - if (filtered.remove == null) { + if (filtered.remove != null) { return filtered.copy(IndexedSeq()).render() } - filtered.copy(paragraphs = doc.paragraphs.filter(_.remove == null)).render() + filtered.copy(paragraphs = filtered.paragraphs.filter(_.remove == null)).render() } // noinspection TypeAnnotation,ScalaWeakerAccess From e967f3739c71cabcc3b9398c3298dcf3b402a5d7 Mon Sep 17 00:00:00 2001 From: Arseny Tolmachev Date: Fri, 22 Sep 2023 11:53:15 +0900 Subject: [PATCH 09/20] sniff encodings more leniently --- .../nlp/uzushio/lib/lang/LangTagSniffer.scala | 2 +- .../uzushio/lib/warc/WarcEntryParser.scala | 29 +++++++------------ 2 files changed, 11 insertions(+), 20 deletions(-) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/lang/LangTagSniffer.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/lang/LangTagSniffer.scala index 2a8334e..d3f27be 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/lang/LangTagSniffer.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/lang/LangTagSniffer.scala @@ -56,7 +56,7 @@ class LangTagSniffer() { object LangTagSniffer { private val metaRegex = "]*>".r - private val charsetRegex = Pattern.compile("charset=([^\"' >]+)", Pattern.CASE_INSENSITIVE) + private val charsetRegex = Pattern.compile("charset=([^\"' ;,/>]+)", Pattern.CASE_INSENSITIVE) def extractCharset(tag: String): String = { val matcher = charsetRegex.matcher(tag) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/warc/WarcEntryParser.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/warc/WarcEntryParser.scala index afaeaa2..efc9e1d 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/warc/WarcEntryParser.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/warc/WarcEntryParser.scala @@ -1,18 +1,10 @@ package com.worksap.nlp.uzushio.lib.warc import com.worksap.nlp.uzushio.lib.html.{AllTagMapper, ParagraphExtractor, ParseAbortException} -import com.worksap.nlp.uzushio.lib.lang.{ - EstimationFailure, - LangEstimation, - LangTagSniffer, - ProbableLanguage -} +import com.worksap.nlp.uzushio.lib.lang.{EstimationFailure, LangEstimation, LangTagSniffer, ProbableLanguage} import com.worksap.nlp.uzushio.lib.warc.WarcEntryParser.{logger, resolveEarliestDate} -import org.apache.hc.core5.http.impl.nio.{ - DefaultHttpResponseFactory, - DefaultHttpResponseParser, - SessionBufferAccess -} +import org.apache.commons.lang3.StringUtils +import org.apache.hc.core5.http.impl.nio.{DefaultHttpResponseFactory, DefaultHttpResponseParser, SessionBufferAccess} import org.apache.hc.core5.http.{HttpException, HttpMessage, MessageHeaders} import org.apache.tika.detect.EncodingDetector import org.apache.tika.exception.TikaException @@ -24,12 +16,7 @@ import org.mozilla.universalchardet.UniversalDetector import org.slf4j.LoggerFactory import java.io.{ByteArrayInputStream, IOException, InputStream} -import java.nio.charset.{ - Charset, - IllegalCharsetNameException, - StandardCharsets, - UnsupportedCharsetException -} +import java.nio.charset.{Charset, IllegalCharsetNameException, StandardCharsets, UnsupportedCharsetException} import java.time.format.{DateTimeFormatter, DateTimeParseException} import java.time.{LocalDateTime, ZoneId, ZonedDateTime} import java.util.{Locale, UUID} @@ -90,8 +77,9 @@ class WarcEntryParser( } private def lookupCharset(name: String): Option[Charset] = { + val cleanName = name.stripSuffix(";") try { - Some(Charset.forName(name)) + Some(Charset.forName(cleanName)) } catch { case _: IllegalCharsetNameException => None case _: UnsupportedCharsetException => None @@ -109,7 +97,8 @@ class WarcEntryParser( private val win31j = Charset.forName("windows-31j") private def lookupNormalizedCharset(name: String) = { - val charsetName = name.toLowerCase(Locale.ROOT) + val cleanName = StringUtils.strip(name, " \n\r\t;,") + val charsetName = cleanName.toLowerCase(Locale.ROOT) charsetName match { case "" => None case "utf-8" | "utf8" => Some(StandardCharsets.UTF_8) @@ -220,6 +209,8 @@ class WarcEntryParser( case e: StringIndexOutOfBoundsException => reportSkippedDoc(result, record, e) case e: NullPointerException => reportSkippedDoc(result, record, e) case e: ParseAbortException => reportSkippedDoc(result, record, e) + case e: IllegalCharsetNameException => reportSkippedDoc(result, record, e) // can be thrown in malformed svgs + case e: UnsupportedCharsetException => reportSkippedDoc(result, record, e) } result } From d491e10ec07c7b9fe217ae87a7591fd024883bd1 Mon Sep 17 00:00:00 2001 From: Niitsuma Takuro Date: Sat, 23 Sep 2023 22:22:46 +0900 Subject: [PATCH 10/20] modify oneline implementation --- .../uzushio/lib/filters/DeduplicateDocuments.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala index d064f41..2c78883 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala @@ -6,13 +6,14 @@ import com.worksap.nlp.uzushio.lib.filters.base.DocFilter import com.worksap.nlp.uzushio.lib.utils.MathUtil import scala.math._ -trait DocumentRandomGeneratorBase { +trait RandomGeneratorFromStringBase { def randomSeed(docId: String): Long def randomDouble(docId: String): Double } -class DocumentRandomGenerator extends DocumentRandomGeneratorBase { +// An object in arguments of DocFilter on Spark needs to mixin Serializable. +object RandomGeneratorFromString extends RandomGeneratorFromStringBase with Serializable { def randomSeed(docId: String): Long = NgramHashExtractor.hashString(docId) def randomDouble(docId: String): Double = MathUtil.asRandomDouble(randomSeed(docId)) @@ -20,7 +21,7 @@ class DocumentRandomGenerator extends DocumentRandomGeneratorBase { class DeduplicateDocuments( val baseNumFreq: Int = 100, - val randomGenerator: DocumentRandomGeneratorBase = new DocumentRandomGenerator + val randomGenerator: RandomGeneratorFromStringBase = RandomGeneratorFromString ) extends DocFilter { def computeNearDuplicateTextRatio(doc: Document): Float = { @@ -45,13 +46,12 @@ class DeduplicateDocuments( def shouldRemoveDocument(doc: Document) = { val nearDuplicateTextRatio = computeNearDuplicateTextRatio(doc) + val thresholdProb = randomGenerator.randomDouble(doc.docId) nearDuplicateTextRatio >= thresholdProb } override def checkDocument(doc: Document): Document = { - if (shouldRemoveDocument(doc)) { - doc.copy(remove = this) - } else doc + doc.removeWhen(shouldRemoveDocument(doc), this) } } From e4d21b2cfea0a0ba3f305ddd6375fff0a823de7f Mon Sep 17 00:00:00 2001 From: Niitsuma Takuro Date: Sat, 23 Sep 2023 22:26:44 +0900 Subject: [PATCH 11/20] modify typo and operators --- .../com/worksap/nlp/uzushio/lib/cleaning/Pipeline.scala | 8 ++++---- .../nlp/uzushio/lib/runners/DeduplicateParagraphs.scala | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/cleaning/Pipeline.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/cleaning/Pipeline.scala index dda6bd0..0594668 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/cleaning/Pipeline.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/cleaning/Pipeline.scala @@ -88,13 +88,13 @@ class PerParagraphFilter(val filter: ParagraphFilter) extends DocFilter { .copy(paragraphs = doc.paragraphs.map(filter.checkParagraph)) } -final class Pipeline(filers: Array[DocFilter]) extends Serializable { +final class Pipeline(filters: Array[DocFilter]) extends Serializable { def applyFilters(doc: Document): Document = { var i = 0 - val len = filers.length + val len = filters.length var state = doc - while (i < len && state.remove != null) { - val f = filers(i) + while (i < len && state.remove == null) { + val f = filters(i) state = f.checkDocument(state) i += 1 } diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala index ac027c8..efd5470 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala @@ -694,7 +694,8 @@ object DeduplicateParagraphs { ): String = { val doc = Document(parts) val filtered = args.pipeline.applyFilters(doc) - filtered.copy(paragraphs = doc.paragraphs.filter(_.remove != null)).render() + println(filtered.paragraphs.map(_.remove)) + filtered.copy(paragraphs = doc.paragraphs.filter(_.remove == null)).render() } // noinspection TypeAnnotation,ScalaWeakerAccess From 511cb5d1a065121a49bfa3a22e7b1d6da70e49c2 Mon Sep 17 00:00:00 2001 From: Niitsuma Takuro Date: Sat, 23 Sep 2023 23:49:12 +0900 Subject: [PATCH 12/20] add filtering document logic in processDocumentParts --- .../nlp/uzushio/lib/runners/DeduplicateParagraphs.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala index efd5470..69ab027 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala @@ -694,7 +694,9 @@ object DeduplicateParagraphs { ): String = { val doc = Document(parts) val filtered = args.pipeline.applyFilters(doc) - println(filtered.paragraphs.map(_.remove)) + if (filtered.remove == null) { + return filtered.copy(IndexedSeq()).render() + } filtered.copy(paragraphs = doc.paragraphs.filter(_.remove == null)).render() } From 9d5c2f53378b0d698d05de1013bdd7867a1bd96e Mon Sep 17 00:00:00 2001 From: Niitsuma Takuro Date: Mon, 25 Sep 2023 14:59:32 +0900 Subject: [PATCH 13/20] fix operator and variable --- .../nlp/uzushio/lib/filters/DeduplicateDocuments.scala | 1 - .../nlp/uzushio/lib/runners/DeduplicateParagraphs.scala | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala index 2c78883..ccd2685 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala @@ -46,7 +46,6 @@ class DeduplicateDocuments( def shouldRemoveDocument(doc: Document) = { val nearDuplicateTextRatio = computeNearDuplicateTextRatio(doc) - val thresholdProb = randomGenerator.randomDouble(doc.docId) nearDuplicateTextRatio >= thresholdProb } diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala index 69ab027..e9ca6b2 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala @@ -694,10 +694,10 @@ object DeduplicateParagraphs { ): String = { val doc = Document(parts) val filtered = args.pipeline.applyFilters(doc) - if (filtered.remove == null) { + if (filtered.remove != null) { return filtered.copy(IndexedSeq()).render() } - filtered.copy(paragraphs = doc.paragraphs.filter(_.remove == null)).render() + filtered.copy(paragraphs = filtered.paragraphs.filter(_.remove == null)).render() } // noinspection TypeAnnotation,ScalaWeakerAccess From 1bc277c3f20590939f5108ed8b4e30e0ce4357aa Mon Sep 17 00:00:00 2001 From: Arseny Tolmachev Date: Mon, 25 Sep 2023 10:38:24 +0900 Subject: [PATCH 14/20] add debug tool for displaying metrics with dedup stats --- .../lib/runners/DedupFilterStatistics.scala | 117 ++++++++++++++++++ .../lib/runners/DeduplicateParagraphs.scala | 91 +++++++------- 2 files changed, 165 insertions(+), 43 deletions(-) create mode 100644 lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DedupFilterStatistics.scala diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DedupFilterStatistics.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DedupFilterStatistics.scala new file mode 100644 index 0000000..fe8999b --- /dev/null +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DedupFilterStatistics.scala @@ -0,0 +1,117 @@ +package com.worksap.nlp.uzushio.lib.runners + +import com.worksap.nlp.uzushio.lib.cleaning.Document +import com.worksap.nlp.uzushio.lib.utils.Resources.AutoClosableResource +import org.apache.spark.sql.expressions.UserDefinedFunction +import org.apache.spark.sql.functions.{collect_list, octet_length, udf} +import org.apache.spark.sql.{SaveMode, SparkSession} +import org.rogach.scallop.ScallopConf + +import scala.reflect.runtime.universe.TypeTag + +object DedupFilterStatistics { + + def process(spark: SparkSession, args: Args): Unit = { + import spark.implicits._ + + val rawData = spark.read.parquet(args.input(): _*) + + val limited = args.limit.toOption match { + case None => rawData + case Some(lim) => + val cnt = rawData.count() + val ratio = lim.toDouble / cnt + rawData.sample(withReplacement = false, ratio, 0xdeadbeefL) + } + + val stats = spark.read.parquet(args.stats()) + + val textOnly = limited.select("text", "docId").filter(octet_length($"text") > 2) + + val docsWithStats = DeduplicateParagraphs + .prepareParagraphsForFiltering(textOnly, stats, debug = false) + + val assembledDocs = docsWithStats.groupBy("docId").agg( + collect_list("text") as "text", + collect_list("pos") as "pos", + collect_list("exactFreq") as "exactFreq", + collect_list("nearFreq") as "nearFreq" + ) + + val metric = extractFilteredMetric(spark, args) + + val withValues = assembledDocs.select( + metric($"docId", $"text", $"pos", $"exactFreq", $"nearFreq") as "res" + ).select(// make columns in the same order as FilterStatistics + $"res._2" as "value", + $"res._1" as "text" + ) + + withValues.persist().repartitionByRange(args.partitions(), withValues.col("value")) + .sortWithinPartitions("value").write.option("escape", "\"").mode(SaveMode.Overwrite) + .csv(args.output()) + } + + def ratioUdfConstructor[T: TypeTag](sample: Double)(extractor: Document => Float): UserDefinedFunction = { + udf { + ( + docId: String, + text: Array[String], + pos: Array[Int], + exactFreq: Array[Int], + nearFreq: Array[Int] + ) => + val docParts = DeduplicateParagraphs.collectDocParts(text, pos, exactFreq, nearFreq) + val doc = Document(paragraphs = docParts, docId = docId) + val ratio = extractor(doc) + val docData = if (doc.randomDouble < sample) { + docParts.map(_.text.replace("\n", "
")).mkString("

") + } else "" + (docData, ratio) + } + } + + def extractFilteredMetric( + sparkSession: SparkSession, + args: Args + ): UserDefinedFunction = { + val ftype = args.filter() + val arg = args.arg() + + // curry makeUdf with sampling probability + val udfMaker = ratioUdfConstructor(args.examples()) _ + + ftype match { + case "max-near-freq" => + udfMaker(doc => doc.aliveParagraphs.map(_.nearFreq).foldRight(0)(_.max(_))) + case "min-near-freq" => + udfMaker(doc => doc.aliveParagraphs.map(_.nearFreq).foldRight(0)(_.min(_))) + case _ => throw new IllegalArgumentException(s"unknown metric $ftype") + } + + } + + class Args(args: Seq[String]) extends ScallopConf(args) { + val input = opt[List[String]](required = true) + val stats = opt[String](required = true) + val output = opt[String](required = true) + val filter = opt[String](required = true) + val limit = opt[Int]() + val arg = opt[String](default = Some("")) + val arg2 = opt[String]() + val partitions = opt[Int](default = Some(10)) + val examples = opt[Double](default = Some(0.001)) + val master = opt[String]() + verify() + } + + def main(args: Array[String]): Unit = { + val opts = new Args(args) + val conf = SparkSession.builder() + opts.master.foreach(m => conf.master(m)) + + conf.getOrCreate().use { spark => + process(spark, opts) + } + } +} diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala index e9ca6b2..3fa4b50 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DeduplicateParagraphs.scala @@ -398,7 +398,7 @@ class DeduplicateParagraphs( return } - val paragraphsWithFreqs = prepareParagraphsForFiltering(rawData, stats) + val paragraphsWithFreqs = DeduplicateParagraphs.prepareParagraphsForFiltering(rawData, stats, args.debug) if (args.hasStage("saveReassembled")) { saveReassembled(paragraphsWithFreqs) @@ -538,48 +538,6 @@ class DeduplicateParagraphs( ) } - private def prepareParagraphsForFiltering( - raw: DataFrame, - stats: DataFrame - ): DataFrame = { - - val explodeCols = raw.columns.map { - case "text" => posexplode(split(raw.col("text"), "\n\n")).as(Seq("pos", "text")) - case col => raw.col(col) - } - - val exploded = raw.select(explodeCols: _*) - - val cleanParUdf = udf((s: String) => Paragraphs.extractCleanParagraph(s)) - - val cookedDocs = exploded.withColumn("cleanText", cleanParUdf($"text")) - .withColumn("parHash", xxhash64($"cleanText")) - - val joined = cookedDocs.join(stats, $"parHash" === $"hash", "left") - - val basicCols = (if (args.debug) { - joined.columns.filter { - case "parHash" => false - case "exactFreq" | "nearFreq" => false - case _ => true - } - } else { - joined.columns.filter { - case "hash" | "reprHash" | "parHash" | "cleanText" => false - case "exactFreq" | "nearFreq" => false - case _ => true - } - }).map(joined.col) - - val computedCols = Seq( // common newly computed columns - when($"exactFreq".isNotNull, $"exactFreq").otherwise(lit(1)).as("exactFreq"), - when($"nearFreq".isNotNull, $"nearFreq").otherwise(lit(1)).as("nearFreq") - ) - - joined.select( - basicCols ++ computedCols: _* - ) - } // compile full documents from paragraphs // paragraphs are shuffled because of join with freqs, @@ -651,6 +609,53 @@ class DeduplicateParagraphs( object DeduplicateParagraphs { + def prepareParagraphsForFiltering( + raw: DataFrame, + stats: DataFrame, + debug: Boolean + ): DataFrame = { + + import raw.sparkSession.implicits._ + + val explodeCols = raw.columns.map { + case "text" => posexplode(split(raw.col("text"), "\n\n")).as(Seq("pos", "text")) + case col => raw.col(col) + } + + val exploded = raw.select(explodeCols: _*) + + val cleanParUdf = udf((s: String) => Paragraphs.extractCleanParagraph(s)) + + val cookedDocs = exploded.withColumn("cleanText", cleanParUdf($"text")) + .withColumn("parHash", xxhash64($"cleanText")) + + val joined = cookedDocs.join(stats, $"parHash" === $"hash", "left") + + val basicCols = (if (debug) { + joined.columns.filter { + case "parHash" => false + case "exactFreq" | "nearFreq" => false + case _ => true + } + } else { + joined.columns.filter { + case "hash" | "reprHash" | "parHash" | "cleanText" => false + case "exactFreq" | "nearFreq" => false + case _ => true + } + }).map(joined.col) + + val computedCols = Seq( // common newly computed columns + when($"exactFreq".isNotNull, $"exactFreq").otherwise(lit(1)).as("exactFreq"), + when($"nearFreq".isNotNull, $"nearFreq").otherwise(lit(1)).as("nearFreq") + ) + + joined.select( + basicCols ++ computedCols: _* + ) + } + + def collectDocParts( text: Array[String], pos: Array[Int], From 6bc1e71458007c95ec2d1b6e68c858d4f5845044 Mon Sep 17 00:00:00 2001 From: Niitsuma Takuro Date: Mon, 25 Sep 2023 22:27:19 +0900 Subject: [PATCH 15/20] add Gaussian Random Value Generator, and refactoring RandomGeneratorFromString --- .../lib/filters/DeduplicateDocuments.scala | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala index ccd2685..cfb6617 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala @@ -5,23 +5,36 @@ import com.worksap.nlp.uzushio.lib.cleaning.Document import com.worksap.nlp.uzushio.lib.filters.base.DocFilter import com.worksap.nlp.uzushio.lib.utils.MathUtil import scala.math._ +import scala.util.Random -trait RandomGeneratorFromStringBase { - def randomSeed(docId: String): Long - def randomDouble(docId: String): Double +trait RandomGeneratorFromStringBase { + def generateRandom(docId: String): Double } // An object in arguments of DocFilter on Spark needs to mixin Serializable. object RandomGeneratorFromString extends RandomGeneratorFromStringBase with Serializable { - def randomSeed(docId: String): Long = NgramHashExtractor.hashString(docId) + def generateRandom(docId: String): Double = { + val seed = NgramHashExtractor.hashString(docId) + MathUtil.asRandomDouble(seed) + } +} - def randomDouble(docId: String): Double = MathUtil.asRandomDouble(randomSeed(docId)) +class GaussianRandomGeneratorFromString( + val mu: Double = 0.1, + val sd: Double = 0.1 +) extends RandomGeneratorFromStringBase with Serializable { + def generateRandom(docId: String): Double = { + val seed = NgramHashExtractor.hashString(docId) + Random.setSeed(seed) + + Random.nextGaussian() * mu + sd + } } class DeduplicateDocuments( - val baseNumFreq: Int = 100, - val randomGenerator: RandomGeneratorFromStringBase = RandomGeneratorFromString + val baseNumFreq: Int = 10, + val randomGenerator: RandomGeneratorFromStringBase = new GaussianRandomGeneratorFromString ) extends DocFilter { def computeNearDuplicateTextRatio(doc: Document): Float = { @@ -46,7 +59,9 @@ class DeduplicateDocuments( def shouldRemoveDocument(doc: Document) = { val nearDuplicateTextRatio = computeNearDuplicateTextRatio(doc) - val thresholdProb = randomGenerator.randomDouble(doc.docId) + val thresholdProb = randomGenerator.generateRandom(doc.render()) + + println(("ratio", nearDuplicateTextRatio, thresholdProb, doc.docId, doc.paragraphs.map(x => x.nearFreq))) nearDuplicateTextRatio >= thresholdProb } From 87d037bee8306b728ccbd76347f0618879b903ae Mon Sep 17 00:00:00 2001 From: Niitsuma Takuro Date: Tue, 26 Sep 2023 11:14:04 +0900 Subject: [PATCH 16/20] add tests for DeduplicateDocuments --- .../filters/DeduplicateDocumentsSpec.scala | 64 +++++++++++++++++++ .../nlp/uzushio/lib/filters/package.scala | 7 ++ 2 files changed, 71 insertions(+) create mode 100644 lib/src/test/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocumentsSpec.scala diff --git a/lib/src/test/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocumentsSpec.scala b/lib/src/test/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocumentsSpec.scala new file mode 100644 index 0000000..e993957 --- /dev/null +++ b/lib/src/test/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocumentsSpec.scala @@ -0,0 +1,64 @@ +package com.worksap.nlp.uzushio.lib.filters + +import com.worksap.nlp.uzushio.lib.cleaning.Document +import javax.swing.tree.FixedHeightLayoutCache +import org.scalatest.freespec.AnyFreeSpec + + +class FixedProbRandomGenerator( + val returnProb: Double = 0.5 +) extends RandomGeneratorFromStringBase { + def generateRandom(docId: String): Double = returnProb +} + + +class DeduplicateDocumentsSpec extends AnyFreeSpec { + def generateFilter(returnProb: Double): DeduplicateDocuments = { + val randomGenerator = new FixedProbRandomGenerator(returnProb) + new DeduplicateDocuments(100, randomGenerator) + } + + "DeduplicateDocumentsSpec" - { + val filter = generateFilter(0.5) + + "computes correct ratio for non-deuplicated documents" in { + val paragraphs = testParagraphs( + Seq("test", "test", "test", "test"), + Seq(1, 1, 1, 1) + ) + val doc = Document(paragraphs, "test") + assert(0.0f == filter.computeNearDuplicateTextRatio(doc)) + assert(false == filter.shouldRemoveDocument(doc)) + } + + "computes correct ratio for non-deuplicated documents (boundary)" in { + val paragraphs = testParagraphs( + Seq("test", "test", "test", "test"), + Seq(1, 1, 99, 100) + ) + val doc = Document(paragraphs, "test") + assert(0.5f > filter.computeNearDuplicateTextRatio(doc)) + assert(false == filter.shouldRemoveDocument(doc)) + } + + "computes correct ratio for deuplicated documents" in { + val paragraphs = testParagraphs( + Seq("test", "test", "test", "test"), + Seq(100, 100, 100, 100) + ) + val doc = Document(paragraphs, "test") + assert(1.0f == filter.computeNearDuplicateTextRatio(doc)) + assert(true== filter.shouldRemoveDocument(doc)) + } + + "computes correct ratio for deuplicated documents (boundary)" in { + val paragraphs = testParagraphs( + Seq("test", "test", "test", "test"), + Seq(1, 1, 100, 100) + ) + val doc = Document(paragraphs, "test") + assert(0.5f == filter.computeNearDuplicateTextRatio(doc)) + assert(true== filter.shouldRemoveDocument(doc)) + } + } +} \ No newline at end of file diff --git a/lib/src/test/scala/com/worksap/nlp/uzushio/lib/filters/package.scala b/lib/src/test/scala/com/worksap/nlp/uzushio/lib/filters/package.scala index 2faf854..d29d460 100644 --- a/lib/src/test/scala/com/worksap/nlp/uzushio/lib/filters/package.scala +++ b/lib/src/test/scala/com/worksap/nlp/uzushio/lib/filters/package.scala @@ -31,4 +31,11 @@ package object filters { }.toIndexedSeq ) } + + def testParagraphs(texts: Seq[String], nearFreqs: Seq[Int]): IndexedSeq[Paragraph] = { + (texts, nearFreqs) + .zipped + .map ((text, freq) => Paragraph("", text, 0, 1, freq)) + .toIndexedSeq + } } From 9316695724a992c78515821f256d7cec2ef7f6e6 Mon Sep 17 00:00:00 2001 From: Niitsuma Takuro Date: Tue, 26 Sep 2023 19:36:38 +0900 Subject: [PATCH 17/20] add duplication-score and apply scalafmt --- .../uzushio/lib/filters/DeduplicateDocuments.scala | 12 ++++++------ .../lib/runners/DedupFilterStatistics.scala | 14 ++++++++++++-- .../lib/filters/DeduplicateDocumentsSpec.scala | 4 ++-- 3 files changed, 20 insertions(+), 10 deletions(-) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala index 0b9f7de..65a0a76 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala @@ -14,17 +14,18 @@ trait RandomGeneratorFromStringBase { // An object in arguments of DocFilter on Spark needs to mixin Serializable. object RandomGeneratorFromString extends RandomGeneratorFromStringBase with Serializable { def generateRandom(docId: String): Double = { - val seed = NgramHashExtractor.hashString(docId) + val seed = NgramHashExtractor.hashString(docId) MathUtil.asRandomDouble(seed) } } class GaussianRandomGeneratorFromString( - val mu: Double = 0.1, - val sd: Double = 0.1 -) extends RandomGeneratorFromStringBase with Serializable { + val mu: Double = 0.1, + val sd: Double = 0.1 +) extends RandomGeneratorFromStringBase + with Serializable { def generateRandom(docId: String): Double = { - val seed = NgramHashExtractor.hashString(docId) + val seed = NgramHashExtractor.hashString(docId) Random.setSeed(seed) Random.nextGaussian() * mu + sd } @@ -59,7 +60,6 @@ class DeduplicateDocuments( val nearDuplicateTextRatio = computeNearDuplicateTextRatio(doc) val thresholdProb = randomGenerator.generateRandom(doc.render()) - println(("ratio", nearDuplicateTextRatio, thresholdProb, doc.docId, doc.paragraphs.map(x => x.nearFreq))) nearDuplicateTextRatio >= thresholdProb } diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DedupFilterStatistics.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DedupFilterStatistics.scala index fe8999b..0c807ba 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DedupFilterStatistics.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DedupFilterStatistics.scala @@ -2,6 +2,7 @@ package com.worksap.nlp.uzushio.lib.runners import com.worksap.nlp.uzushio.lib.cleaning.Document import com.worksap.nlp.uzushio.lib.utils.Resources.AutoClosableResource +import com.worksap.nlp.uzushio.lib.filters.DeduplicateDocuments import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.functions.{collect_list, octet_length, udf} import org.apache.spark.sql.{SaveMode, SparkSession} @@ -42,7 +43,7 @@ object DedupFilterStatistics { val withValues = assembledDocs.select( metric($"docId", $"text", $"pos", $"exactFreq", $"nearFreq") as "res" - ).select(// make columns in the same order as FilterStatistics + ).select( // make columns in the same order as FilterStatistics $"res._2" as "value", $"res._1" as "text" ) @@ -52,7 +53,14 @@ object DedupFilterStatistics { .csv(args.output()) } - def ratioUdfConstructor[T: TypeTag](sample: Double)(extractor: Document => Float): UserDefinedFunction = { + def computeDuplicationScore(doc: Document, baseNumFreq: Int = 10) = { + val filter = new DeduplicateDocuments(baseNumFreq) + filter.computeNearDuplicateTextRatio(doc) + } + + def ratioUdfConstructor[T: TypeTag]( + sample: Double + )(extractor: Document => Float): UserDefinedFunction = { udf { ( docId: String, @@ -86,6 +94,8 @@ object DedupFilterStatistics { udfMaker(doc => doc.aliveParagraphs.map(_.nearFreq).foldRight(0)(_.max(_))) case "min-near-freq" => udfMaker(doc => doc.aliveParagraphs.map(_.nearFreq).foldRight(0)(_.min(_))) + case "duplication-score" => + udfMaker(doc => computeDuplicationScore(doc)) case _ => throw new IllegalArgumentException(s"unknown metric $ftype") } diff --git a/lib/src/test/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocumentsSpec.scala b/lib/src/test/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocumentsSpec.scala index e993957..370529f 100644 --- a/lib/src/test/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocumentsSpec.scala +++ b/lib/src/test/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocumentsSpec.scala @@ -48,7 +48,7 @@ class DeduplicateDocumentsSpec extends AnyFreeSpec { ) val doc = Document(paragraphs, "test") assert(1.0f == filter.computeNearDuplicateTextRatio(doc)) - assert(true== filter.shouldRemoveDocument(doc)) + assert(true == filter.shouldRemoveDocument(doc)) } "computes correct ratio for deuplicated documents (boundary)" in { @@ -58,7 +58,7 @@ class DeduplicateDocumentsSpec extends AnyFreeSpec { ) val doc = Document(paragraphs, "test") assert(0.5f == filter.computeNearDuplicateTextRatio(doc)) - assert(true== filter.shouldRemoveDocument(doc)) + assert(true == filter.shouldRemoveDocument(doc)) } } } \ No newline at end of file From f52a1f6faa941bb1b48e78b480977c67cb57e17e Mon Sep 17 00:00:00 2001 From: Niitsuma Takuro Date: Tue, 26 Sep 2023 19:38:39 +0900 Subject: [PATCH 18/20] put back codes --- .../nlp/uzushio/lib/runners/DedupFilterStatistics.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DedupFilterStatistics.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DedupFilterStatistics.scala index 0c807ba..21c864b 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DedupFilterStatistics.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/runners/DedupFilterStatistics.scala @@ -58,9 +58,7 @@ object DedupFilterStatistics { filter.computeNearDuplicateTextRatio(doc) } - def ratioUdfConstructor[T: TypeTag]( - sample: Double - )(extractor: Document => Float): UserDefinedFunction = { + def ratioUdfConstructor[T: TypeTag](sample: Double)(extractor: Document => Float): UserDefinedFunction = { udf { ( docId: String, From 23b2e7c451fc399a7702816144bb23fddfe6bfa8 Mon Sep 17 00:00:00 2001 From: Niitsuma Takuro Date: Wed, 27 Sep 2023 09:42:10 +0900 Subject: [PATCH 19/20] modify DeduplicationFilter parameters --- .../nlp/uzushio/lib/filters/DeduplicateDocuments.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala index 65a0a76..0416c2f 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala @@ -20,7 +20,7 @@ object RandomGeneratorFromString extends RandomGeneratorFromStringBase with Seri } class GaussianRandomGeneratorFromString( - val mu: Double = 0.1, + val mu: Double = 0.3, val sd: Double = 0.1 ) extends RandomGeneratorFromStringBase with Serializable { @@ -60,6 +60,9 @@ class DeduplicateDocuments( val nearDuplicateTextRatio = computeNearDuplicateTextRatio(doc) val thresholdProb = randomGenerator.generateRandom(doc.render()) + println( + ("ratio", nearDuplicateTextRatio, thresholdProb, doc.docId, doc.paragraphs.map(x => x.nearFreq)) + ) nearDuplicateTextRatio >= thresholdProb } From 2652ac4af87c347c96e3c4b613306dd998225792 Mon Sep 17 00:00:00 2001 From: Niitsuma Takuro Date: Wed, 27 Sep 2023 09:44:43 +0900 Subject: [PATCH 20/20] remove println --- .../worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala index 0416c2f..aff7f12 100644 --- a/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala +++ b/lib/src/main/scala/com/worksap/nlp/uzushio/lib/filters/DeduplicateDocuments.scala @@ -60,9 +60,6 @@ class DeduplicateDocuments( val nearDuplicateTextRatio = computeNearDuplicateTextRatio(doc) val thresholdProb = randomGenerator.generateRandom(doc.render()) - println( - ("ratio", nearDuplicateTextRatio, thresholdProb, doc.docId, doc.paragraphs.map(x => x.nearFreq)) - ) nearDuplicateTextRatio >= thresholdProb }