As with the jelly-stream module, you can use jelly-grpc with any RDF library that has a Jelly integration, such as Apache Jena (using jelly-jena) or RDF4J (using jelly-rdf4j). The gRPC API is generic and identical across all libraries.
Making a gRPC server and client
jelly-grpc builds on the Apache Pekko gRPC library. Jelly-JVM provides boilerplate code for setting up a gRPC server and client that can send and receive Jelly streams, as shown in the example below:
```scala
package eu.ostrzyciel.jelly.examples

import com.typesafe.config.ConfigFactory
import eu.ostrzyciel.jelly.convert.jena.given
import eu.ostrzyciel.jelly.core.JellyOptions
import eu.ostrzyciel.jelly.core.proto.v1.*
import eu.ostrzyciel.jelly.grpc.RdfStreamServer
import eu.ostrzyciel.jelly.stream.*
import org.apache.jena.riot.RDFDataMgr
import org.apache.pekko.NotUsed
import org.apache.pekko.actor.typed.ActorSystem
import org.apache.pekko.actor.typed.javadsl.Behaviors
import org.apache.pekko.grpc.{GrpcClientSettings, GrpcServiceException}
import org.apache.pekko.stream.scaladsl.*

import java.io.File
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.*
import scala.util.{Failure, Success}

/**
 * Example of using Jelly's gRPC client and server to send Jelly streams over the network.
 * This uses the Apache Pekko gRPC library. Its documentation can be found at:
 * https://pekko.apache.org/docs/pekko-grpc/current/index.html
 *
 * See also examples named `PekkoStreams*` for instructions on encoding and decoding RDF streams with Jelly.
 *
 * In this example we are using Apache Jena as the RDF library (note the import:
 * `import eu.ostrzyciel.jelly.convert.jena.given`).
 * The same can be achieved with RDF4J just by importing a different module.
 */
object PekkoGrpc extends shared.Example:
  // Create a config for Pekko gRPC.
  // We can use the same config for the client and the server, as we are communicating on localhost.
  // This would usually be loaded from a configuration file (e.g., application.conf).
  // More details: https://github.com/lightbend/config
  val config = ConfigFactory.parseString(
    """
      |pekko.http.server.preview.enable-http2 = on
      |pekko.grpc.client.jelly.host = 127.0.0.1
      |pekko.grpc.client.jelly.port = 8088
      |pekko.grpc.client.jelly.enable-gzip = true
      |pekko.grpc.client.jelly.use-tls = false
      |pekko.grpc.client.jelly.backend = netty
      |""".stripMargin)
    .withFallback(ConfigFactory.defaultApplication())

  // We will need two Pekko actor systems to run the streams – one for the server and one for the client
  val serverActorSystem: ActorSystem[_] = ActorSystem(Behaviors.empty, "ServerSystem")
  val clientActorSystem: ActorSystem[_] = ActorSystem(Behaviors.empty, "ClientSystem", config)

  // Our mock dataset that we will send around in the streams
  val dataset = RDFDataMgr.loadDataset(File(getClass.getResource("/weather-graphs.trig").toURI).toURI.toString)

  /**
   * Main method that starts the server and the client.
   */
  def main(args: Array[String]): Unit =
    given system: ActorSystem[_] = serverActorSystem
    given ExecutionContext = system.executionContext

    // Start the server
    val exampleService = ExampleJellyService()
    RdfStreamServer(
      RdfStreamServer.Options.fromConfig(config.getConfig("pekko.grpc.client.jelly")),
      exampleService
    ).run() onComplete {
      case Success(binding) =>
        // If the server started successfully, start the client
        println(s"[SERVER] Bound to ${binding.localAddress}")
        runClient()
      case Failure(exception) =>
        // Otherwise, print the error and terminate the actor system
        println(s"[SERVER] Failed to bind: $exception")
        system.terminate()
    }

  /**
   * The client part of the example.
   */
  private def runClient(): Unit =
    given system: ActorSystem[_] = clientActorSystem
    given ExecutionContext = system.executionContext

    // Create a gRPC client
    val client = RdfStreamServiceClient(GrpcClientSettings.fromConfig("jelly"))

    // First, let's try to publish some data to the server
    val frameSource = RdfSource.builder.datasetAsQuads(dataset).source
      // Encode the dataset as a stream of QUADS
      .via(EncoderFlow.builder
        .withLimiter(ByteSizeLimiter(500))
        .flatQuads(JellyOptions.smallStrict.withStreamName("weather"))
        .flow
      )
    println("[CLIENT] Publishing data to the server...")
    val publishFuture = client.publishRdf(frameSource) map { response =>
      println(s"[CLIENT] Received acknowledgment: $response")
    } recover {
      case e => println(s"[CLIENT] Failed to publish data: $e")
    }
    // Wait for the publish to complete
    Await.ready(publishFuture, 10.seconds)

    // Now, let's try to subscribe to some data from the server in the QUADS format
    println("\n\n[CLIENT] Subscribing to QUADS data from the server...")
    val quadsFuture = client.subscribeRdf(RdfStreamSubscribe(
      "weather",
      Some(JellyOptions.smallStrict.withPhysicalType(PhysicalStreamType.QUADS))
    ))
      .via(DecoderFlow.decodeQuads.asFlatQuadStreamStrict)
      .runFold(0L)((acc, _) => acc + 1)
      // Process the result of the stream (Future[Long])
      .map { counter =>
        println(s"[CLIENT] Received $counter quads.")
      } recover {
        case e => println(s"[CLIENT] Failed to receive quads: $e")
      }
    Await.ready(quadsFuture, 10.seconds)

    // Let's try the same, with a GRAPHS stream
    println("\n\n[CLIENT] Subscribing to GRAPHS data from the server...")
    val graphsFuture = client.subscribeRdf(RdfStreamSubscribe(
      "weather",
      Some(JellyOptions.smallStrict.withPhysicalType(PhysicalStreamType.GRAPHS))
    ))
      // Decode the response and transform it into a stream of quads
      .via(DecoderFlow.decodeGraphs.asDatasetStreamOfQuads)
      .mapConcat(identity)
      .runFold(0L)((acc, _) => acc + 1)
      // Process the result of the stream (Future[Long])
      .map { counter =>
        println(s"[CLIENT] Received $counter quads.")
      } recover {
        case e => println(s"[CLIENT] Failed to receive data: $e")
      }
    Await.ready(graphsFuture, 10.seconds)

    // Finally, let's try to subscribe to a stream that the server does not support
    // We will request TRIPLES, but the server only supports QUADS and GRAPHS.
    println("\n\n[CLIENT] Subscribing to TRIPLES data from the server...")
    val triplesFuture = client.subscribeRdf(RdfStreamSubscribe(
      "weather",
      Some(JellyOptions.smallStrict.withPhysicalType(PhysicalStreamType.TRIPLES))
    ))
      .via(DecoderFlow.decodeTriples.asFlatTripleStream)
      .runFold(0L)((acc, _) => acc + 1)
      .map { counter =>
        println(s"[CLIENT] Received $counter triples.")
      } recover {
        case e => println(s"[CLIENT] Failed to receive triples: $e")
      }
    Await.result(triplesFuture, 10.seconds)

    println("\n\n[CLIENT] Terminating...")
    system.terminate()
    println("[SERVER] Terminating...")
    serverActorSystem.terminate()

  /**
   * Example implementation of RdfStreamService to act as the server.
   *
   * You will also need to implement this trait in your own service. It defines the logic with which the server
   * will handle incoming streams and subscriptions.
   */
  class ExampleJellyService(using system: ActorSystem[_]) extends RdfStreamService:
    given ExecutionContext = system.executionContext

    /**
     * Handler for clients publishing RDF streams to the server.
     *
     * We receive a stream of RdfStreamFrames and must respond with an acknowledgment (or an error).
     */
    override def publishRdf(in: Source[RdfStreamFrame, NotUsed]): Future[RdfStreamReceived] =
      // Decode the incoming stream and count the number of RDF statements in it
      in.via(DecoderFlow.decodeAny.asFlatStream)
        .runFold(0L)((acc, _) => acc + 1)
        .map(counter => {
          println(s"[SERVER] Received ${counter} RDF statements. Sending acknowledgment.")
          // Send an acknowledgment back to the client
          RdfStreamReceived()
        })

    /**
     * Handler for clients subscribing to RDF streams from the server.
     *
     * We receive a subscription request and must respond with a stream of RdfStreamFrames or an error.
     */
    override def subscribeRdf(in: RdfStreamSubscribe): Source[RdfStreamFrame, NotUsed] =
      println(s"[SERVER] Received subscription request for topic ${in.topic}.")
      // First, check the requested physical stream type
      val streamType = in.requestedOptions match
        case Some(options) =>
          println(s"[SERVER] Requested physical stream type: ${options.physicalType}.")
          options.physicalType
        case None =>
          println(s"[SERVER] No requested stream options.")
          PhysicalStreamType.UNSPECIFIED

      // Get the stream options requested by the client or the default options if none were provided
      val options = in.requestedOptions.getOrElse(JellyOptions.smallStrict)
        .withStreamName(in.topic)

      // Check if the requested options are supported
      // !!! THIS IS IMPORTANT !!!
      // If you don't check if the requested options are supported, you may be vulnerable to
      // denial-of-service attacks. For example, a client could request a very large lookup table
      // that would consume a lot of memory on the server.
      try JellyOptions.checkCompatibility(options, JellyOptions.defaultSupportedOptions)
      catch
        case e: IllegalArgumentException =>
          // If the requested options are not supported, return an error
          return Source.failed(new GrpcServiceException(
            io.grpc.Status.INVALID_ARGUMENT.withDescription(e.getMessage)
          ))

      streamType match
        // This server implementation only supports QUADS and GRAPHS streams... and in both cases
        // it will always stream the same dataset.
        // You can of course implement more complex logic here, e.g., to stream different data based on the topic.
        case PhysicalStreamType.QUADS =>
          RdfSource.builder.datasetAsQuads(dataset).source
            .via(EncoderFlow.builder
              .withLimiter(ByteSizeLimiter(16_000))
              .flatQuads(options)
              .flow
            )
        case PhysicalStreamType.GRAPHS =>
          RdfSource.builder.datasetAsGraphs(dataset).source
            .via(EncoderFlow.builder
              .withLimiter(StreamRowCountLimiter(30))
              .namedGraphs(options)
              .flow
            )
        // PhysicalStreamType.TRIPLES is not supported here – the server will throw a gRPC error
        // if the client requests it.
        // This is an example of how to properly handle unsupported stream options requested by the client.
        // The library is able to automatically convert the error into a gRPC status and send it back to the client.
        case _ => Source.failed(new GrpcServiceException(
          io.grpc.Status.INVALID_ARGUMENT.withDescription("Unsupported physical stream type")
        ))
```
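The example builds its configuration inline with `ConfigFactory.parseString`, and its comments note that the settings would usually live in a configuration file. As a sketch, the same keys could be placed in an `application.conf` on the classpath, which is the file Typesafe Config loads by default. The nested layout below is one possible arrangement, not a layout prescribed by Jelly-JVM:

```hocon
# application.conf (assumed to be on the classpath, e.g. in src/main/resources)

# HTTP/2 must be enabled for the Pekko HTTP server that hosts the gRPC service
pekko.http.server.preview.enable-http2 = on

# Settings for the gRPC client named "jelly",
# the name passed to GrpcClientSettings.fromConfig("jelly") in the example
pekko.grpc.client.jelly {
  host = "127.0.0.1"
  port = 8088
  enable-gzip = true
  use-tls = false
  backend = netty
}
```

With the settings in a file, the inline string in the example becomes unnecessary, because the example already falls back to `ConfigFactory.defaultApplication()`.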
The classes provided in jelly-grpc should cover most cases, but they are only boilerplate. You must define the logic for handling incoming and outgoing streams yourself, as shown in the example above.
Of course, you can also implement the server or the client from scratch, if you want to.
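Whichever route you take, the server should validate the options a client requests before allocating anything for them, as the comments in the example stress. The sketch below illustrates the idea in plain Scala 3 with no dependencies. `RequestedOptions` and `OptionGuard` are hypothetical names made up for this illustration and are not part of Jelly-JVM; in real code, `JellyOptions.checkCompatibility` from the example does this job.

```scala
// Hypothetical, simplified stand-in for the stream options a client may request.
// This is NOT Jelly's RdfStreamOptions class; the fields are illustrative only.
final case class RequestedOptions(
  maxNameTableSize: Int,
  maxPrefixTableSize: Int,
  maxDatatypeTableSize: Int
)

// Hypothetical helper showing the kind of check a server should perform.
object OptionGuard:
  /**
   * Compares each requested lookup table size with the server-side maximum.
   * Returns Right(requested) if all sizes are within the limits,
   * or Left(message) listing every limit that was exceeded.
   */
  def check(
    requested: RequestedOptions,
    supported: RequestedOptions
  ): Either[String, RequestedOptions] =
    val violations = List(
      ("name table", requested.maxNameTableSize, supported.maxNameTableSize),
      ("prefix table", requested.maxPrefixTableSize, supported.maxPrefixTableSize),
      ("datatype table", requested.maxDatatypeTableSize, supported.maxDatatypeTableSize)
    ).collect {
      case (name, req, max) if req > max =>
        s"$name size $req exceeds the supported maximum $max"
    }
    if violations.isEmpty then Right(requested)
    else Left(violations.mkString("; "))
```

In a `subscribeRdf` handler, a `Left` result would map naturally to `Source.failed` with an `INVALID_ARGUMENT` status, in the same way the example handles the `IllegalArgumentException` thrown by `JellyOptions.checkCompatibility`.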