Generic API
This guide explains how to use pyjelly's generic API to write RDF statements to the Jelly format and read them back, without any external library.
Installation
Install pyjelly from PyPI:
pip install pyjelly
Requirements
- Python 3.9 or newer
- Linux, macOS, or Windows
Usage without external libraries
Unlike the example in getting started, the generic API does not use RDFLib or any other third-party library, but it works in much the same way.
Serializing statements to a Jelly file
To make a set of triples/quads and write them to a Jelly file, use:
from pyjelly.integrations.generic.generic_sink import (
    IRI,
    BlankNode,
    GenericStatementSink,
    Literal,
    Triple,
)
# Create a generic sink object
generic_sink = GenericStatementSink()
# Let's add triples one by one
generic_sink.add(
    Triple(
        IRI("http://example.com/subject"),
        IRI("http://example.com/predicate"),
        Literal("Hello", langtag="en"),
    )
)
generic_sink.add(
    Triple(
        BlankNode("B1"),
        IRI("http://example.com/hasName"),
        Literal("Bob"),
    )
)
# Write into a Jelly file
with open("output.jelly", "wb") as out_file:
    generic_sink.serialize(out_file)
print("All done.")
This example uses pyjelly’s simple custom triple/quad type, which is easy to create and work with.
Parsing statements from a Jelly file
To load triples/quads from a .jelly file into a Python object, see:
from pyjelly.integrations.generic.generic_sink import GenericStatementSink, Triple
# Create a generic sink object
generic_sink = GenericStatementSink()
# Load triples from the Jelly file
with open("output.jelly", "rb") as in_file:
    generic_sink.parse(in_file)
# Let's inspect them statement by statement
for statement in generic_sink:
    if isinstance(statement, Triple):
        print(statement)
print("All done.")
This retrieves the statements stored in your .jelly file.
Parsing a stream of graphs
Similarly, to process a Jelly stream as a stream of graphs through the generic API, see:
import gzip
import urllib.request
from pyjelly.integrations.generic.parse import parse_jelly_grouped
# Dataset: Katrina weather measurements (10k graphs)
# Documentation: https://w3id.org/riverbench/datasets/lod-katrina/dev
url = "https://w3id.org/riverbench/datasets/lod-katrina/dev/files/jelly_10K.jelly.gz"
# Load, uncompress .gz file, and pass to Jelly parser, all in a streaming manner
with (
    urllib.request.urlopen(url) as response,
    gzip.open(response) as jelly_stream,
):
    # Parse into sinks (one per graph)
    graphs = parse_jelly_grouped(jelly_stream)
    # Stop after the first 50 graphs (indices 0 to 49)
    for i, graph in enumerate(graphs):
        print(f"Graph {i} in the stream has {len(graph)} triples")
        if i >= 49:
            break
Here we use a dataset of weather measurements and count the number of triples in each graph.
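The early-exit loop above can also be written with itertools.islice, which stops pulling from the generator once the limit is reached. The following is a minimal sketch of that pattern only: fake_graph_stream and first_graph_sizes are hypothetical names, and the stand-in generator yields plain lists of tuples instead of the sinks returned by parse_jelly_grouped, so that it runs without network access.

```python
from itertools import islice


def fake_graph_stream():
    """Hypothetical stand-in for parse_jelly_grouped: yields one 'graph' per step."""
    n = 0
    while True:  # endless on purpose, to show that islice stops early
        n += 1
        # Each "graph" is a list of 1 to 3 placeholder triples.
        yield [("s", "p", f"o{j}") for j in range(n % 3 + 1)]


def first_graph_sizes(graphs, limit):
    """Return the number of triples in each of the first `limit` graphs."""
    return [len(graph) for graph in islice(graphs, limit)]


sizes = first_graph_sizes(fake_graph_stream(), 50)
for i, size in enumerate(sizes[:3]):
    print(f"Graph {i} in the stream has {size} triples")
```

Because islice never requests item 51, the rest of the stream is not read, which is the same effect as the explicit break.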
Parsing a stream of statements
You can also process a Jelly stream as a flat stream of statements using only the generic API. In this example, we look through a fragment of Denmark's OpenStreetMap data to find all city names:
import gzip
import urllib.request
from pyjelly.integrations.generic.generic_sink import IRI, Triple
from pyjelly.integrations.generic.parse import parse_jelly_flat
# Dataset: OpenStreetMap data for Denmark (first 10k objects)
# Documentation: https://w3id.org/riverbench/datasets/osm2rdf-denmark/dev
url = (
    "https://w3id.org/riverbench/datasets/osm2rdf-denmark/dev/files/jelly_10K.jelly.gz"
)
# We are looking for city names in the dataset
predicate_to_look_for = IRI("https://www.openstreetmap.org/wiki/Key:addr:city")
city_names = set()
with (
    urllib.request.urlopen(url) as response,
    gzip.open(response) as jelly_stream,
):
    for event in parse_jelly_flat(jelly_stream):
        if isinstance(event, Triple):
            if str(event.p) == str(predicate_to_look_for):
                city_names.add(str(event.o))
print(f"Found {len(city_names)} unique city names in the dataset.")
print("Sample city names:")
for city in list(city_names)[:10]:
    print(f"- {city}")
We get a generator of stream events, which lets us process the file statement by statement without any external libraries.
Streaming data
If you need to process a certain quantity of statements both efficiently and iteratively, you can provide a simple generator:
from pyjelly.integrations.generic.generic_sink import GenericStatementSink
# Helper generator that streams statements from a Jelly file path
def stream_triples(jelly_path):
    generic_sink = GenericStatementSink()
    with open(jelly_path, "rb") as f:
        generic_sink.parse(f)
    yield from generic_sink
# Example usage, just printing:
for triple in stream_triples("output.jelly"):
    print(triple)
print("All done.")
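This lets the caller consume statements one at a time. Because parse fills the sink before the first statement is yielded, the whole file is still read up front; when memory is the concern, prefer parse_jelly_flat from the previous section, which yields statements as they are decoded.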
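When statements should be handled a fixed quantity at a time, a small batching helper can be placed between the generator and the processing code. This is a minimal sketch under stated assumptions: batched is a hypothetical helper written for this example (it is not part of pyjelly), and the statements are stand-in tuples rather than pyjelly Triple objects. It avoids itertools.batched, which is only available from Python 3.12.

```python
from itertools import islice


def batched(statements, batch_size):
    """Yield lists of up to `batch_size` items, pulling lazily from `statements`."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(statements)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return  # the source is exhausted
        yield batch


# Stand-in data: plain tuples instead of pyjelly Triple objects.
fake_statements = (("s", "p", f"o{i}") for i in range(7))

batch_sizes = []
for batch in batched(fake_statements, 3):
    batch_sizes.append(len(batch))
    print(f"Processing a batch of {len(batch)} statements")
```

Only one batch is held in memory by the helper at any time; the last batch may be shorter than batch_size.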
Serializing a stream of graphs
If you have a generator that yields graphs, you can serialize it with the generic API:
import random
from pyjelly.integrations.generic.generic_sink import GenericStatementSink, IRI, Triple
from pyjelly.integrations.generic.serialize import grouped_stream_to_file
# Helper function to generate a generator of graphs
def generate_sample_sinks():
    content = (
        IRI("http://example.com/sensor"),
        IRI("http://example.com/humidity"),
        IRI(f"http://example.com/{random.random()}"),
    )
    for _ in range(10):
        sink = GenericStatementSink()
        sink.add(Triple(*content))
        yield sink
output_file = "output.jelly"
print(f"Streaming graphs into {output_file}…")
with open(output_file, "wb") as out_f:
    grouped_stream_to_file(generate_sample_sinks(), out_f)
print("All done.")
Grouped data is streamed in its original form; no additional RDF library such as RDFLib is needed.
Serializing a stream of statements
To serialize a generator of statements to a .jelly file through the generic API, use:
import random
from pyjelly.integrations.generic.generic_sink import IRI, Triple
from pyjelly.integrations.generic.serialize import flat_stream_to_file
# Example generator that yields raw triples
def generate_sample_triples():
    content = (
        IRI("http://example.com/sensor"),
        IRI("http://example.com/humidity"),
        IRI(f"http://example.com/{random.random()}"),
    )
    for _ in range(10):
        yield Triple(*content)
output_file = "flat_output.jelly"
print(f"Streaming triples into {output_file}…")
sample_triples = generate_sample_triples()
with open(output_file, "wb") as out_f:
    flat_stream_to_file(sample_triples, out_f)
print("All done.")
The statements are written in the order in which the generator yields them.
Working with byte buffers and Kafka
When working with Kafka or other message brokers, you may want to write Jelly data to a byte buffer instead of a file. You can do this by using the BytesIO class from the io module:
import io
from pyjelly.integrations.generic.generic_sink import (
    IRI,
    GenericStatementSink,
    Literal,
    Triple,
)
g1 = GenericStatementSink()
g1.add(
    Triple(
        IRI("http://example.com/subject"),
        IRI("http://example.com/predicate"),
        Literal("Hello", langtag="en"),
    )
)
# Write the data into a byte buffer (bytes type)
with io.BytesIO() as write_buffer:
    g1.serialize(write_buffer)
    data = write_buffer.getvalue()
print(f"Serialized data size: {len(data)} bytes")
# Parse the data back
g2 = GenericStatementSink()
with io.BytesIO(data) as read_buffer:
    g2.parse(read_buffer)
print("\nParsed triples:")
for statement in g2:
    print(statement)
The data variable is of type bytes, and can be passed to Kafka with KafkaProducer.send(value=data), or any other API that accepts byte buffers. The same trick can be used when working with the RDFLib integration.
When working with Kafka, you should be aware of the broker's offset management and partitioning strategies. Data within one Jelly stream must be strictly ordered and no frames may be dropped. If you have less strict ordering guarantees, you should split up the stream into multiple Jelly streams, each with guaranteed consistency.
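One way to notice a dropped or reordered message on the consumer side is to attach a sequence number to each payload and check it on arrival. The sketch below illustrates that idea with the standard library only. It is an assumption made for this example, not something Jelly, pyjelly, or Kafka provides: the 8-byte header, wrap, and OrderedReceiver are all hypothetical, and the payloads are placeholder bytes rather than real Jelly frames.

```python
import struct

HEADER = struct.Struct(">Q")  # hypothetical 8-byte big-endian sequence number


def wrap(seq, payload):
    """Prefix a serialized chunk with its sequence number (producer side)."""
    return HEADER.pack(seq) + payload


class OrderedReceiver:
    """Consumer-side guard that rejects dropped or reordered messages."""

    def __init__(self):
        self.expected = 0

    def accept(self, message):
        (seq,) = HEADER.unpack_from(message)
        if seq != self.expected:
            raise ValueError(f"expected message {self.expected}, got {seq}")
        self.expected += 1
        return message[HEADER.size:]  # the payload without the header


# Placeholder payloads standing in for consecutive chunks of one Jelly stream.
messages = [wrap(i, f"frame-{i}".encode()) for i in range(3)]

receiver = OrderedReceiver()
payloads = [receiver.accept(m) for m in messages]
print(payloads)

# Simulate a dropped message: message 1 never arrives.
lossy = OrderedReceiver()
lossy.accept(messages[0])
try:
    lossy.accept(messages[2])
    gap_detected = False
except ValueError as err:
    gap_detected = True
    print(f"Gap detected: {err}")
```

A receiver that detects a gap should stop decoding that stream rather than continue. If gaps cannot be ruled out, splitting the data into several independent Jelly streams, as described above, is the safer design.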
See also
If you are familiar with RDFLib, you can use pyjelly together with RDFLib in a similar way. See the dedicated guide.