Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xml events to byte stream #13

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion core/src/main/scala/xs4s/package.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import javax.xml.stream.{XMLEventReader, XMLInputFactory}
import javax.xml.stream.{XMLEventReader, XMLInputFactory, XMLOutputFactory}
import javax.xml.stream.events.XMLEvent
import xs4s.additions.{XMLEventReaderMaker, XMLLoader}

Expand Down Expand Up @@ -34,6 +34,9 @@ package object xs4s {
private[xs4s] def defaultXmlInputFactory: XMLInputFactory =
XMLInputFactory.newInstance()

private[xs4s] def defaultXmlOutputFactory: XMLOutputFactory =
XMLOutputFactory.newInstance()

/**
* Utilities to replicate [[scala.xml.XML]]
*/
Expand Down
21 changes: 20 additions & 1 deletion fs2/src/main/scala/xs4s/fs2compat/Fs2Syntax.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package xs4s.fs2compat

import cats.effect.{Blocker, ContextShift, Resource, Sync}
import _root_.fs2.{Pipe, Stream}
import javax.xml.stream.XMLEventReader
import javax.xml.stream.{XMLEventReader, XMLEventWriter}
import javax.xml.stream.events.XMLEvent
import xs4s.XmlElementExtractor
import xs4s.generic.Scanner
Expand Down Expand Up @@ -38,6 +38,25 @@ trait Fs2Syntax {
.apply[XMLEvent](blocker, reader.toIterator, chunkSize))
}

implicit class RichFs2XmlEventStream[F[_] : Sync](stream: Stream[F, XMLEvent]) {

/** Writes an XMLEvent Stream to an XMLEventWriter */
def writeXmlEventStream(
xmlEventWriter: Resource[F, XMLEventWriter]): F[Unit] =
Stream
.resource(xmlEventWriter)
.flatMap(
stream
.chunks
.fold(_) { (writer, events) =>
events.foreach(writer.add)
writer.flush()
writer
})
.compile
.drain
}

implicit class RichXmlElementExtractor[O](
xmlElementExtractor: XmlElementExtractor[O]) {
def toFs2PipeThrowError[F[_]]: Pipe[F, XMLEvent, O] =
Expand Down
19 changes: 18 additions & 1 deletion fs2/src/main/scala/xs4s/fs2compat/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package xs4s

import fs2._
import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Resource, Sync}
import javax.xml.stream.XMLInputFactory
import javax.xml.stream.{XMLInputFactory, XMLOutputFactory}
import javax.xml.stream.events.XMLEvent
import syntax.fs2._

Expand Down Expand Up @@ -32,4 +32,21 @@ package object fs2compat {
F.delay(xmlInputFactory.createXMLEventReader(inputStream)))(
xmlEventReader => F.delay(xmlEventReader.close())),
chunkSize))

/**
* Turns an FS2 XMLEvent Stream into a stream of Bytes.
**/
def xmlEventStreamToByteStream[F[_]: ConcurrentEffect: ContextShift](
blocker: Blocker,
xmlOutputFactory: XMLOutputFactory = defaultXmlOutputFactory,
chunkSize: Int)(
implicit F: Sync[F]): Pipe[F, XMLEvent, Byte] =
(xmlEventStream: Stream[F, XMLEvent]) =>
io.readOutputStream(blocker, chunkSize)(
outputStream =>
xmlEventStream.writeXmlEventStream(
Resource.make(
F.delay(xmlOutputFactory.createXMLEventWriter(outputStream)))(
XMLEventWriter => F.delay(XMLEventWriter.close()))
))
}
21 changes: 20 additions & 1 deletion fs2v3/src/main/scala/xs4s/fs2compat/Fs2Syntax.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import xs4s.XmlElementExtractor
import xs4s.generic.Scanner
import xs4s.syntax.core._

import javax.xml.stream.XMLEventReader
import javax.xml.stream.{XMLEventReader, XMLEventWriter}
import javax.xml.stream.events.XMLEvent
import scala.language.higherKinds

Expand Down Expand Up @@ -37,6 +37,25 @@ trait Fs2Syntax {
.apply[XMLEvent](reader.toIterator, chunkSize))
}

