User guide – reactive streaming

This guide explains the reactive streaming functionalities of the jelly-stream module.

Prerequisites

If you are unfamiliar with the concept of reactive streams or Apache Pekko Streams, we highly recommend that you start by reading about the basic concepts of Pekko Streams.

We also recommend you first read about the RDF stream types in Jelly. Otherwise, this guide may not make much sense.

You can use jelly-stream with any RDF library that has a Jelly integration, such as Apache Jena (using jelly-jena) or RDF4J (using jelly-rdf4j). The streaming API is generic and identical across all libraries.

Basic concepts

The key notions in this API are encoders and decoders.

  • An encoder turns objects from your RDF library of choice (e.g., Triple in Apache Jena) into an object representation of Jelly's binary format (RdfStreamFrame).
  • A decoder does the opposite: it turns RdfStreamFrames into objects from your RDF library of choice.

So, for example, an encoder flow for flat triple streams would have a type of Flow[Triple, RdfStreamFrame, NotUsed] in Apache Jena. The opposite (a flat triple stream decoder) would have a type of Flow[RdfStreamFrame, Triple, NotUsed].
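
To make this contract concrete without Pekko or an RDF library, here is a toy model in plain Scala. ToyTriple, ToyFrame, encode and decode are hypothetical stand-ins invented for this sketch: the real encoder emits RdfStreamFrame objects with compressed rows and lookup tables, and runs as a Pekko Flow rather than as a function on a Seq. The sketch only illustrates the round-trip property: decoding the frames that an encoder produced gives back the original statements in the same order.

```scala
object ToyCodec {
  // Hypothetical stand-in for an RDF statement (e.g., a Jena Triple)
  final case class ToyTriple(s: String, p: String, o: String)

  // Hypothetical stand-in for RdfStreamFrame: just a batch of rows
  final case class ToyFrame(rows: Seq[ToyTriple])

  // "Encoder": statements in, frames out, with at most maxRows statements per frame
  def encode(triples: Seq[ToyTriple], maxRows: Int): Seq[ToyFrame] = {
    require(maxRows > 0, "maxRows must be positive")
    triples.grouped(maxRows).map(ToyFrame(_)).toSeq
  }

  // "Decoder": frames in, statements out, in the original order
  def decode(frames: Seq[ToyFrame]): Seq[ToyTriple] =
    frames.flatMap(_.rows)
}
```

For example, encoding 7 statements with at most 3 rows per frame yields frames of 3, 3 and 1 rows, and decoding them returns the same 7 statements.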

RdfStreamFrames can be converted to and from raw bytes using a range of methods, depending on your use case. See the sections below for examples.

Encoding a single RDF graph or dataset as a flat stream (EncoderSource)

The easiest way to start is with flat RDF streams (i.e., flat streams of triples or quads). You can convert an RDF dataset or graph into such a stream using the methods in eu.ostrzyciel.jelly.stream.EncoderSource.

Example: PekkoStreamsEncoderSource.scala

package eu.ostrzyciel.jelly.examples

import eu.ostrzyciel.jelly.core.JellyOptions
import eu.ostrzyciel.jelly.convert.jena.given
import eu.ostrzyciel.jelly.stream.*
import org.apache.jena.riot.RDFDataMgr
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.*

import java.io.File
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration.*

/**
 * Example of using the [[eu.ostrzyciel.jelly.stream.EncoderSource]] utility to convert RDF graphs and datasets
 * into Jelly streams with a single method call.
 *
 * In this example we are using Apache Jena as the RDF library (note the import:
 * `import eu.ostrzyciel.jelly.convert.jena.given`).
 * The same can be achieved with RDF4J just by importing a different module.
 */
object PekkoStreamsEncoderSource extends shared.Example:
  def main(args: Array[String]): Unit =
    // We will need a Pekko actor system to run the streams
    given actorSystem: ActorSystem = ActorSystem()
    // And an execution context for the futures
    given ExecutionContext = actorSystem.getDispatcher

    // Load an example RDF graph from an N-Triples file
    val model = RDFDataMgr.loadModel(File(getClass.getResource("/weather.nt").toURI).toURI.toString)

    println(s"Loaded model with ${model.size()} triples")
    println(s"Streaming the model to memory...")

    // Create a Pekko Streams Source from the Jena model
    // This automatically sets the physical and logical stream types.
    val encodedModelFuture = EncoderSource
      .fromGraph(
        model,
        // Aim for frames with ~2000 bytes – may be more!
        ByteSizeLimiter(2000),
        JellyOptions.smallStrict,
      )
      // wireTap: print the size of the frames
      // Notice in the output that the frames are slightly bigger than 2000 bytes.
      .wireTap(frame => println(s"Frame with ${frame.rows.size} rows, ${frame.serializedSize} bytes on wire"))
      // Convert each stream frame to bytes
      .via(JellyIo.toBytes)
      // Collect the stream into a sequence
      .runWith(Sink.seq)

    // Wait for the stream to complete and collect the result
    val encodedModel = Await.result(encodedModelFuture, 10.seconds)

    println(s"Streamed model to memory with ${encodedModel.size} frames and" +
      s" ${encodedModel.map(_.length).sum} bytes on wire")

    println("\n")

    // -------------------------------------------------------------------
    // Second example: try encoding an RDF dataset as a GRAPHS stream
    val dataset = RDFDataMgr.loadDataset(File(getClass.getResource("/weather-graphs.trig").toURI).toURI.toString)
    println(s"Loaded dataset with ${dataset.asDatasetGraph.size} named graphs")
    println(s"Streaming the dataset to memory...")

    val encodedDatasetFuture = EncoderSource
      // Here we stream this as a GRAPHS stream (physical type)
      // You can also use .fromDatasetAsQuads to stream as QUADS
      .fromDatasetAsGraphs(
        dataset,
        // This time we limit the number of rows in each frame to 30
        // Note that for this particular encoder, we can skip the limiter entirely – but this can lead to huge frames!
        // So, be careful with that, or you may get an out-of-memory error.
        Some(StreamRowCountLimiter(30)),
        JellyOptions.smallStrict,
      )
      // wireTap: print the size of the frames
      // Note that some frames are smaller than the limit – this is because this encoder will always split frames
      // on graph boundaries.
      .wireTap(frame => println(s"Frame with ${frame.rows.size} rows, ${frame.serializedSize} bytes on wire"))
      // Convert each stream frame to bytes
      .via(JellyIo.toBytes)
      // Collect the stream into a sequence
      .runWith(Sink.seq)

    // Wait for the stream to complete and collect the result
    val encodedDataset = Await.result(encodedDatasetFuture, 10.seconds)

    println(s"Streamed dataset to memory with ${encodedDataset.size} frames and" +
      s" ${encodedDataset.map(_.length).sum} bytes on wire")

    actorSystem.terminate()
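
The comments in this example note that ByteSizeLimiter(2000) can produce frames slightly larger than 2000 bytes. One simple policy that behaves this way is to close a frame only after adding the row that makes it reach the limit, so a frame can overshoot by at most one row. The plain-Scala sketch below models that policy; ToyByteLimiter is hypothetical, is not Jelly's actual implementation, and takes row sizes directly as byte counts.

```scala
object ToyByteLimiter {
  // Splits rows, given only by their encoded sizes in bytes, into frames.
  // A frame is closed only AFTER adding the row that makes it reach maxBytes,
  // so a frame can overshoot the limit by up to one row.
  def split(rowSizes: Seq[Int], maxBytes: Int): Seq[Seq[Int]] = {
    require(maxBytes > 0, "maxBytes must be positive")
    val frames = Seq.newBuilder[Seq[Int]]
    val current = Seq.newBuilder[Int]
    var currentBytes = 0
    for (size <- rowSizes) {
      current += size
      currentBytes += size
      if (currentBytes >= maxBytes) {
        frames += current.result()
        current.clear()
        currentBytes = 0
      }
    }
    // Whatever is left becomes a final, possibly smaller, frame
    val rest = current.result()
    if (rest.nonEmpty) frames += rest
    frames.result()
  }
}
```

With 300-byte rows and a 1000-byte limit, each frame takes four rows (1200 bytes), because only the fourth row pushes the running total past the limit.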

Encoding any RDF data as a flat or grouped stream (EncoderFlow)

The eu.ostrzyciel.jelly.stream.EncoderFlow provides even more options for turning RDF data into Jelly streams, including both grouped and flat streams. Every type of RDF stream in Jelly can be created using this API.

Example: PekkoStreamsEncoderFlow.scala

package eu.ostrzyciel.jelly.examples

import eu.ostrzyciel.jelly.convert.jena.given
import eu.ostrzyciel.jelly.core.JellyOptions
import eu.ostrzyciel.jelly.stream.*
import org.apache.jena.graph.{Node, Triple}
import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.sparql.core.Quad
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.*

import java.io.File
import scala.collection.immutable
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration.*

/**
 * Example of using the [[eu.ostrzyciel.jelly.stream.EncoderFlow]] utility to encode RDF data as Jelly streams.
 * 
 * Here, the RDF data is turned into a series of byte buffers, with each buffer corresponding to exactly one frame.
 * This is suitable if your streaming protocol (e.g., Kafka, MQTT, AMQP) already frames the messages.
 * If you are writing to a raw socket or file, then you must use the DELIMITED variant of Jelly instead.
 * See [[eu.ostrzyciel.jelly.examples.PekkoStreamsWithIo]] for examples of that.
 *
 * In this example we are using Apache Jena as the RDF library (note the import:
 * `import eu.ostrzyciel.jelly.convert.jena.given`).
 * The same can be achieved with RDF4J just by importing a different module.
 */
object PekkoStreamsEncoderFlow extends shared.Example:
  def main(args: Array[String]): Unit =
    // We will need a Pekko actor system to run the streams
    given actorSystem: ActorSystem = ActorSystem()
    // And an execution context for the futures
    given ExecutionContext = actorSystem.getDispatcher

    // Load the example dataset
    val dataset = RDFDataMgr.loadDataset(File(getClass.getResource("/weather-graphs.trig").toURI).toURI.toString)

    // First, let's see what views of the dataset we can obtain using Jelly's Iterable adapters:
    // 1. Iterable of all quads in the dataset
    val quads: immutable.Iterable[Quad] = dataset.asQuads
    // 2. Iterable of all graphs (named and default) in the dataset
    val graphs: immutable.Iterable[(Node, Iterable[Triple])] = dataset.asGraphs
    // 3. Iterable of all triples in the default graph
    val triples: immutable.Iterable[Triple] = dataset.getDefaultModel.asTriples

    // Note: here we are not turning the frames into bytes, but just printing their size in bytes.
    // You can find an example of how to turn a frame into a byte array in the `PekkoStreamsEncoderSource` example.
    // This is done with: .via(JellyIo.toBytes)

    // Let's try encoding this as flat RDF streams (streams of triples or quads)
    // https://w3id.org/stax/ontology#flatQuadStream
    println(f"Encoding ${quads.size} quads as a flat RDF quad stream")
    val flatQuadsFuture = Source(quads)
      .via(EncoderFlow.flatQuadStream(
        // This encoder requires a size limiter – otherwise a stream frame could have infinite length!
        StreamRowCountLimiter(20),
        JellyOptions.smallStrict,
      ))
      .runWith(Sink.foreach(frame => println(s"Frame with ${frame.rows.size} rows, ${frame.serializedSize} bytes")))

    Await.ready(flatQuadsFuture, 10.seconds)

    // https://w3id.org/stax/ontology#flatTripleStream
    println(f"\n\nEncoding ${triples.size} triples as a flat RDF triple stream")
    val flatTriplesFuture = Source(triples)
      .via(EncoderFlow.flatTripleStream(
        // This encoder requires a size limiter – otherwise a stream frame could have infinite length!
        ByteSizeLimiter(500),
        JellyOptions.smallStrict,
      ))
      .runWith(Sink.foreach(frame => println(s"Frame with ${frame.rows.size} rows, ${frame.serializedSize} bytes")))

    Await.ready(flatTriplesFuture, 10.seconds)

    // We can also stream already grouped triples or quads – for example, if your system generates batches of
    // N triples, you can just send those batches straight to be encoded, with one batch = one stream frame.
    // https://w3id.org/stax/ontology#flatQuadStream
    println(f"\n\nEncoding ${quads.size} quads as a flat RDF quad stream, grouped in batches of 10")
    // First, group the quads into batches of 10
    val groupedQuadsFuture = Source.fromIterator(() => quads.grouped(10))
      .via(EncoderFlow.flatQuadStreamGrouped(
        // Do not use a size limiter here – we want exactly one batch in each frame
        None,
        JellyOptions.smallStrict,
      ))
      .runWith(Sink.foreach(frame => println(s"Frame with ${frame.rows.size} rows, ${frame.serializedSize} bytes")))

    Await.ready(groupedQuadsFuture, 10.seconds)

    // Now, let's try grouped streams. Let's say we want to stream all graphs in a dataset, but put exactly one
    // graph in each frame (message). This is very common in (for example) IoT systems.
    // https://w3id.org/stax/ontology#namedGraphStream
    println(f"\n\nEncoding ${graphs.size} graphs as a named graph stream")
    val namedGraphsFuture = Source(graphs)
      .via(EncoderFlow.namedGraphStream(
        // Do not use a size limiter here – we want exactly one graph in each frame
        None,
        JellyOptions.smallStrict,
      ))
      // Note that we will see exactly as many frames as there are graphs in the dataset
      .runWith(Sink.foreach(frame => println(s"Frame with ${frame.rows.size} rows, ${frame.serializedSize} bytes")))

    Await.ready(namedGraphsFuture, 10.seconds)

    // As a last example, we will stream a series of RDF graphs. In our case this will be just the default graph
    // repeated a few times. This type of stream is also pretty common in practical applications.
    // https://w3id.org/stax/ontology#graphStream
    println(f"\n\nEncoding 5 RDF graphs as a graph stream")
    val graphsFuture = Source.repeat(triples)
      .take(5)
      .via(EncoderFlow.graphStream(
        // Do not use a size limiter here – we want exactly one graph in each frame
        None,
        JellyOptions.smallStrict,
      ))
      // Note that we will see exactly 5 frames – the number of graphs we streamed
      .runWith(Sink.foreach(frame => println(s"Frame with ${frame.rows.size} rows, ${frame.serializedSize} bytes")))

    Await.ready(graphsFuture, 10.seconds)

    actorSystem.terminate()
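
The flat and grouped encoders in this example differ in who decides where one frame ends and the next begins. The toy model below contrasts the two in plain Scala, assuming (as in the example) that the grouped encoder is used without a size limiter; ToyFlatVsGrouped is hypothetical and works on plain sequences instead of Pekko flows. A flat encoder only ever sees individual statements, so any upstream batching is lost and the limiter cuts the frames, while a grouped encoder turns each incoming batch into exactly one frame.

```scala
object ToyFlatVsGrouped {
  // Flat encoder with a row-count limiter: it receives one statement at a time,
  // so upstream batch boundaries are lost and a new frame starts every maxRows rows.
  def flatFrames[A](batches: Seq[Seq[A]], maxRows: Int): Seq[Seq[A]] = {
    require(maxRows > 0, "maxRows must be positive")
    batches.flatten.grouped(maxRows).toSeq
  }

  // Grouped encoder with no limiter (None): every incoming batch becomes
  // exactly one frame, so the frame boundaries are the batch boundaries.
  def groupedFrames[A](batches: Seq[Seq[A]]): Seq[Seq[A]] =
    batches
}
```

For batches of 3, 7 and 5 statements and a 4-row limit, the flat encoder yields frames of 4, 4, 4 and 3 rows, while the grouped encoder keeps the 3, 7 and 5 split.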

Decoding RDF streams (DecoderFlow)

The eu.ostrzyciel.jelly.stream.DecoderFlow provides methods for decoding flat and grouped streams. However, there is no decoding counterpart to EncoderSource, because it would have to construct an RDF graph or dataset from the decoded statements, and how best to do that varies a lot between applications. You will have to do this part yourself.
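
As a starting point, one way to rebuild a graph on the consumer side is to fold the decoded triples into an in-memory Jena graph with Pekko's Sink.fold. This sketch assumes Apache Jena and reuses only the encoder, decoder and JellyIo calls shown in the examples on this page; DecodeIntoGraph, decodeToGraph, roundTrip and the example.org IRIs are made up for illustration. A real application may instead want to write into a persistent store or apply its own deduplication.

```scala
import eu.ostrzyciel.jelly.convert.jena.given
import eu.ostrzyciel.jelly.core.JellyOptions
import eu.ostrzyciel.jelly.stream.*
import org.apache.jena.graph.{Graph, NodeFactory, Triple}
import org.apache.jena.sparql.graph.GraphFactory
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.*

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.*

object DecodeIntoGraph:
  // Decodes a flat triple stream (one byte array per frame) and adds every
  // decoded triple to a fresh in-memory Jena graph.
  def decodeToGraph(frames: Seq[Array[Byte]])(using ActorSystem): Future[Graph] =
    Source(frames)
      .via(JellyIo.fromBytes)
      .via(DecoderFlow.decodeTriples.asFlatTripleStream)
      .runWith(Sink.fold(GraphFactory.createDefaultGraph()) { (graph: Graph, triple: Triple) =>
        graph.add(triple)
        graph
      })

  // Encodes n made-up triples, then decodes them back into a graph.
  def roundTrip(n: Int)(using ActorSystem): Graph =
    val triples = (1 to n).map { i =>
      Triple.create(
        NodeFactory.createURI(s"http://example.org/s$i"),
        NodeFactory.createURI("http://example.org/p"),
        NodeFactory.createURI(s"http://example.org/o$i"),
      )
    }
    val framesFuture = Source(triples)
      .via(EncoderFlow.flatTripleStream(ByteSizeLimiter(500), JellyOptions.smallStrict))
      .via(JellyIo.toBytes)
      .runWith(Sink.seq)
    val frames = Await.result(framesFuture, 10.seconds)
    Await.result(decodeToGraph(frames), 10.seconds)

  def main(args: Array[String]): Unit =
    given actorSystem: ActorSystem = ActorSystem()
    println(s"Decoded graph with ${roundTrip(20).size()} triples")
    actorSystem.terminate()
```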

Example: PekkoStreamsDecoderFlow.scala

package eu.ostrzyciel.jelly.examples

import eu.ostrzyciel.jelly.convert.jena.given
import eu.ostrzyciel.jelly.core.JellyOptions
import eu.ostrzyciel.jelly.stream.*
import org.apache.jena.graph.{Node, Triple}
import org.apache.jena.query.Dataset
import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.sparql.core.Quad
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.*

import java.io.File
import scala.collection.immutable
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration.*

/**
 * Example of using the [[eu.ostrzyciel.jelly.stream.DecoderFlow]] utility to turn incoming Jelly streams
 * into usable RDF data.
 *
 * In this example we are using Apache Jena as the RDF library (note the import:
 * `import eu.ostrzyciel.jelly.convert.jena.given`).
 * The same can be achieved with RDF4J just by importing a different module.
 */
object PekkoStreamsDecoderFlow extends shared.Example:
  def main(args: Array[String]): Unit =
    // We will need a Pekko actor system to run the streams
    given actorSystem: ActorSystem = ActorSystem()
    // And an execution context for the futures
    given ExecutionContext = actorSystem.getDispatcher

    // Load the example dataset
    val dataset = RDFDataMgr.loadDataset(File(getClass.getResource("/weather-graphs.trig").toURI).toURI.toString)

    // To decode something, we first need to encode it...
    // See [[PekkoStreamsEncoderFlow]] and [[PekkoStreamsEncoderSource]] for an explanation of what is happening here.
    // We have three sequences of byte arrays, with each byte array corresponding to one encoded stream frame:
    // - encodedQuads: a flat RDF quad stream, physical type: QUADS
    // - encodedTriples: a flat RDF triple stream, physical type: TRIPLES
    // - encodedGraphs: a flat RDF quad stream, physical type: GRAPHS
    val (encodedQuads, encodedTriples, encodedGraphs) = getEncodedData(dataset)

    // Now we can decode the encoded data back into something useful.
    // Let's start by simply decoding the quads as a flat RDF quad stream:
    println("Decoding quads as a flat RDF quad stream...")
    val decodedQuadsFuture = Source(encodedQuads)
      // We need to parse the bytes into a Jelly stream frame
      .via(JellyIo.fromBytes)
      // And then decode the frame into Jena quads.
      // We use "decodeQuads" because the physical stream type is QUADS.
      // And then we want to treat it as a flat RDF quad stream, so we call "asFlatQuadStreamStrict".
      // We use the "Strict" method to tell the decoder to check if the incoming logical stream type is the same
      // as we are expecting: flat RDF quad stream.
      .via(DecoderFlow.decodeQuads.asFlatQuadStreamStrict)
      .runWith(Sink.seq)

    val decodedQuads: Seq[Quad] = Await.result(decodedQuadsFuture, 10.seconds)
    println(s"Decoded ${decodedQuads.size} quads.")

    // We can also treat each stream frame as a separate dataset. This way we would get an
    // RDF dataset stream.
    println(f"\n\nDecoding quads as an RDF dataset stream from ${encodedQuads.size} frames...")
    val decodedDatasetFuture = Source(encodedQuads)
      .via(JellyIo.fromBytes)
      // Note that we cannot use the strict variant (asDatasetStreamOfQuadsStrict) here, because the stream says its
      // logical type is flat RDF quad stream.
      .via(DecoderFlow.decodeQuads.asDatasetStreamOfQuads)
      .runWith(Sink.seq)

    val decodedDatasets: Seq[IterableOnce[Quad]] = Await.result(decodedDatasetFuture, 10.seconds)
    println(s"Decoded ${decodedDatasets.size} datasets with" +
      s" ${decodedDatasets.map(_.iterator.size).sum} quads in total.")

    // If we tried that with the strict variant, we would get an exception:
    println(f"\n\nDecoding quads as an RDF dataset stream with strict logical type handling...")
    val future = Source(encodedQuads)
      .via(JellyIo.fromBytes)
      .via(DecoderFlow.decodeQuads.asDatasetStreamOfQuadsStrict)
      .runWith(Sink.seq)
    Await.result(future.recover {
      // eu.ostrzyciel.jelly.core.JellyExceptions$RdfProtoDeserializationError:
      // Expected logical stream type LOGICAL_STREAM_TYPE_DATASETS, got LOGICAL_STREAM_TYPE_FLAT_QUADS.
      // LOGICAL_STREAM_TYPE_FLAT_QUADS is not a subtype of LOGICAL_STREAM_TYPE_DATASETS.
      case e: Exception => println(e.getCause)
    }, 10.seconds)

    // We can also pass entirely custom supported options to the decoder, instead of the defaults
    // (see [[JellyOptions.defaultSupportedOptions]]). This is useful if we want to decode a stream with
    // for example very large lookup tables or we want to put stricter limits on the streams that we accept.
    println(f"\n\nDecoding quads as an RDF dataset stream with custom supported options...")
    val customSupportedOptions = JellyOptions.defaultSupportedOptions
      .withMaxNameTableSize(50) // This is too small for the stream we are decoding
    val customSupportedOptionsFuture = Source(encodedQuads)
      .via(JellyIo.fromBytes)
      .via(DecoderFlow.decodeQuads.asDatasetStreamOfQuads(customSupportedOptions))
      .runWith(Sink.seq)
    Await.result(customSupportedOptionsFuture.recover {
      // eu.ostrzyciel.jelly.core.JellyExceptions$RdfProtoDeserializationError:
      // The stream uses a name table size of 128, which is larger than the maximum supported size of 50.
      // To read this stream, set maxNameTableSize to at least 128 in the supportedOptions for this decoder.
      case e: Exception => println(e.getCause)
    }, 10.seconds)

    // Flat RDF triple stream
    println(f"\n\nDecoding triples as a flat RDF triple stream...")
    val decodedTriplesFuture = Source(encodedTriples)
      .via(JellyIo.fromBytes)
      .via(DecoderFlow.decodeTriples.asFlatTripleStreamStrict)
      .runWith(Sink.seq)

    val decodedTriples: Seq[Triple] = Await.result(decodedTriplesFuture, 10.seconds)
    println(s"Decoded ${decodedTriples.size} triples.")

    // We can interpret the GRAPHS stream in a few ways, see
    // [[eu.ostrzyciel.jelly.stream.DecoderFlow.GraphsIngestFlowOps]] for more details.
    // Here we will treat it as an RDF named graph stream.
    println(f"\n\nDecoding graphs as an RDF named graph stream...")
    val decodedGraphsFuture = Source(encodedGraphs)
      .via(JellyIo.fromBytes)
      // Non-strict because the original logical stream type is flat RDF quad stream.
      .via(DecoderFlow.decodeGraphs.asNamedGraphStream)
      .runWith(Sink.seq)

    val decodedGraphs: Seq[(Node, Iterable[Triple])] = Await.result(decodedGraphsFuture, 10.seconds)
    println(s"Decoded ${decodedGraphs.size} graphs.")

    // If we tried using a decoder for a physical stream type that does not match the type of the stream,
    // we would get an exception. Here let's try to decode a QUADS stream with a TRIPLES decoder.
    println(f"\n\nDecoding quads as a flat RDF triple stream...")
    val future2 = Source(encodedQuads)
      .via(JellyIo.fromBytes)
      // Note the "decodeTriples" here
      .via(DecoderFlow.decodeTriples.asFlatTripleStream)
      .runWith(Sink.seq)
    Await.result(future2.recover {
      // eu.ostrzyciel.jelly.core.JellyExceptions$RdfProtoDeserializationError:
      // Incoming stream type is not TRIPLES.
      case e: Exception => println(e.getCause)
    }, 10.seconds)

    // We can get around this by using the "decodeAny" method, which will pick the appropriate decoder
    // based on the stream options in the stream.
    // In this case we can only ask the decoder to output a flat or grouped RDF stream.
    println(f"\n\nDecoding quads as a flat RDF stream using decodeAny...")
    val decodedAnyFuture = Source(encodedQuads)
      .via(JellyIo.fromBytes)
      // There is no strict variant at all for decodeAny, as we don't care about the stream type anyway.
      .via(DecoderFlow.decodeAny.asFlatStream)
      .runWith(Sink.seq)

    val decodedAny: Seq[Triple | Quad] = Await.result(decodedAnyFuture, 10.seconds)
    println(s"Decoded ${decodedAny.size} statements.")

    // One last trick up our sleeve is the snoopStreamOptions method, which allows us to inspect the stream options
    // and carry on with the decoding as normal.
    // In this case, we will reuse the first example (flat RDF quad stream) and snoop the stream options.
    println(f"\n\nSnooping the stream options of the first frame while decoding a flat RDF quad stream...")
    val snoopFuture = Source(encodedQuads)
      .via(JellyIo.fromBytes)
      // We add a .viaMat here to capture the materialized value of this stage.
      .viaMat(DecoderFlow.snoopStreamOptions)(Keep.right)
      .via(DecoderFlow.decodeQuads.asFlatQuadStreamStrict)
      .toMat(Sink.seq)(Keep.both)
      .run()

    val streamOptions = Await.result(snoopFuture._1, 10.seconds)
    val decodedQuads2 = Await.result(snoopFuture._2, 10.seconds)

    val streamOptionsIndented = ("\n" + streamOptions.get.toProtoString.strip).replace("\n", "\n  ")
    println(s"Stream options: $streamOptionsIndented")
    println(s"Decoded ${decodedQuads2.size} quads.")

    actorSystem.terminate()


  /**
   * Helper method to produce encoded data from a dataset.
   */
  private def getEncodedData(dataset: Dataset)(using ActorSystem, ExecutionContext):
  (Seq[Array[Byte]], Seq[Array[Byte]], Seq[Array[Byte]]) =
    val quadStream = EncoderSource.fromDatasetAsQuads(
      dataset,
      ByteSizeLimiter(500),
      JellyOptions.smallStrict
    )
    val tripleStream = EncoderSource.fromGraph(
      dataset.getDefaultModel,
      ByteSizeLimiter(250),
      JellyOptions.smallStrict
    )
    val graphStream = EncoderSource.fromDatasetAsGraphs(
      dataset,
      None,
      JellyOptions.smallStrict
    )
    val results = Seq(quadStream, tripleStream, graphStream).map { stream =>
      val streamFuture = stream
        .via(JellyIo.toBytes)
        .runWith(Sink.seq)
      Await.result(streamFuture, 10.seconds)
    }
    (results.head, results(1), results(2))

Byte streams (delimited variant)

In all of the examples above, we used the non-delimited variant of Jelly, which is appropriate for, e.g., sending Jelly data over gRPC or Kafka, where the transport already separates the messages. If you want to write Jelly data to a file or a socket, you will need to use the delimited variant. jelly-stream provides a few methods for this in eu.ostrzyciel.jelly.stream.JellyIo.
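
The delimited variant works by prefixing every frame with its length. The example below notes that JellyIo.toIoStream uses RdfStreamFrame.writeDelimitedTo under the hood; in Protocol Buffers, that method writes the message size as a base-128 varint followed by the message bytes. The plain-Scala sketch below reimplements this framing on raw byte arrays, only to show why a reader can split a file back into frames while a plain concatenation of protobuf messages could not be split reliably. ToyDelimited is hypothetical; in practice, use the JellyIo methods.

```scala
import java.io.ByteArrayOutputStream

object ToyDelimited {
  // Writes each frame as: varint-encoded length, then the frame bytes.
  def writeDelimited(frames: Seq[Array[Byte]]): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    for (frame <- frames) {
      var n = frame.length
      // Emit 7 bits at a time, lowest group first; a set high bit means "more length bytes follow"
      while (n >= 0x80) {
        out.write((n & 0x7f) | 0x80)
        n >>>= 7
      }
      out.write(n)
      out.write(frame)
    }
    out.toByteArray
  }

  // Reads a varint length prefix, then exactly that many bytes, until the input ends.
  // Assumes well-formed input: a truncated stream will throw.
  def readDelimited(bytes: Array[Byte]): Seq[Array[Byte]] = {
    val frames = Seq.newBuilder[Array[Byte]]
    var pos = 0
    while (pos < bytes.length) {
      var length = 0
      var shift = 0
      var more = true
      while (more) {
        val b = bytes(pos) & 0xff
        pos += 1
        length |= (b & 0x7f) << shift
        shift += 7
        more = (b & 0x80) != 0
      }
      frames += bytes.slice(pos, pos + length)
      pos += length
    }
    frames.result()
  }
}
```

For instance, a 300-byte frame gets the two-byte prefix 0xAC 0x02 (300 = 44 + 2 × 128), while a 3-byte frame gets the single byte 0x03.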

Example: PekkoStreamsWithIo.scala

package eu.ostrzyciel.jelly.examples

import eu.ostrzyciel.jelly.convert.jena.given
import eu.ostrzyciel.jelly.core.JellyOptions
import eu.ostrzyciel.jelly.stream.*
import org.apache.jena.graph.{Node, Triple}
import org.apache.jena.query.Dataset
import org.apache.jena.riot.RDFDataMgr
import org.apache.jena.sparql.core.Quad
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.*
import org.apache.pekko.util.ByteString

import java.io.{File, FileInputStream, FileOutputStream}
import java.util.zip.GZIPInputStream
import scala.collection.immutable
import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration.*
import scala.util.Using

/**
 * Example of using Pekko Streams to read/write Jelly to a file or any other byte stream (e.g., socket).
 *
 * The examples here use the DELIMITED variant of Jelly, which is suitable only for situations where there is
 * no framing in the underlying stream. You should always use the delimited variant with raw files and sockets,
 * as otherwise it would be impossible to tell where one stream frame ends and another one begins.
 *
 * If you are working with something like MQTT, Kafka, JMS, AMQP... then check the examples in
 * [[eu.ostrzyciel.jelly.examples.PekkoStreamsEncoderFlow]].
 *
 * In this example we are using Apache Jena as the RDF library (note the import:
 * `import eu.ostrzyciel.jelly.convert.jena.given`).
 * The same can be achieved with RDF4J just by importing a different module.
 */
object PekkoStreamsWithIo extends shared.Example:
  def main(args: Array[String]): Unit =
    // We will need a Pekko actor system to run the streams
    given actorSystem: ActorSystem = ActorSystem()
    // And an execution context for the futures
    given ExecutionContext = actorSystem.getDispatcher

    // We will read a gzipped Jelly file from disk and decode it on the fly, as we are decompressing it.
    println("Decoding a gzipped Jelly file with Pekko Streams...")
    // The input file is a GZipped Jelly file
    val inputFile = File(getClass.getResource("/jelly/weather.jelly.gz").toURI)

    // Use Java's GZIPInputStream to decompress the input file on the fly
    val decodedTriples: Seq[Triple] = Using.resource(new GZIPInputStream(FileInputStream(inputFile))) { inputStream =>
      val decodedTriplesFuture = JellyIo.fromIoStream(inputStream)
        // Decode the Jelly frames to triples.
        // Under the hood it uses the RdfStreamFrame.parseDelimitedFrom method.
        .via(DecoderFlow.decodeTriples.asFlatTripleStream)
        .runWith(Sink.seq)

      Await.result(decodedTriplesFuture, 10.seconds)
    }

    println(s"Decoded ${decodedTriples.size} triples")

    // -----------------------------------------------------------
    // Now we will write the decoded triples to a new Jelly file
    println("\n\nWriting the decoded triples to a new Jelly file with Pekko Streams...")
    Using.resource(new FileOutputStream("weather.jelly")) { outputStream =>
      val writeFuture = Source(decodedTriples)
        // Encode the triples to Jelly
        .via(EncoderFlow.flatTripleStream(
          ByteSizeLimiter(500),
          JellyOptions.smallStrict
        ))
        // Write the Jelly frames to a Java byte stream.
        // Under the hood it uses the RdfStreamFrame.writeDelimitedTo method.
        .runWith(JellyIo.toIoStream(outputStream))

      Await.ready(writeFuture, 10.seconds)
      println("Done writing the Jelly file.")
    }

    // -----------------------------------------------------------
    // Pekko Streams offers its own utilities for reading and writing bytes that do not involve using Java's
    // blocking implementation of streams.
    // We will again write the decoded triples to a Jelly file, but this time use Pekko's facilities.
    println("\n\nWriting the decoded triples to a new Jelly file with Pekko Streams' utilities...")
    val writeFuture = Source(decodedTriples)
      .via(EncoderFlow.flatTripleStream(
        ByteSizeLimiter(500),
        JellyOptions.smallStrict
      ))
      // Convert the frames into Pekko's byte strings.
      // Note: we are using the DELIMITED variant because we will write this to disk!
      .via(JellyIo.toBytesDelimited)
      .map(bytes => ByteString(bytes))
      .runWith(FileIO.toPath(File("weather2.jelly").toPath))

    Await.ready(writeFuture, 10.seconds)
    println("Done writing the Jelly file.")

    actorSystem.terminate()
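
The first part of this example reads a gzipped file through Java's GZIPInputStream, while the last part writes an uncompressed file with Pekko's FileIO. The two ideas can be combined to write a gzip-compressed, delimited Jelly file without Java's blocking streams by adding Pekko's Compression.gzip stage before FileIO.toPath. This is a sketch assuming Apache Jena and pekko-stream; GzipJellyWriter and writeGzipped are hypothetical names, and the Jelly calls are the same ones used above.

```scala
import eu.ostrzyciel.jelly.convert.jena.given
import eu.ostrzyciel.jelly.core.JellyOptions
import eu.ostrzyciel.jelly.stream.*
import org.apache.jena.graph.Triple
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.IOResult
import org.apache.pekko.stream.scaladsl.*
import org.apache.pekko.util.ByteString

import java.nio.file.Path
import scala.concurrent.Future

object GzipJellyWriter:
  // Writes triples to `path` as a gzip-compressed, DELIMITED Jelly file.
  def writeGzipped(triples: Seq[Triple], path: Path)(using ActorSystem): Future[IOResult] =
    Source(triples)
      .via(EncoderFlow.flatTripleStream(ByteSizeLimiter(500), JellyOptions.smallStrict))
      // Files have no framing of their own, so use the delimited variant
      .via(JellyIo.toBytesDelimited)
      .map(bytes => ByteString(bytes))
      // Compress the byte stream on the fly with Pekko's gzip stage
      .via(Compression.gzip)
      .runWith(FileIO.toPath(path))
```

A file written this way should be readable with the GZIPInputStream and JellyIo.fromIoStream approach from the first part of the example.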

See also