implicit class RichFs2XmlEventStream[F[_] : Sync](stream: Stream[F, XMLEvent]) {

/** Writes an XMLEvent Stream to an XMLEventWriter */
def writeXmlEventStream(
xmlEventWriter: Resource[F, XMLEventWriter]): F[Unit] =
Stream
.resource(xmlEventWriter)
.flatMap(
stream
.chunks
.fold(_) { (writer, events) =>
events.foreach(writer.add)
writer.flush()
writer
})
.compile
.drain
}

implicit class RichXmlElementExtractor[O](
xmlElementExtractor: XmlElementExtractor[O]) {
def toFs2PipeThrowError[F[_]]: Pipe[F, XMLEvent, O] =
Expand Down
17 changes: 16 additions & 1 deletion fs2v3/src/main/scala/xs4s/fs2compat/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import cats.effect.{Async, Resource, Sync}
import fs2._
import xs4s.syntax.fs2._

import javax.xml.stream.XMLInputFactory
import javax.xml.stream.{XMLInputFactory, XMLOutputFactory}
import javax.xml.stream.events.XMLEvent
import scala.language.higherKinds

Expand All @@ -29,4 +29,19 @@ package object fs2compat {
F.delay(xmlInputFactory.createXMLEventReader(inputStream)))(
xmlEventReader => F.delay(xmlEventReader.close())),
chunkSize))

/**
* Turns an FS2 XMLEvent Stream into a stream of Bytes.
**/
def xmlEventStreamToByteStream[F[_] : Async](
xmlOutputFactory: XMLOutputFactory = defaultXmlOutputFactory,
chunkSize: Int)(implicit F: Sync[F]): Pipe[F, XMLEvent, Byte] =
(xmlEventStream: Stream[F, XMLEvent]) =>
io.readOutputStream(chunkSize)(
outputStream =>
xmlEventStream.writeXmlEventStream(
Resource.make(
F.delay(xmlOutputFactory.createXMLEventWriter(outputStream)))(
XMLEventWriter => F.delay(XMLEventWriter.close()))
))
}
40 changes: 28 additions & 12 deletions fs2v3/src/test/scala/xs4s/fs2compat/Fs2CompatSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,33 @@ package xs4s.fs2compat

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import fs2.Stream
import fs2.{Chunk, Stream}
import org.scalatest.freespec.AnyFreeSpec
import xs4s.XmlElementExtractor
import xs4s.syntax.fs2.RichXmlElementExtractor

import scala.xml.Elem

final class Fs2CompatSpec extends AnyFreeSpec {
"It works" in {
val input =
s"""
|<items>
|<embedded><item>Embedded</item></embedded>
|<item>General</item>
|<embedded-twice><embedded-once><item>Doubly embedded</item></embedded-once></embedded-twice>
|<item><item>Nested</item></item>
|</items>
|
private val input =
s"""
|<items>
|<embedded><item>Embedded</item></embedded>
|<item>General</item>
|<embedded-twice><embedded-once><item>Doubly embedded</item></embedded-once></embedded-twice>
|<item><item>Nested</item></item>
|</items>
|
""".stripMargin

"It works" in {
val anchorElementExtractor: XmlElementExtractor[Elem] =
XmlElementExtractor.filterElementsByName("item")

val textStream: Stream[IO, String] = fs2.Stream
.apply[IO, String](input)
.flatMap(str => fs2.Stream.emits(str.getBytes().toList))
.through(byteStreamToXmlEventStream[IO]())
.through(byteStreamToXmlEventStream[IO](chunkSize = 10240))
.through(anchorElementExtractor.toFs2PipeThrowError)
.map(_.text)

Expand All @@ -38,4 +38,20 @@ final class Fs2CompatSpec extends AnyFreeSpec {
"Doubly embedded",
"Nested"))
}

"Roundtrip" in {
val stringStream: Stream[IO, String] = fs2.Stream
.apply[IO, String](input)
.flatMap(str => fs2.Stream.emits(str.getBytes().toList))
.through(byteStreamToXmlEventStream[IO](chunkSize = 10240))
.through(xmlEventStreamToByteStream[IO](chunkSize = 10240))
.through(fs2.text.utf8Decode)

val string = stringStream.compile.string.unsafeRunSync()
.replaceAll("<\\?xml.*?>", "")
.replaceAll("\r?\n", "\n")

assert(
string == input.trim.replaceAll("\r?\n", "\n"))
}
}