API reference

pyjelly

Modules:

Name Description
errors
integrations
jelly
options
parse
serialize

errors

Classes:

Name Description
JellyConformanceError

Raised when Jelly conformance is violated.

JellyAssertionError

Raised when a recommended assertion from the specification fails.

JellyNotImplementedError

Raised when a future feature is not yet implemented.

JellyConformanceError

Bases: Exception

Raised when Jelly conformance is violated.

JellyAssertionError

Bases: AssertionError

Raised when a recommended assertion from the specification fails.

JellyNotImplementedError

Bases: NotImplementedError

Raised when a future feature is not yet implemented.

integrations

Modules:

Name Description
rdflib
rdflib

Modules:

Name Description
parse
serialize

Functions:

Name Description
register_extension_to_rdflib

Make rdflib.util.guess_format discover Jelly format.

register_extension_to_rdflib(extension='.jelly')

Make rdflib.util.guess_format discover Jelly format.

>>> rdflib.util.guess_format("foo.jelly")
>>> register_extension_to_rdflib()
>>> rdflib.util.guess_format("foo.jelly")
'jelly'

Source code in pyjelly/integrations/rdflib/__init__.py
def register_extension_to_rdflib(extension: str = ".jelly") -> None:
    """
    Make [rdflib.util.guess_format][] discover Jelly format.

    >>> rdflib.util.guess_format("foo.jelly")
    >>> register_extension_to_rdflib()
    >>> rdflib.util.guess_format("foo.jelly")
    'jelly'
    """
    rdflib.util.SUFFIX_FORMAT_MAP[extension.removeprefix(".")] = "jelly"
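
The registration above amounts to adding one key to a suffix-to-format map. The following is a minimal sketch of that idea using only the standard library. `SUFFIX_FORMAT_MAP`, `register_extension`, and `guess_format` here are local stand-ins written for illustration, not rdflib's own objects, and the lookup is deliberately simpler than whatever rdflib does internally.

```python
from typing import Optional

# Stand-in for rdflib.util.SUFFIX_FORMAT_MAP (assumption: a plain dict
# keyed by file suffix without the leading dot).
SUFFIX_FORMAT_MAP = {"ttl": "turtle", "nt": "nt"}


def register_extension(extension: str = ".jelly", fmt: str = "jelly") -> None:
    # Strip one leading dot so ".jelly" and "jelly" register the same key.
    SUFFIX_FORMAT_MAP[extension.removeprefix(".")] = fmt


def guess_format(path: str) -> Optional[str]:
    # Simplified lookup: use the text after the last dot as the suffix.
    _, _, suffix = path.rpartition(".")
    return SUFFIX_FORMAT_MAP.get(suffix.lower())


before = guess_format("foo.jelly")  # suffix not registered yet
register_extension()
after = guess_format("foo.jelly")
```

Because the key is stored without the dot, callers can pass either `".jelly"` or `"jelly"` as the extension.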
parse

Classes:

Name Description
RDFLibAdapter

RDFLib adapter class, extended by the triples and quads implementations.

RDFLibTriplesAdapter

Triples adapter RDFLib implementation.

RDFLibQuadsAdapter

Extended RDFLib adapter for the QUADS physical type.

RDFLibGraphsAdapter

Extension of RDFLibQuadsBaseAdapter for the GRAPHS physical type.

RDFLibJellyParser

Functions:

Name Description
parse_triples_stream

Parse flat triple stream.

parse_quads_stream

Parse flat quads stream.

parse_jelly_grouped

Take jelly file and return generators based on the detected logical type.

parse_jelly_flat

Parse a jelly file in flat parsing mode into one Graph/Dataset.

RDFLibAdapter(options, parsing_mode=ParsingMode.FLAT)

Bases: Adapter

RDFLib adapter class, extended by the triples and quads implementations.

Args: Adapter (type): abstract adapter class

Source code in pyjelly/parse/decode.py
def __init__(
    self, options: ParserOptions, parsing_mode: ParsingMode = ParsingMode.FLAT
) -> None:
    self.options = options
    self.parsing_mode = parsing_mode
RDFLibTriplesAdapter(options, graph_factory, parsing_mode=ParsingMode.FLAT)

Bases: RDFLibAdapter

Triples adapter RDFLib implementation.

Notes: Holds an internal graph object that tracks triples and namespaces and can be flushed between frames.

Methods:

Name Description
frame

Finalize one frame in triples stream.

Source code in pyjelly/integrations/rdflib/parse.py
def __init__(
    self,
    options: ParserOptions,
    graph_factory: Callable[[], Graph],
    parsing_mode: ParsingMode = ParsingMode.FLAT,
) -> None:
    super().__init__(options=options, parsing_mode=parsing_mode)
    self.graph = graph_factory()
    self.graph_factory = graph_factory
    self.parsing_mode = parsing_mode
frame()

Finalize one frame in triples stream.

Returns: Graph: frame content as a separate Graph and starts a new Graph

Source code in pyjelly/integrations/rdflib/parse.py
def frame(self) -> Graph:
    """
    Finalize one frame in triples stream.

    Returns:
       Graph: frame content as a separate Graph
            and starts a new Graph

    """
    this_graph = self.graph
    self.graph = self.graph_factory()
    return this_graph
RDFLibQuadsAdapter(options, dataset_factory, parsing_mode=ParsingMode.FLAT)

Bases: RDFLibQuadsBaseAdapter

Extended RDFLib adapter for the QUADS physical type.

Notes: Adds triples and namespaces directly to dataset, so RDFLib handles the rest.

Args: RDFLibQuadsBaseAdapter (type): base quads adapter (shared with graphs physical type)

Source code in pyjelly/integrations/rdflib/parse.py
def __init__(
    self,
    options: ParserOptions,
    dataset_factory: Callable[[], Dataset],
    parsing_mode: ParsingMode = ParsingMode.FLAT,
) -> None:
    super().__init__(options=options, parsing_mode=parsing_mode)
    self.dataset = dataset_factory()
    self.dataset_factory = dataset_factory
RDFLibGraphsAdapter(options, dataset_factory, parsing_mode=ParsingMode.FLAT)

Bases: RDFLibQuadsBaseAdapter

Extension of RDFLibQuadsBaseAdapter for the GRAPHS physical type.

Notes: introduces graph start/end, checks if graph exists, dataset store management.

Args: RDFLibQuadsBaseAdapter (type): base adapter for quads management.

Raises: JellyConformanceError: if no graph_start was encountered

Source code in pyjelly/integrations/rdflib/parse.py
def __init__(
    self,
    options: ParserOptions,
    dataset_factory: Callable[[], Dataset],
    parsing_mode: ParsingMode = ParsingMode.FLAT,
) -> None:
    super().__init__(
        options=options,
        dataset_factory=dataset_factory,
        parsing_mode=parsing_mode,
    )
    self._graph_id = None
RDFLibJellyParser

Bases: Parser

Methods:

Name Description
parse

Parse jelly file into provided RDFLib Graph.

parse(source, sink)

Parse jelly file into provided RDFLib Graph.

Args:
    source (InputSource): jelly file as buffered binary stream InputSource obj
    sink (Graph): RDFLib Graph

Raises: TypeError: raises error if invalid input

Source code in pyjelly/integrations/rdflib/parse.py
def parse(self, source: InputSource, sink: Graph) -> None:
    """
    Parse jelly file into provided RDFLib Graph.

    Args:
        source (InputSource): jelly file as buffered binary stream InputSource obj
        sink (Graph): RDFLib Graph

    Raises:
        TypeError: raises error if invalid input

    """
    inp = source.getByteStream()  # type: ignore[no-untyped-call]
    if inp is None:
        msg = "expected source to be a stream of bytes"
        raise TypeError(msg)
    parse_jelly_flat(
        inp,
        graph_factory=lambda: Graph(store=sink.store, identifier=sink.identifier),
        dataset_factory=lambda: Dataset(store=sink.store),
    )
parse_triples_stream(frames, options, graph_factory, parsing_mode=ParsingMode.FLAT)

Parse flat triple stream.

Args:
    frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames
    options (ParserOptions): stream options
    graph_factory (Callable): Lambda to construct a graph
    parsing_mode (ParsingMode): specifies whether this is a flat or grouped parsing.

Yields: Generator[Graph]: RDFLib Graph(s)

Source code in pyjelly/integrations/rdflib/parse.py
def parse_triples_stream(
    frames: Iterable[jelly.RdfStreamFrame],
    options: ParserOptions,
    graph_factory: Callable[[], Graph],
    parsing_mode: ParsingMode = ParsingMode.FLAT,
) -> Generator[Graph]:
    """
    Parse flat triple stream.

    Args:
        frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames
        options (ParserOptions): stream options
        graph_factory (Callable): Lambda to construct a graph
        parsing_mode (ParsingMode): specifies whether this is
            a flat or grouped parsing.

    Yields:
        Generator[Graph]: RDFLib Graph(s)

    """
    adapter = RDFLibTriplesAdapter(
        options, graph_factory=graph_factory, parsing_mode=parsing_mode
    )
    decoder = Decoder(adapter=adapter)
    for frame in frames:
        g = decoder.decode_frame(frame)
        if g is not None:
            yield g

    if parsing_mode is ParsingMode.FLAT:
        yield adapter.graph
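
The difference between the two parsing modes can be hard to see in the listing above, so here is a minimal standard-library sketch of the same control flow. `ListAdapter` and `parse_stream` are hypothetical stand-ins that collect strings in a list instead of building an RDFLib Graph, and the local `ParsingMode` enum only mirrors the names used in this reference.

```python
from enum import Enum, auto


class ParsingMode(Enum):
    FLAT = auto()
    GROUPED = auto()


class ListAdapter:
    """Hypothetical adapter that collects statements in a list, not a Graph."""

    def __init__(self, factory=list):
        self.factory = factory
        self.current = factory()

    def frame(self):
        # Hand back the finished container and start a fresh one.
        finished, self.current = self.current, self.factory()
        return finished


def parse_stream(frames, parsing_mode=ParsingMode.FLAT):
    adapter = ListAdapter()
    for frame in frames:
        adapter.current.extend(frame)
        if parsing_mode is ParsingMode.GROUPED:
            yield adapter.frame()  # one container per frame
    if parsing_mode is ParsingMode.FLAT:
        yield adapter.current  # everything, yielded exactly once


frames = [["s1 p1 o1", "s2 p2 o2"], ["s3 p3 o3"]]
flat = list(parse_stream(frames))
grouped = list(parse_stream(frames, ParsingMode.GROUPED))
```

In flat mode the generator yields a single container, which is why a caller can take the result with one `next()` call. In grouped mode it yields one container per frame.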
parse_quads_stream(frames, options, dataset_factory, parsing_mode=ParsingMode.FLAT)

Parse flat quads stream.

Args:
    frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames
    options (ParserOptions): stream options
    dataset_factory (Callable): Lambda to construct a dataset
    parsing_mode (ParsingMode): specifies whether this is a flat or grouped parsing.

Yields: Generator[Dataset]: RDFLib dataset(s)

Source code in pyjelly/integrations/rdflib/parse.py
def parse_quads_stream(
    frames: Iterable[jelly.RdfStreamFrame],
    options: ParserOptions,
    dataset_factory: Callable[[], Dataset],
    parsing_mode: ParsingMode = ParsingMode.FLAT,
) -> Generator[Dataset]:
    """
    Parse flat quads stream.

    Args:
        frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames
        options (ParserOptions): stream options
        dataset_factory (Callable): Lambda to construct a dataset
        parsing_mode (ParsingMode): specifies whether this is
            a flat or grouped parsing.

    Yields:
        Generator[Dataset]: RDFLib dataset(s)

    """
    adapter_class: type[RDFLibQuadsBaseAdapter]
    if options.stream_types.physical_type == jelly.PHYSICAL_STREAM_TYPE_QUADS:
        adapter_class = RDFLibQuadsAdapter
    else:
        adapter_class = RDFLibGraphsAdapter
    adapter = adapter_class(
        options=options,
        dataset_factory=dataset_factory,
        parsing_mode=parsing_mode,
    )
    decoder = Decoder(adapter=adapter)
    for frame in frames:
        ds = decoder.decode_frame(frame)
        if ds is not None:
            yield ds

    if parsing_mode is ParsingMode.FLAT:
        yield adapter.dataset
parse_jelly_grouped(inp, graph_factory, dataset_factory)

Take jelly file and return generators based on the detected logical type.

Yields one graph/dataset per frame.

Args:
    inp (IO[bytes]): input jelly buffered binary stream
    graph_factory (Callable): lambda to construct a Graph
    dataset_factory (Callable): lambda to construct a Dataset

Raises: NotImplementedError: is raised if a logical type is not implemented

Yields: Generator[Any] | Generator[Dataset] | Generator[Graph]: returns generators for graphs/datasets based on the type of input

Source code in pyjelly/integrations/rdflib/parse.py
def parse_jelly_grouped(
    inp: IO[bytes],
    graph_factory: Callable[[], Graph],
    dataset_factory: Callable[[], Dataset],
) -> Generator[Any] | Generator[Graph] | Generator[Dataset]:
    """
    Take jelly file and return generators based on the detected logical type.

    Yields one graph/dataset per frame.

    Args:
        inp (IO[bytes]): input jelly buffered binary stream
        graph_factory (Callable): lambda to construct a Graph
        dataset_factory (Callable): lambda to construct a Dataset

    Raises:
        NotImplementedError: is raised if a logical type is not implemented

    Yields:
        Generator[Any] | Generator[Dataset] | Generator[Graph]:
            returns generators for graphs/datasets based on the type of input

    """
    options, frames = get_options_and_frames(inp)

    if options.stream_types.physical_type == jelly.PHYSICAL_STREAM_TYPE_TRIPLES:
        yield from parse_triples_stream(
            frames=frames,
            options=options,
            graph_factory=graph_factory,
            parsing_mode=ParsingMode.GROUPED,
        )
        return

    if options.stream_types.physical_type in (
        jelly.PHYSICAL_STREAM_TYPE_QUADS,
        jelly.PHYSICAL_STREAM_TYPE_GRAPHS,
    ):
        yield from parse_quads_stream(
            frames=frames,
            options=options,
            dataset_factory=dataset_factory,
            parsing_mode=ParsingMode.GROUPED,
        )
        return

    physical_type_name = jelly.PhysicalStreamType.Name(
        options.stream_types.physical_type
    )
    msg = f"the stream type {physical_type_name} is not supported "
    raise NotImplementedError(msg)
parse_jelly_flat(inp, graph_factory, dataset_factory)

Parse a jelly file in flat parsing mode into one Graph/Dataset.

Args:
    inp (IO[bytes]): input jelly buffered binary stream
    graph_factory (Callable): lambda to construct a Graph
    dataset_factory (Callable): lambda to construct a Dataset

Raises: NotImplementedError: if physical type is not supported

Returns: RDFLib Graph or Dataset

Source code in pyjelly/integrations/rdflib/parse.py
def parse_jelly_flat(
    inp: IO[bytes],
    graph_factory: Callable[[], Graph],
    dataset_factory: Callable[[], Dataset],
) -> Any | Dataset | Graph:
    """
    Parse a jelly file in flat parsing mode into one Graph/Dataset.

    Args:
        inp (IO[bytes]): input jelly buffered binary stream
        graph_factory (Callable): lambda to construct a Graph
        dataset_factory (Callable): lambda to construct a Dataset

    Raises:
        NotImplementedError: if physical type is not supported

    Returns:
        RDFLib Graph or Dataset

    """
    options, frames = get_options_and_frames(inp)

    if options.stream_types.physical_type == jelly.PHYSICAL_STREAM_TYPE_TRIPLES:
        return next(
            parse_triples_stream(
                frames=frames,
                options=options,
                graph_factory=graph_factory,
                parsing_mode=ParsingMode.FLAT,
            )
        )

    if options.stream_types.physical_type in (
        jelly.PHYSICAL_STREAM_TYPE_QUADS,
        jelly.PHYSICAL_STREAM_TYPE_GRAPHS,
    ):
        return next(
            parse_quads_stream(
                frames=frames,
                options=options,
                dataset_factory=dataset_factory,
                parsing_mode=ParsingMode.FLAT,
            )
        )
    physical_type_name = jelly.PhysicalStreamType.Name(
        options.stream_types.physical_type
    )
    msg = f"the stream type {physical_type_name} is not supported "
    raise NotImplementedError(msg)
serialize

Classes:

Name Description
RDFLibTermEncoder
RDFLibJellySerializer

RDFLib serializer for writing graphs in Jelly RDF stream format.

Functions:

Name Description
triples_stream_frames

Serialize a Graph/Dataset into jelly frames.

quads_stream_frames

Serialize a Dataset into jelly frames.

graphs_stream_frames

Serialize a Dataset into jelly frames as a stream of graphs.

RDFLibTermEncoder(lookup_preset=None)

Bases: TermEncoder

Methods:

Name Description
encode_any

Encode term based on its RDFLib object.

Source code in pyjelly/serialize/encode.py
def __init__(
    self,
    lookup_preset: options.LookupPreset | None = None,
) -> None:
    if lookup_preset is None:
        lookup_preset = options.LookupPreset()
    self.lookup_preset = lookup_preset
    self.names = LookupEncoder(lookup_size=lookup_preset.max_names)
    self.prefixes = LookupEncoder(lookup_size=lookup_preset.max_prefixes)
    self.datatypes = LookupEncoder(lookup_size=lookup_preset.max_datatypes)
encode_any(term, slot)

Encode term based on its RDFLib object.

Args:
    term (object): term to encode
    slot (Slot): its place in statement.

Returns: RowsAndTerm: encoded extra rows and a jelly term to encode

Source code in pyjelly/integrations/rdflib/serialize.py
def encode_any(self, term: object, slot: Slot) -> RowsAndTerm:
    """
    Encode term based on its RDFLib object.

    Args:
        term (object): term to encode
        slot (Slot): its place in statement.

    Returns:
        RowsAndTerm: encoded extra rows and a jelly term to encode

    """
    if slot is Slot.graph and term == DATASET_DEFAULT_GRAPH_ID:
        return self.encode_default_graph()

    if isinstance(term, rdflib.URIRef):
        return self.encode_iri(term)

    if isinstance(term, rdflib.Literal):
        return self.encode_literal(
            lex=str(term),
            language=term.language,
            # `datatype` is cast to `str` explicitly because
            # `URIRef.__eq__` overrides `str.__eq__` in an incompatible manner
            datatype=term.datatype and str(term.datatype),
        )

    if isinstance(term, rdflib.BNode):
        return self.encode_bnode(str(term))

    return super().encode_any(term, slot)  # error if not handled
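
The method above dispatches on the term's class and relies on the `x and str(x)` idiom to leave a missing datatype as `None`. A small sketch of both ideas follows. The `IRI`, `Literal`, and `BNode` dataclasses are stand-ins invented for this example (not rdflib's classes), and the returned tuples are only placeholders for the encoded rows.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class IRI:  # stand-in for rdflib.URIRef
    value: str


@dataclass(frozen=True)
class Literal:  # stand-in for rdflib.Literal
    lex: str
    language: Optional[str] = None
    datatype: Optional[IRI] = None


@dataclass(frozen=True)
class BNode:  # stand-in for rdflib.BNode
    label: str


def encode_any(term: object) -> tuple:
    if isinstance(term, IRI):
        return ("iri", term.value)
    if isinstance(term, Literal):
        # `a and b` returns `a` when it is falsy, so a missing datatype
        # stays None and a present one is converted to a plain string.
        datatype = term.datatype and term.datatype.value
        return ("literal", term.lex, term.language, datatype)
    if isinstance(term, BNode):
        return ("bnode", term.label)
    msg = f"unsupported term type {type(term)}"
    raise TypeError(msg)


plain = encode_any(Literal("hello", language="en"))
typed = encode_any(Literal("1", datatype=IRI("http://www.w3.org/2001/XMLSchema#integer")))
```

Falling through to an error at the end mirrors the `super().encode_any(...)` call in the listing, which is commented as raising when no branch handles the term.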
RDFLibJellySerializer(store)

Bases: Serializer

RDFLib serializer for writing graphs in Jelly RDF stream format.

Handles streaming RDF terms into Jelly frames using internal encoders. Supports only graphs and datasets (not quoted graphs).

Methods:

Name Description
guess_options

Guess the serializer options based on the store type.

guess_stream

Return an appropriate stream implementation for the given options.

serialize

Serialize self.store content to Jelly format.

Source code in pyjelly/integrations/rdflib/serialize.py
def __init__(self, store: Graph) -> None:
    if isinstance(store, QuotedGraph):
        msg = "N3 format is not supported"
        raise NotImplementedError(msg)
    super().__init__(store)
guess_options()

Guess the serializer options based on the store type.

>>> RDFLibJellySerializer(Graph()).guess_options().logical_type
1
>>> RDFLibJellySerializer(Dataset()).guess_options().logical_type
2

Source code in pyjelly/integrations/rdflib/serialize.py
def guess_options(self) -> SerializerOptions:
    """
    Guess the serializer options based on the store type.

    >>> RDFLibJellySerializer(Graph()).guess_options().logical_type
    1
    >>> RDFLibJellySerializer(Dataset()).guess_options().logical_type
    2
    """
    logical_type = (
        jelly.LOGICAL_STREAM_TYPE_FLAT_QUADS
        if isinstance(self.store, Dataset)
        else jelly.LOGICAL_STREAM_TYPE_FLAT_TRIPLES
    )
    return SerializerOptions(logical_type=logical_type)
guess_stream(options)

Return an appropriate stream implementation for the given options.

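>>> graph_ser = RDFLibJellySerializer(Graph())
>>> ds_ser = RDFLibJellySerializer(Dataset())

>>> type(graph_ser.guess_stream(graph_ser.guess_options()))
<class 'pyjelly.serialize.streams.TripleStream'>
>>> type(ds_ser.guess_stream(ds_ser.guess_options()))
<class 'pyjelly.serialize.streams.QuadStream'>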

Source code in pyjelly/integrations/rdflib/serialize.py
def guess_stream(self, options: SerializerOptions) -> Stream:
    """
    Return an appropriate stream implementation for the given options.

    >>> graph_ser = RDFLibJellySerializer(Graph())
    >>> ds_ser = RDFLibJellySerializer(Dataset())

    >>> type(graph_ser.guess_stream(graph_ser.guess_options()))
    <class 'pyjelly.serialize.streams.TripleStream'>
    >>> type(ds_ser.guess_stream(ds_ser.guess_options()))
    <class 'pyjelly.serialize.streams.QuadStream'>
    """
    stream_cls: type[Stream]
    if options.logical_type != jelly.LOGICAL_STREAM_TYPE_GRAPHS and isinstance(
        self.store, Dataset
    ):
        stream_cls = QuadStream
    else:
        stream_cls = TripleStream
    return stream_cls.for_rdflib(options=options)
serialize(out, /, *, stream=None, options=None, **unused)

Serialize self.store content to Jelly format.

Args:
    out (IO[bytes]): output buffered writer
    stream (Stream | None, optional): Jelly stream object. Defaults to None.
    options (SerializerOptions | None, optional): Serializer options if defined beforehand, e.g., read from a separate file. Defaults to None.
    **unused (Any): unused args for RDFLib serialize

Source code in pyjelly/integrations/rdflib/serialize.py
@override
def serialize(  # type: ignore[override]
    self,
    out: IO[bytes],
    /,
    *,
    stream: Stream | None = None,
    options: SerializerOptions | None = None,
    **unused: Any,
) -> None:
    """
    Serialize self.store content to Jelly format.

    Args:
        out (IO[bytes]): output buffered writer
        stream (Stream | None, optional): Jelly stream object. Defaults to None.
        options (SerializerOptions | None, optional): Serializer options
            if defined beforehand, e.g., read from a separate file.
            Defaults to None.
        **unused(Any): unused args for RDFLib serialize

    """
    if options is None:
        options = self.guess_options()
    if stream is None:
        stream = self.guess_stream(options)
    write = write_delimited if stream.options.params.delimited else write_single
    for stream_frame in stream_frames(stream, self.store):
        write(stream_frame, out)
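
The serializer picks between a delimited and a single-message writer. In Protocol Buffers, delimited framing conventionally means each message is preceded by its byte length encoded as a base-128 varint. The sketch below shows that framing on raw bytes. `write_raw` and `write_length_prefixed` are hypothetical helpers for illustration, and it is an assumption that pyjelly's `write_delimited` follows this convention.

```python
import io
from typing import IO


def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def write_raw(payload: bytes, out: IO[bytes]) -> None:
    # Single-message output: the payload is the whole file.
    out.write(payload)


def write_length_prefixed(payload: bytes, out: IO[bytes]) -> None:
    # Delimited output: a varint length, then the payload, so that
    # several messages can be concatenated and split again on read.
    out.write(encode_varint(len(payload)))
    out.write(payload)


buffer = io.BytesIO()
write_length_prefixed(b"abc", buffer)
write_length_prefixed(b"de", buffer)
framed = buffer.getvalue()
```

Without the length prefix a reader cannot tell where one frame ends and the next begins, which is why the undelimited form suits a file holding exactly one frame.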
triples_stream_frames(stream, data)

Serialize a Graph/Dataset into jelly frames.

Args:
    stream (TripleStream): stream that specifies triples processing
    data (Graph | Dataset): Graph/Dataset to serialize.

Notes: If a Dataset is given, its graphs are unpacked and iterated over. If the flow is GraphsFrameFlow, one frame is emitted per graph.

Yields: Generator[jelly.RdfStreamFrame]: jelly frames.

Source code in pyjelly/integrations/rdflib/serialize.py
@stream_frames.register(TripleStream)
def triples_stream_frames(
    stream: TripleStream,
    data: Graph | Dataset,
) -> Generator[jelly.RdfStreamFrame]:
    """
    Serialize a Graph/Dataset into jelly frames.

    Args:
        stream (TripleStream): stream that specifies triples processing
        data (Graph | Dataset): Graph/Dataset to serialize.

    Notes:
        if Dataset is given, its graphs are unpacked and iterated over
        if flow is GraphsFrameFlow, emits a frame per graph.

    Yields:
        Generator[jelly.RdfStreamFrame]: jelly frames.

    """
    stream.enroll()
    if stream.options.params.namespace_declarations:
        namespace_declarations(data, stream)
    graphs = (data,) if not isinstance(data, Dataset) else data.graphs()
    for graph in graphs:
        for terms in graph:
            if frame := stream.triple(terms):
                yield frame
        # this part turns each graph to a frame for graphs logical type
        if frame := stream.flow.frame_from_graph():
            yield frame
    if stream.stream_types.flat and (frame := stream.flow.to_stream_frame()):
        yield frame
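
The `@stream_frames.register` decorators in these listings suggest that `stream_frames` is a `functools.singledispatch` function that selects an implementation from the type of its first argument. That is an inference from the decorators, not something this page states. The sketch below shows both registration styles with stand-in classes: passing the class explicitly, and letting `register` read it from the first parameter's annotation.

```python
from collections.abc import Iterator
from functools import singledispatch


class TripleStream:  # stand-in, not pyjelly's class
    pass


class QuadStream:  # stand-in, not pyjelly's class
    pass


@singledispatch
def stream_frames(stream: object, data: list) -> Iterator[str]:
    # Fallback used when no implementation is registered for the type.
    msg = f"invalid stream implementation {type(stream)}"
    raise TypeError(msg)


@stream_frames.register(TripleStream)  # class given explicitly
def triples_frames(stream, data):
    for item in data:
        yield f"triple:{item}"


@stream_frames.register  # class taken from the annotation
def quads_frames(stream: QuadStream, data: list) -> Iterator[str]:
    for item in data:
        yield f"quad:{item}"


triple_out = list(stream_frames(TripleStream(), ["a", "b"]))
quad_out = list(stream_frames(QuadStream(), ["a"]))
```

Callers only ever call `stream_frames(stream, data)`; adding a new stream type means registering one more function rather than editing a chain of `isinstance` checks.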
quads_stream_frames(stream, data)

Serialize a Dataset into jelly frames.

Notes: Emits one frame per dataset if flow is of DatasetsFrameFlow.

Args:
    stream (QuadStream): stream that specifies quads processing
    data (Dataset): Dataset to serialize.

Yields: Generator[jelly.RdfStreamFrame]: jelly frames

Source code in pyjelly/integrations/rdflib/serialize.py
@stream_frames.register
def quads_stream_frames(
    stream: QuadStream,
    data: Dataset,
) -> Generator[jelly.RdfStreamFrame]:
    """
    Serialize a Dataset into jelly frames.

    Notes:
        Emits one frame per dataset if flow is of DatasetsFrameFlow.

    Args:
        stream (QuadStream): stream that specifies quads processing
        data (Dataset): Dataset to serialize.

    Yields:
        Generator[jelly.RdfStreamFrame]: jelly frames

    """
    assert isinstance(data, Dataset)
    stream.enroll()
    if stream.options.params.namespace_declarations:
        namespace_declarations(data, stream)
    for terms in data.quads():
        if frame := stream.quad(terms):
            yield frame
    if frame := stream.flow.frame_from_dataset():
        yield frame
    if stream.stream_types.flat and (frame := stream.flow.to_stream_frame()):
        yield frame
graphs_stream_frames(stream, data)

Serialize a Dataset into jelly frames as a stream of graphs.

Notes: If flow of DatasetsFrameFlow type, the whole dataset will be encoded into one frame.

Args:
    stream (GraphStream): stream that specifies graphs processing
    data (Dataset): Dataset to serialize.

Yields: Generator[jelly.RdfStreamFrame]: jelly frames

Source code in pyjelly/integrations/rdflib/serialize.py
@stream_frames.register
def graphs_stream_frames(
    stream: GraphStream,
    data: Dataset,
) -> Generator[jelly.RdfStreamFrame]:
    """
    Serialize a Dataset into jelly frames as a stream of graphs.

    Notes:
        If flow of DatasetsFrameFlow type, the whole dataset
        will be encoded into one frame.

    Args:
        stream (GraphStream): stream that specifies graphs processing
        data (Dataset): Dataset to serialize.

    Yields:
        Generator[jelly.RdfStreamFrame]: jelly frames

    """
    assert isinstance(data, Dataset)
    stream.enroll()
    if stream.options.params.namespace_declarations:
        namespace_declarations(data, stream)
    for graph in data.graphs():
        yield from stream.graph(graph_id=graph.identifier, graph=graph)
    if frame := stream.flow.frame_from_dataset():
        yield frame
    if stream.stream_types.flat and (frame := stream.flow.to_stream_frame()):
        yield frame

jelly

Modules:

Name Description
rdf_pb2

Generated protocol buffer code.

rdf_pb2

Generated protocol buffer code.

options

Functions:

Name Description
register_mimetypes

Associate files that have Jelly extension with Jelly MIME types.

Attributes:

Name Type Description
INTEGRATION_SIDE_EFFECTS bool

Whether to allow integration module imports to trigger side effects.

INTEGRATION_SIDE_EFFECTS = True

Whether to allow integration module imports to trigger side effects.

These side effects are cheap and may include populating some registries for guessing the defaults for external integrations that work with Jelly.

register_mimetypes(extension='.jelly')

Associate files that have Jelly extension with Jelly MIME types.

>>> register_mimetypes()
>>> mimetypes.guess_type("out.jelly")
('application/x-jelly-rdf', None)

Source code in pyjelly/options.py
def register_mimetypes(extension: str = ".jelly") -> None:
    """
    Associate files that have Jelly extension with Jelly MIME types.

    >>> register_mimetypes()
    >>> mimetypes.guess_type("out.jelly")
    ('application/x-jelly-rdf', None)
    """
    for mimetype in MIMETYPES:
        mimetypes.add_type(mimetype, extension)
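
The function above is a thin loop over the standard library's `mimetypes.add_type`. A minimal sketch of the same registration for a single MIME type follows. The `JELLY_MIMETYPE` constant and `register_jelly_mimetype` helper are names made up for this example, and the contents of pyjelly's `MIMETYPES` collection are not shown on this page, so only the type from the doctest is used.

```python
import mimetypes

# Assumption: the one MIME type that appears in the doctest above.
JELLY_MIMETYPE = "application/x-jelly-rdf"


def register_jelly_mimetype(extension: str = ".jelly") -> None:
    # add_type(type, ext) maps the extension to the MIME type in the
    # process-wide registry used by mimetypes.guess_type.
    mimetypes.add_type(JELLY_MIMETYPE, extension)


register_jelly_mimetype()
guessed = mimetypes.guess_type("out.jelly")
print(guessed)  # ('application/x-jelly-rdf', None)
```

The second tuple element is the content encoding, which stays `None` because the file name carries no compression suffix such as `.gz`.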

parse

Modules:

Name Description
decode
ioutils
lookup
decode

Classes:

Name Description
ParsingMode

Specifies how jelly frames should be treated.

Decoder

Functions:

Name Description
options_from_frame

Fill stream options based on the options row.

ParsingMode

Bases: Enum

Specifies how jelly frames should be treated.

Modes:
    FLAT: Yield all frames as one Graph or Dataset.
    GROUPED: Yield one Graph/Dataset per frame (grouped parsing).

Decoder(adapter)

Initializes the decoder with lookup tables of preset sizes, an integration-dependent adapter, and an empty dictionary of repeated terms.

Args: adapter (Adapter): integration-dependent adapter that specifies terms conversion to specific objects, framing, namespace declarations, and graphs/datasets forming.

Methods:

Name Description
decode_frame

Decode a frame to custom object based on adapter implementation.

decode_row

Decode a row based on its type.

ingest_prefix_entry

Update prefix lookup table based on the table entry.

ingest_name_entry

Update name lookup table based on the table entry.

ingest_datatype_entry

Update datatype lookup table based on the table entry.

decode_term

Decode a term based on its type: IRI/literal/BN/default graph.

decode_iri

Decode RdfIri message to IRI using a custom adapter.

decode_bnode

Decode string message to blank node (BN) using a custom adapter.

decode_literal

Decode RdfLiteral to literal based on custom adapter implementation.

decode_statement

Decode a triple/quad message.

Source code in pyjelly/parse/decode.py
def __init__(self, adapter: Adapter) -> None:
    """
    Initialize decoder.

    Initializes the decoder with lookup tables of preset sizes,
    an integration-dependent adapter, and an empty dictionary of repeated terms.

    Args:
        adapter (Adapter): integration-dependent adapter that specifies terms
        conversion to specific objects, framing,
        namespace declarations, and graphs/datasets forming.

    """
    self.adapter = adapter
    self.names = LookupDecoder(lookup_size=self.options.lookup_preset.max_names)
    self.prefixes = LookupDecoder(
        lookup_size=self.options.lookup_preset.max_prefixes
    )
    self.datatypes = LookupDecoder(
        lookup_size=self.options.lookup_preset.max_datatypes
    )
    self.repeated_terms: dict[str, jelly.RdfIri | str | jelly.RdfLiteral] = {}
decode_frame(frame)

Decode a frame to custom object based on adapter implementation.

Args: frame (jelly.RdfStreamFrame): jelly frame

Returns: Any: custom obj based on adapter logic

Source code in pyjelly/parse/decode.py
def decode_frame(self, frame: jelly.RdfStreamFrame) -> Any:
    """
    Decode a frame to custom object based on adapter implementation.

    Args:
        frame (jelly.RdfStreamFrame): jelly frame

    Returns:
        Any: custom obj based on adapter logic

    """
    for row_owner in frame.rows:
        row = getattr(row_owner, row_owner.WhichOneof("row"))
        self.decode_row(row)
    if self.adapter.parsing_mode is ParsingMode.GROUPED:
        return self.adapter.frame()
    return None
decode_row(row)

Decode a row based on its type.

Notes: uses custom adapters to decode triples/quads, namespace declarations, graph start/end.

Args: row (Any): protobuf row message

Raises: TypeError: raises error if this type of protobuf message does not have a respective handler

Returns: Any | None: decoded row, i.e. the result of calling the handler registered for the row's type

Source code in pyjelly/parse/decode.py
def decode_row(self, row: Any) -> Any | None:
    """
    Decode a row based on its type.

    Notes: uses custom adapters to decode triples/quads, namespace declarations,
           graph start/end.

    Args:
        row (Any): protobuf row message

    Raises:
        TypeError: raises error if this type of protobuf message does not have
                   a respective handler

    Returns:
        Any | None: decoded row, i.e. the result of calling
                    the handler registered for the row's type

    """
    try:
        decode_row = self.row_handlers[type(row)]
    except KeyError:
        msg = f"decoder not implemented for {type(row)}"
        raise TypeError(msg) from None
    return decode_row(self, row)
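
The lookup in `self.row_handlers[type(row)]` is a dispatch table keyed by message class. The sketch below reproduces that pattern with stand-in row types. `TripleRow`, `NamespaceRow`, and `MiniDecoder` are hypothetical, and how pyjelly actually populates `row_handlers` is not shown on this page.

```python
from dataclasses import dataclass


@dataclass
class TripleRow:  # stand-in for a protobuf triple message
    s: str
    p: str
    o: str


@dataclass
class NamespaceRow:  # stand-in for a namespace declaration message
    prefix: str
    iri: str


class MiniDecoder:
    def __init__(self) -> None:
        self.triples = []
        self.namespaces = {}

    def decode_triple(self, row):
        self.triples.append((row.s, row.p, row.o))
        return self.triples[-1]

    def decode_namespace(self, row):
        self.namespaces[row.prefix] = row.iri
        return None

    # The table stores plain functions, not bound methods, which is why
    # decode_row passes `self` explicitly when calling the handler.
    row_handlers = {TripleRow: decode_triple, NamespaceRow: decode_namespace}

    def decode_row(self, row):
        try:
            handler = self.row_handlers[type(row)]
        except KeyError:
            msg = f"decoder not implemented for {type(row)}"
            raise TypeError(msg) from None
        return handler(self, row)


decoder = MiniDecoder()
decoded = decoder.decode_row(TripleRow("s", "p", "o"))
decoder.decode_row(NamespaceRow("ex", "http://example.org/"))
```

Keying on `type(row)` means subclasses of a registered row type are not matched, and `from None` hides the internal `KeyError` so the caller sees only the `TypeError`.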
ingest_prefix_entry(entry)

Update prefix lookup table based on the table entry.

Args: entry (jelly.RdfPrefixEntry): prefix message, containing id and value

Source code in pyjelly/parse/decode.py
def ingest_prefix_entry(self, entry: jelly.RdfPrefixEntry) -> None:
    """
    Update prefix lookup table based on the table entry.

    Args:
        entry (jelly.RdfPrefixEntry): prefix message, containing id and value

    """
    self.prefixes.assign_entry(index=entry.id, value=entry.value)
ingest_name_entry(entry)

Update name lookup table based on the table entry.

Args: entry (jelly.RdfNameEntry): name message, containing id and value

Source code in pyjelly/parse/decode.py
def ingest_name_entry(self, entry: jelly.RdfNameEntry) -> None:
    """
    Update name lookup table based on the table entry.

    Args:
        entry (jelly.RdfNameEntry): name message, containing id and value

    """
    self.names.assign_entry(index=entry.id, value=entry.value)
ingest_datatype_entry(entry)

Update datatype lookup table based on the table entry.

Args: entry (jelly.RdfDatatypeEntry): datatype message, containing id and value

Source code in pyjelly/parse/decode.py
def ingest_datatype_entry(self, entry: jelly.RdfDatatypeEntry) -> None:
    """
    Update datatype lookup table based on the table entry.

    Args:
        entry (jelly.RdfDatatypeEntry): datatype message, containing id and value

    """
    self.datatypes.assign_entry(index=entry.id, value=entry.value)
decode_term(term)

Decode a term based on its type: IRI/literal/BN/default graph.

Notes: requires a custom adapter with implemented methods for terms decoding.

Args: term (Any): IRI/literal/BN(string)/Default graph message

Raises: TypeError: raises error if no handler for the term is found

Returns: Any: decoded term (currently, rdflib objects, e.g., rdflib.term.URIRef)

Source code in pyjelly/parse/decode.py
def decode_term(self, term: Any) -> Any:
    """
    Decode a term based on its type: IRI/literal/BN/default graph.

    Notes: requires a custom adapter with implemented methods for terms decoding.

    Args:
        term (Any): IRI/literal/BN(string)/Default graph message

    Raises:
        TypeError: raises error if no handler for the term is found

    Returns:
        Any: decoded term (currently, rdflib objects, e.g., rdflib.term.URIRef)

    """
    try:
        decode_term = self.term_handlers[type(term)]
    except KeyError:
        msg = f"decoder not implemented for {type(term)}"
        raise TypeError(msg) from None
    return decode_term(self, term)
decode_iri(iri)

Decode RdfIri message to IRI using a custom adapter.

Args: iri (jelly.RdfIri): RdfIri message

Returns: Any: IRI, based on adapter implementation, e.g., rdflib.term.URIRef

Source code in pyjelly/parse/decode.py
def decode_iri(self, iri: jelly.RdfIri) -> Any:
    """
    Decode RdfIri message to IRI using a custom adapter.

    Args:
        iri (jelly.RdfIri): RdfIri message

    Returns:
        Any: IRI, based on adapter implementation, e.g., rdflib.term.URIRef

    """
    name = self.names.decode_name_term_index(iri.name_id)
    prefix = self.prefixes.decode_prefix_term_index(iri.prefix_id)
    return self.adapter.iri(iri=prefix + name)
decode_bnode(bnode)

Decode string message to blank node (BN) using a custom adapter.

Args: bnode (str): blank node id

Returns: Any: blank node object from the custom adapter

Source code in pyjelly/parse/decode.py
def decode_bnode(self, bnode: str) -> Any:
    """
    Decode string message to blank node (BN) using a custom adapter.

    Args:
        bnode (str): blank node id

    Returns:
        Any: blank node object from the custom adapter

    """
    return self.adapter.bnode(bnode)
decode_literal(literal)

Decode RdfLiteral to literal based on custom adapter implementation.

Notes: a language tag takes precedence; a datatype is decoded only when the datatype table size is non-zero and the datatype field is set

Args: literal (jelly.RdfLiteral): RdfLiteral message

Returns: Any: literal returned by the custom adapter

Source code in pyjelly/parse/decode.py
def decode_literal(self, literal: jelly.RdfLiteral) -> Any:
    """
    Decode RdfLiteral to literal based on custom adapter implementation.

    Notes: checks for langtag existence;
           for datatype checks for non-zero table size and datatype field presence

    Args:
        literal (jelly.RdfLiteral): RdfLiteral message

    Returns:
        Any: literal returned by the custom adapter

    """
    language = datatype = None
    if literal.langtag:
        language = literal.langtag
    elif self.datatypes.lookup_size and literal.HasField("datatype"):
        datatype = self.datatypes.decode_datatype_term_index(literal.datatype)
    return self.adapter.literal(
        lex=literal.lex,
        language=language,
        datatype=datatype,
    )
decode_statement(statement, oneofs)

Decode a triple/quad message.

Notes: also updates repeated terms dictionary

Args:

  • statement (jelly.RdfTriple | jelly.RdfQuad): triple/quad message
  • oneofs (Sequence[str]): terms s/p/o/g (if quads)

Raises: ValueError: if a missing repeated term is encountered

Returns: Any: a list of decoded terms

Source code in pyjelly/parse/decode.py
def decode_statement(
    self,
    statement: jelly.RdfTriple | jelly.RdfQuad,
    oneofs: Sequence[str],
) -> Any:
    """
    Decode a triple/quad message.

    Notes: also updates repeated terms dictionary

    Args:
        statement (jelly.RdfTriple | jelly.RdfQuad): triple/quad message
        oneofs (Sequence[str]): terms s/p/o/g(if quads)

    Raises:
        ValueError: if a missing repeated term is encountered

    Returns:
        Any: a list of decoded terms

    """
    terms = []
    for oneof in oneofs:
        field = statement.WhichOneof(oneof)
        if field:
            jelly_term = getattr(statement, field)
            decoded_term = self.decode_term(jelly_term)
            self.repeated_terms[oneof] = decoded_term
        else:
            decoded_term = self.repeated_terms[oneof]
            if decoded_term is None:
                msg = f"missing repeated term {oneof}"
                raise ValueError(msg)
        terms.append(decoded_term)
    return terms
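The repeated-term handling above can be sketched without protobuf messages. In this simplified model a statement is a plain dict and a missing key plays the role of an unset oneof. The names `SLOTS` and `decode_statements` are illustrative only and are not part of pyjelly.

```python
from typing import Optional

SLOTS = ("subject", "predicate", "object")


def decode_statements(
    statements: list[dict[str, str]],
) -> list[tuple[Optional[str], ...]]:
    """Expand statements in which an omitted slot repeats the previous value."""
    repeated: dict[str, Optional[str]] = dict.fromkeys(SLOTS)
    decoded = []
    for statement in statements:
        terms = []
        for slot in SLOTS:
            if slot in statement:
                # Slot is present: use it and remember it for later rows.
                repeated[slot] = statement[slot]
            elif repeated[slot] is None:
                # Nothing was seen for this slot yet, so there is nothing to repeat.
                msg = f"missing repeated term {slot}"
                raise ValueError(msg)
            terms.append(repeated[slot])
        decoded.append(tuple(terms))
    return decoded


rows = [
    {"subject": "s1", "predicate": "p1", "object": "o1"},
    {"object": "o2"},  # subject and predicate repeat
    {"predicate": "p2", "object": "o3"},  # only the subject repeats
]
triples = decode_statements(rows)
```

A stream whose first statement omits a slot fails with ValueError, which corresponds to the "missing repeated term" case in the source.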
options_from_frame(frame, *, delimited)

Fill stream options based on the options row.

Notes: generalized_statements, rdf_star, and namespace declarations are set to false by default

Args:

  • frame (jelly.RdfStreamFrame): first non-empty frame from the stream
  • delimited (bool): derived delimited flag

Returns: ParserOptions: filled options with types/lookups/stream parameters information

Source code in pyjelly/parse/decode.py
def options_from_frame(
    frame: jelly.RdfStreamFrame,
    *,
    delimited: bool,
) -> ParserOptions:
    """
    Fill stream options based on the options row.

    Notes:
        generalized_statements, rdf_star, and namespace declarations
        are set to false by default

    Args:
        frame (jelly.RdfStreamFrame): first non-empty frame from the stream
        delimited (bool): derived delimited flag

    Returns:
        ParserOptions: filled options with types/lookups/stream parameters information

    """
    row = frame.rows[0]
    options = row.options
    return ParserOptions(
        stream_types=StreamTypes(
            physical_type=options.physical_type,
            logical_type=options.logical_type,
        ),
        lookup_preset=LookupPreset(
            max_names=options.max_name_table_size,
            max_prefixes=options.max_prefix_table_size,
            max_datatypes=options.max_datatype_table_size,
        ),
        params=StreamParameters(
            stream_name=options.stream_name,
            version=options.version,
            delimited=delimited,
        ),
    )
ioutils

Functions:

Name Description
delimited_jelly_hint

Detect whether a Jelly file is delimited from its first 3 bytes.

get_options_and_frames

Return stream options and frames from the buffered binary stream.

delimited_jelly_hint(header)

Detect whether a Jelly file is delimited from its first 3 bytes.

Truth table (notation: 0A = 0x0A, NN = not 0x0A, ?? = don't care):

Byte 1 Byte 2 Byte 3 Result
NN ?? ?? Delimited
0A NN ?? Non-delimited
0A 0A NN Delimited (size = 10)
0A 0A 0A Non-delimited (stream options size = 10)

>>> delimited_jelly_hint(bytes([0x00, 0x00, 0x00]))
True

>>> delimited_jelly_hint(bytes([0x00, 0x00, 0x0A]))
True

>>> delimited_jelly_hint(bytes([0x00, 0x0A, 0x00]))
True

>>> delimited_jelly_hint(bytes([0x00, 0x0A, 0x0A]))
True

>>> delimited_jelly_hint(bytes([0x0A, 0x00, 0x00]))
False

>>> delimited_jelly_hint(bytes([0x0A, 0x00, 0x0A]))
False

>>> delimited_jelly_hint(bytes([0x0A, 0x0A, 0x00]))
True

>>> delimited_jelly_hint(bytes([0x0A, 0x0A, 0x0A]))
False

Source code in pyjelly/parse/ioutils.py
def delimited_jelly_hint(header: bytes) -> bool:
    """
    Detect whether a Jelly file is delimited from its first 3 bytes.

    Truth table (notation: `0A` = `0x0A`, `NN` = `not 0x0A`, `??` = _don't care_):

    | Byte 1 | Byte 2 | Byte 3 | Result                                   |
    |--------|--------|--------|------------------------------------------|
    | `NN`   |  `??`  |  `??`  | Delimited                                |
    | `0A`   |  `NN`  |  `??`  | Non-delimited                            |
    | `0A`   |  `0A`  |  `NN`  | Delimited (size = 10)                    |
    | `0A`   |  `0A`  |  `0A`  | Non-delimited (stream options size = 10) |

    >>> delimited_jelly_hint(bytes([0x00, 0x00, 0x00]))
    True

    >>> delimited_jelly_hint(bytes([0x00, 0x00, 0x0A]))
    True

    >>> delimited_jelly_hint(bytes([0x00, 0x0A, 0x00]))
    True

    >>> delimited_jelly_hint(bytes([0x00, 0x0A, 0x0A]))
    True

    >>> delimited_jelly_hint(bytes([0x0A, 0x00, 0x00]))
    False

    >>> delimited_jelly_hint(bytes([0x0A, 0x00, 0x0A]))
    False

    >>> delimited_jelly_hint(bytes([0x0A, 0x0A, 0x00]))
    True

    >>> delimited_jelly_hint(bytes([0x0A, 0x0A, 0x0A]))
    False
    """
    magic = 0x0A
    return len(header) == 3 and (  # noqa: PLR2004
        header[0] != magic or (header[1] == magic and header[2] != magic)
    )
get_options_and_frames(inp)

Return stream options and frames from the buffered binary stream.

Args: inp (IO[bytes]): jelly buffered binary stream

Raises:

  • JellyConformanceError: if the delimited stream contains no non-empty frames
  • JellyConformanceError: if the non-delimited stream's single frame has no rows (empty frame)

Returns: tuple[ParserOptions, Iterator[jelly.RdfStreamFrame]]: ParserOptions holds: stream types, lookup presets and other stream options

Source code in pyjelly/parse/ioutils.py
def get_options_and_frames(
    inp: IO[bytes],
) -> tuple[ParserOptions, Iterator[jelly.RdfStreamFrame]]:
    """
    Return stream options and frames from the buffered binary stream.

    Args:
        inp (IO[bytes]): jelly buffered binary stream

    Raises:
        JellyConformanceError: if no non-empty frames detected in the delimited stream
        JellyConformanceError: if non-delimited,
            error is raised if no rows are detected (empty frame)

    Returns:
        tuple[ParserOptions, Iterator[jelly.RdfStreamFrame]]: ParserOptions holds:
            stream types, lookup presets and other stream options

    """
    is_delimited = delimited_jelly_hint(bytes_read := inp.read(3))
    inp.seek(-len(bytes_read), os.SEEK_CUR)

    if is_delimited:
        frames = frame_iterator(inp)
        first_frame = next(frames, None)
        if first_frame is None:
            msg = "No non-empty frames found in the stream"
            raise JellyConformanceError(msg)

        options = options_from_frame(first_frame, delimited=True)
        return options, chain((first_frame,), frames)

    frame = parse(jelly.RdfStreamFrame, inp.read())

    if not frame.rows:
        msg = "The stream is corrupted (only contains an empty frame)"
        raise JellyConformanceError(msg)

    options = options_from_frame(frame, delimited=False)
    return options, iter((frame,))
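The peek-and-rewind step at the start of get_options_and_frames can be tried in isolation on an in-memory stream. This is a minimal sketch: `hint` repeats the check from delimited_jelly_hint so the example runs on its own, and `peek_is_delimited` is an illustrative helper name, not a pyjelly function.

```python
import io
import os


def hint(header: bytes) -> bool:
    """Local copy of the three-byte check shown in delimited_jelly_hint."""
    magic = 0x0A
    return len(header) == 3 and (
        header[0] != magic or (header[1] == magic and header[2] != magic)
    )


def peek_is_delimited(inp: io.BufferedIOBase) -> bool:
    """Read up to 3 bytes, then rewind so a parser still sees the whole stream."""
    header = inp.read(3)
    # Seek back by the number of bytes actually read (may be fewer than 3).
    inp.seek(-len(header), os.SEEK_CUR)
    return hint(header)


stream = io.BytesIO(bytes([0x05, 0x0A, 0x03, 0x01, 0x02]))
delimited = peek_is_delimited(stream)
position_after_peek = stream.tell()
```

The rewind relies on the input being seekable, which is why the source documents the argument as a buffered binary stream. Inputs shorter than three bytes are reported as non-delimited by the length check.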
lookup

Classes:

Name Description
LookupDecoder

Shared base for RDF lookup decoders using Jelly compression.

LookupDecoder(*, lookup_size)

Shared base for RDF lookup decoders using Jelly compression.

Tracks the last assigned and last reused index.

Parameters:

Name Type Description Default
lookup_size int

Maximum lookup size.

required
Source code in pyjelly/parse/lookup.py
def __init__(self, *, lookup_size: int) -> None:
    if lookup_size > MAX_LOOKUP_SIZE:
        msg = f"lookup size must be at most {MAX_LOOKUP_SIZE}"
        raise JellyAssertionError(msg)
    self.lookup_size = lookup_size
    placeholders = (None,) * lookup_size
    self.data: deque[str | None] = deque(placeholders, maxlen=lookup_size)
    self.last_assigned_index = 0
    self.last_reused_index = 0
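The constructor above pre-fills a fixed-length deque with None placeholders. The sketch below shows one way such a table could be written to and read from with 1-based indices. `ToyLookupDecoder` and its `at` method are hypothetical, and the rule that an entry index of 0 means "the slot after the last assigned one" is an assumption taken from the encoder-side description of sequential indices, not from the LookupDecoder source.

```python
from collections import deque
from typing import Optional


class ToyLookupDecoder:
    """Hypothetical decoder-side table; not pyjelly's LookupDecoder."""

    def __init__(self, lookup_size: int) -> None:
        placeholders = (None,) * lookup_size
        self.data: deque[Optional[str]] = deque(placeholders, maxlen=lookup_size)
        self.last_assigned_index = 0

    def assign_entry(self, index: int, value: str) -> None:
        # Assumption: index 0 stands for last_assigned_index + 1.
        if index == 0:
            index = self.last_assigned_index + 1
        self.data[index - 1] = value  # table indices are 1-based
        self.last_assigned_index = index

    def at(self, index: int) -> Optional[str]:
        return self.data[index - 1]


table = ToyLookupDecoder(lookup_size=4)
table.assign_entry(0, "a")  # stored at index 1
table.assign_entry(0, "b")  # stored at index 2
table.assign_entry(1, "c")  # explicit index overwrites slot 1
```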

serialize

Modules:

Name Description
encode
flows
lookup
streams
encode

Classes:

Name Description
TermEncoder
Slot

Slots for encoding RDF terms.

Functions:

Name Description
split_iri

Split iri into prefix and name.

encode_statement

Encode a statement.

encode_triple

Encode one triple.

encode_quad

Encode one quad.

encode_namespace_declaration

Encode namespace declaration.

encode_options

Encode stream options to ProtoBuf message.

TermEncoder(lookup_preset=None)

Methods:

Name Description
encode_iri

Encode iri.

encode_default_graph

Encode default graph.

encode_bnode

Encode blank node (BN).

encode_literal

Encode literal.

Source code in pyjelly/serialize/encode.py
def __init__(
    self,
    lookup_preset: options.LookupPreset | None = None,
) -> None:
    if lookup_preset is None:
        lookup_preset = options.LookupPreset()
    self.lookup_preset = lookup_preset
    self.names = LookupEncoder(lookup_size=lookup_preset.max_names)
    self.prefixes = LookupEncoder(lookup_size=lookup_preset.max_prefixes)
    self.datatypes = LookupEncoder(lookup_size=lookup_preset.max_datatypes)
encode_iri(iri_string)

Encode iri.

Args: iri_string (str): full iri in string format.

Returns: RowsAnd[jelly.RdfIri]: extra rows and protobuf RdfIri message.

Source code in pyjelly/serialize/encode.py
def encode_iri(self, iri_string: str) -> RowsAnd[jelly.RdfIri]:
    """
    Encode iri.

    Args:
        iri_string (str): full iri in string format.

    Returns:
        RowsAnd[jelly.RdfIri]: extra rows and protobuf RdfIri message.

    """
    prefix, name = split_iri(iri_string)
    if self.prefixes.lookup.max_size:
        prefix_entry_index = self.prefixes.encode_entry_index(prefix)
    else:
        name = iri_string
        prefix_entry_index = None

    name_entry_index = self.names.encode_entry_index(name)
    term_rows = []

    if prefix_entry_index is not None:
        prefix_entry = jelly.RdfPrefixEntry(id=prefix_entry_index, value=prefix)
        term_rows.append(jelly.RdfStreamRow(prefix=prefix_entry))

    if name_entry_index is not None:
        name_entry = jelly.RdfNameEntry(id=name_entry_index, value=name)
        term_rows.append(jelly.RdfStreamRow(name=name_entry))

    prefix_index = self.prefixes.encode_prefix_term_index(prefix)
    name_index = self.names.encode_name_term_index(name)
    return term_rows, jelly.RdfIri(prefix_id=prefix_index, name_id=name_index)
encode_default_graph()

Encode default graph.

Returns: RowsAnd[jelly.RdfDefaultGraph]: empty extra rows and default graph message.

Source code in pyjelly/serialize/encode.py
def encode_default_graph(self) -> RowsAnd[jelly.RdfDefaultGraph]:
    """
    Encode default graph.

    Returns:
        RowsAnd[jelly.RdfDefaultGraph]: empty extra rows and
            default graph message.

    """
    return (), jelly.RdfDefaultGraph()
encode_bnode(bnode)

Encode blank node (BN).

Args: bnode (str): BN internal identifier in string format.

Returns: RowsAnd[str]: empty extra rows and original BN string.

Source code in pyjelly/serialize/encode.py
def encode_bnode(self, bnode: str) -> RowsAnd[str]:
    """
    Encode blank node (BN).

    Args:
        bnode (str): BN internal identifier in string format.

    Returns:
        RowsAnd[str]: empty extra rows and original BN string.

    """
    return (), bnode
encode_literal(*, lex, language=None, datatype=None)

Encode literal.

Args:

  • lex (str): lexical form/literal value
  • language (str | None, optional): langtag. Defaults to None.
  • datatype (str | None, optional): data type if it is a typed literal. Defaults to None.

Raises: JellyConformanceError: if a datatype is specified while the datatype lookup is disabled (size 0).

Returns: RowsAnd[jelly.RdfLiteral]: extra rows (i.e., datatype entries) and RdfLiteral message.

Source code in pyjelly/serialize/encode.py
def encode_literal(
    self,
    *,
    lex: str,
    language: str | None = None,
    datatype: str | None = None,
) -> RowsAnd[jelly.RdfLiteral]:
    """
    Encode literal.

    Args:
        lex (str): lexical form/literal value
        language (str | None, optional): langtag. Defaults to None.
        datatype (str | None, optional): data type if
        it is a typed literal. Defaults to None.

    Raises:
        JellyConformanceError: if a datatype is specified while
            the datatype lookup is disabled (size 0).

    Returns:
        RowsAnd[jelly.RdfLiteral]: extra rows (i.e., datatype entries)
            and RdfLiteral message.

    """
    datatype_id = None
    term_rows: tuple[()] | tuple[jelly.RdfStreamRow] = ()

    if datatype and datatype != options.STRING_DATATYPE_IRI:
        if self.datatypes.lookup.max_size == 0:
            msg = (
                f"can't encode literal with type {datatype}: "
                "datatype lookup cannot be used if disabled "
                "(its size was set to 0)"
            )
            raise JellyConformanceError(msg)
        datatype_entry_id = self.datatypes.encode_entry_index(datatype)

        if datatype_entry_id is not None:
            entry = jelly.RdfDatatypeEntry(id=datatype_entry_id, value=datatype)
            term_rows = (jelly.RdfStreamRow(datatype=entry),)

        datatype_id = self.datatypes.encode_datatype_term_index(datatype)

    return term_rows, jelly.RdfLiteral(
        lex=lex,
        langtag=language,
        datatype=datatype_id,
    )
Slot

Bases: str, Enum

Slots for encoding RDF terms.

split_iri(iri_string)

Split iri into prefix and name.

Args: iri_string (str): full iri string.

Returns: tuple[str, str]: iri's prefix and name.

Source code in pyjelly/serialize/encode.py
def split_iri(iri_string: str) -> tuple[str, str]:
    """
    Split iri into prefix and name.

    Args:
        iri_string (str): full iri string.

    Returns:
        tuple[str, str]: iri's prefix and name.

    """
    name = iri_string
    prefix = ""
    for sep in "#", "/":
        prefix, char, name = iri_string.rpartition(sep)
        if char:
            return prefix + char, name
    return prefix, name
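A few sample inputs show how the split behaves. The function body below repeats the source above so that the example runs on its own; the IRIs are made up for illustration.

```python
def split_iri(iri_string: str) -> tuple[str, str]:
    """Same logic as the source listing, repeated so the example is standalone."""
    name = iri_string
    prefix = ""
    for sep in "#", "/":
        prefix, char, name = iri_string.rpartition(sep)
        if char:
            # The separator stays with the prefix.
            return prefix + char, name
    return prefix, name


hash_split = split_iri("http://example.org/vocab#Person")
slash_split = split_iri("http://example.org/people/alice")
no_separator = split_iri("urn:isbn:0451450523")
```

Because "#" is tried before "/", an IRI containing both is split at its last "#". An IRI with neither separator yields an empty prefix and the full string as the name.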
encode_statement(terms, term_encoder, repeated_terms)

Encode a statement.

Args:

  • terms (Iterable[object]): original terms to encode
  • term_encoder (TermEncoder): encoder with lookup tables
  • repeated_terms (dict[Slot, object]): dictionary of repeated terms.

Returns: tuple[list[jelly.RdfStreamRow], dict[str, Any]]: extra rows to append and jelly terms.

Source code in pyjelly/serialize/encode.py
def encode_statement(
    terms: Iterable[object],
    term_encoder: TermEncoder,
    repeated_terms: dict[Slot, object],
) -> tuple[list[jelly.RdfStreamRow], dict[str, Any]]:
    """
    Encode a statement.

    Args:
        terms (Iterable[object]): original terms to encode
        term_encoder (TermEncoder): encoder with lookup tables
        repeated_terms (dict[Slot, object]): dictionary of repeated terms.

    Returns:
        tuple[list[jelly.RdfStreamRow], dict[str, Any]]:
            extra rows to append and jelly terms.

    """
    statement: dict[str, object] = {}
    rows: list[jelly.RdfStreamRow] = []
    for slot, term in zip(Slot, terms):
        if repeated_terms[slot] != term:
            extra_rows, value = term_encoder.encode_any(term, slot)
            oneof = term_encoder.TERM_ONEOF_NAMES[type(value)]
            rows.extend(extra_rows)
            field = f"{slot}_{oneof}"
            statement[field] = value
            repeated_terms[slot] = term
    return rows, statement
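The effect of the `repeated_terms` comparison is that a slot is written only when its term differs from the one in the previous statement. A simplified sketch follows, with plain strings for slots and terms instead of the `Slot` enum and encoded messages; `SLOTS` and `compress` are illustrative names.

```python
SLOTS = ("s", "p", "o")


def compress(triples: list[tuple[str, str, str]]) -> list[dict[str, str]]:
    """Keep only the slots whose term changed since the previous triple."""
    repeated: dict[str, object] = dict.fromkeys(SLOTS)
    statements = []
    for triple in triples:
        statement = {}
        for slot, term in zip(SLOTS, triple):
            if repeated[slot] != term:
                statement[slot] = term
                repeated[slot] = term  # remember for the next statement
        statements.append(statement)
    return statements


compressed = compress(
    [
        ("a", "knows", "b"),
        ("a", "knows", "c"),
        ("a", "likes", "c"),
    ]
)
```

The first statement carries all three slots; each later one carries only what changed, which is what lets a decoder fill the gaps from its own repeated-terms dictionary.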
encode_triple(terms, term_encoder, repeated_terms)

Encode one triple.

Args:

  • terms (Iterable[object]): original terms to encode
  • term_encoder (TermEncoder): current encoder with lookup tables
  • repeated_terms (dict[Slot, object]): dictionary of repeated terms.

Returns: list[jelly.RdfStreamRow]: list of rows to add to the current flow.

Source code in pyjelly/serialize/encode.py
def encode_triple(
    terms: Iterable[object],
    term_encoder: TermEncoder,
    repeated_terms: dict[Slot, object],
) -> list[jelly.RdfStreamRow]:
    """
    Encode one triple.

    Args:
        terms (Iterable[object]): original terms to encode
        term_encoder (TermEncoder): current encoder with lookup tables
        repeated_terms (dict[Slot, object]): dictionary of repeated terms.

    Returns:
        list[jelly.RdfStreamRow]: list of rows to add to the current flow.

    """
    rows, statement = encode_statement(terms, term_encoder, repeated_terms)
    row = jelly.RdfStreamRow(triple=jelly.RdfTriple(**statement))
    rows.append(row)
    return rows
encode_quad(terms, term_encoder, repeated_terms)

Encode one quad.

Args:

  • terms (Iterable[object]): original terms to encode
  • term_encoder (TermEncoder): current encoder with lookup tables
  • repeated_terms (dict[Slot, object]): dictionary of repeated terms.

Returns: list[jelly.RdfStreamRow]: list of messages to append to current flow.

Source code in pyjelly/serialize/encode.py
def encode_quad(
    terms: Iterable[object],
    term_encoder: TermEncoder,
    repeated_terms: dict[Slot, object],
) -> list[jelly.RdfStreamRow]:
    """
    Encode one quad.

    Args:
        terms (Iterable[object]): original terms to encode
        term_encoder (TermEncoder): current encoder with lookup tables
        repeated_terms (dict[Slot, object]): dictionary of repeated terms.

    Returns:
        list[jelly.RdfStreamRow]: list of messages to append to current flow.

    """
    rows, statement = encode_statement(terms, term_encoder, repeated_terms)
    row = jelly.RdfStreamRow(quad=jelly.RdfQuad(**statement))
    rows.append(row)
    return rows
encode_namespace_declaration(name, value, term_encoder)

Encode namespace declaration.

Args:

  • name (str): namespace prefix label
  • value (str): namespace iri
  • term_encoder (TermEncoder): current encoder

Returns: list[jelly.RdfStreamRow]: list of messages to append to current flow.

Source code in pyjelly/serialize/encode.py
def encode_namespace_declaration(
    name: str,
    value: str,
    term_encoder: TermEncoder,
) -> list[jelly.RdfStreamRow]:
    """
    Encode namespace declaration.

    Args:
        name (str): namespace prefix label
        value (str): namespace iri
        term_encoder (TermEncoder): current encoder

    Returns:
        list[jelly.RdfStreamRow]: list of messages to append to current flow.

    """
    [*rows], iri = term_encoder.encode_iri(value)
    declaration = jelly.RdfNamespaceDeclaration(name=name, value=iri)
    row = jelly.RdfStreamRow(namespace=declaration)
    rows.append(row)
    return rows
encode_options(lookup_preset, stream_types, params)

Encode stream options to ProtoBuf message.

Args:

  • lookup_preset (options.LookupPreset): lookup tables options
  • stream_types (options.StreamTypes): physical and logical types
  • params (options.StreamParameters): other params.

Returns: jelly.RdfStreamRow: encoded stream options row

Source code in pyjelly/serialize/encode.py
def encode_options(
    lookup_preset: options.LookupPreset,
    stream_types: options.StreamTypes,
    params: options.StreamParameters,
) -> jelly.RdfStreamRow:
    """
    Encode stream options to ProtoBuf message.

    Args:
        lookup_preset (options.LookupPreset): lookup tables options
        stream_types (options.StreamTypes): physical and logical types
        params (options.StreamParameters): other params.

    Returns:
        jelly.RdfStreamRow: encoded stream options row

    """
    return jelly.RdfStreamRow(
        options=jelly.RdfStreamOptions(
            stream_name=params.stream_name,
            physical_type=stream_types.physical_type,
            generalized_statements=params.generalized_statements,
            rdf_star=params.rdf_star,
            max_name_table_size=lookup_preset.max_names,
            max_prefix_table_size=lookup_preset.max_prefixes,
            max_datatype_table_size=lookup_preset.max_datatypes,
            logical_type=stream_types.logical_type,
            version=params.version,
        )
    )
flows

Classes:

Name Description
FrameFlow

Abstract base class for producing Jelly frames from RDF stream rows.

ManualFrameFlow

Produces frames only when manually requested (never automatically).

BoundedFrameFlow

Produce frames automatically when a fixed number of rows is reached.

GraphsFrameFlow
DatasetsFrameFlow

Functions:

Name Description
flow_for_type

Return flow based on logical type requested.

FrameFlow

Bases: UserList[RdfStreamRow]

Abstract base class for producing Jelly frames from RDF stream rows.

Collects stream rows and assembles them into RdfStreamFrame objects when ready.

Methods:

Name Description
frame_from_graph

Treat the current rows as a graph and produce a frame.

frame_from_dataset

Treat the current rows as a dataset and produce a frame.

to_stream_frame

Create stream frame from flow content.

frame_from_graph()

Treat the current rows as a graph and produce a frame.

Default implementation returns None.

Source code in pyjelly/serialize/flows.py
def frame_from_graph(self) -> jelly.RdfStreamFrame | None:
    """
    Treat the current rows as a graph and produce a frame.

    Default implementation returns None.
    """
    return None
frame_from_dataset()

Treat the current rows as a dataset and produce a frame.

Default implementation returns None.

Source code in pyjelly/serialize/flows.py
def frame_from_dataset(self) -> jelly.RdfStreamFrame | None:
    """
    Treat the current rows as a dataset and produce a frame.

    Default implementation returns None.
    """
    return None
to_stream_frame()

Create stream frame from flow content.

Notes: Clears flow content after creating the frame.

Returns: jelly.RdfStreamFrame | None: stream frame

Source code in pyjelly/serialize/flows.py
def to_stream_frame(self) -> jelly.RdfStreamFrame | None:
    """
    Create stream frame from flow content.

    Notes:
        Clears flow content after creating the frame.

    Returns:
        jelly.RdfStreamFrame | None: stream frame

    """
    if not self:
        return None
    frame = jelly.RdfStreamFrame(rows=self)
    self.clear()
    return frame
ManualFrameFlow(initlist=None, *, logical_type=jelly.LOGICAL_STREAM_TYPE_UNSPECIFIED)

Bases: FrameFlow

Produces frames only when manually requested (never automatically).

Warning

All stream rows are kept in memory until to_stream_frame() is called. This may lead to high memory usage for large streams.

Used for non-delimited serialization.

Source code in pyjelly/serialize/flows.py
def __init__(
    self,
    initlist: Iterable[jelly.RdfStreamRow] | None = None,
    *,
    logical_type: jelly.LogicalStreamType = jelly.LOGICAL_STREAM_TYPE_UNSPECIFIED,
) -> None:
    super().__init__(initlist)
    self.logical_type = logical_type
BoundedFrameFlow(initlist=None, *, frame_size=None)

Bases: FrameFlow

Produce frames automatically when a fixed number of rows is reached.

Used for delimited encoding (default mode).

Methods:

Name Description
frame_from_bounds

Emit frame from flow if full.

Source code in pyjelly/serialize/flows.py
def __init__(
    self,
    initlist: Iterable[jelly.RdfStreamRow] | None = None,
    *,
    frame_size: int | None = None,
) -> None:
    super().__init__(initlist)
    self.frame_size = frame_size or DEFAULT_FRAME_SIZE
frame_from_bounds()

Emit frame from flow if full.

Returns: jelly.RdfStreamFrame | None: stream frame

Source code in pyjelly/serialize/flows.py
@override
def frame_from_bounds(self) -> jelly.RdfStreamFrame | None:
    """
    Emit frame from flow if full.

    Returns:
        jelly.RdfStreamFrame | None: stream frame

    """
    if len(self) >= self.frame_size:
        return self.to_stream_frame()
    return None
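The interplay between frame_from_bounds and to_stream_frame can be sketched with a UserList that emits plain lists instead of RdfStreamFrame messages. `ToyBoundedFlow`, `to_frame`, and the default frame size of 3 are hypothetical and chosen only to keep the example short.

```python
from collections import UserList
from typing import Optional


class ToyBoundedFlow(UserList):
    """Hypothetical flow that emits a frame (here: a plain list) when full."""

    def __init__(self, initlist=None, *, frame_size: int = 3) -> None:
        super().__init__(initlist)
        self.frame_size = frame_size

    def to_frame(self) -> Optional[list]:
        if not self:
            return None
        frame = list(self)  # copy the rows before clearing the buffer
        self.clear()
        return frame

    def frame_from_bounds(self) -> Optional[list]:
        if len(self) >= self.frame_size:
            return self.to_frame()
        return None


flow = ToyBoundedFlow(frame_size=3)
frames = []
for row in range(7):
    flow.append(row)
    frame = flow.frame_from_bounds()
    if frame is not None:
        frames.append(frame)
leftover = flow.to_frame()  # flush the partial last frame
```

Seven rows with a frame size of three produce two full frames plus one leftover row, which has to be flushed explicitly at the end of the stream.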
GraphsFrameFlow

Bases: FrameFlow

Methods:

Name Description
frame_from_graph

Emit current flow content (one graph) as jelly frame.

frame_from_graph()

Emit current flow content (one graph) as jelly frame.

Returns: jelly.RdfStreamFrame | None: jelly frame or none if flow is empty.

Source code in pyjelly/serialize/flows.py
def frame_from_graph(self) -> jelly.RdfStreamFrame | None:
    """
    Emit current flow content (one graph) as jelly frame.

    Returns:
        jelly.RdfStreamFrame | None: jelly frame or none if
            flow is empty.

    """
    return self.to_stream_frame()
DatasetsFrameFlow

Bases: FrameFlow

Methods:

Name Description
frame_from_dataset

Emit current flow content (dataset) as jelly frame.

frame_from_dataset()

Emit current flow content (dataset) as jelly frame.

Returns: jelly.RdfStreamFrame | None: jelly frame or none if flow is empty.

Source code in pyjelly/serialize/flows.py
def frame_from_dataset(self) -> jelly.RdfStreamFrame | None:
    """
    Emit current flow content (dataset) as jelly frame.

    Returns:
        jelly.RdfStreamFrame | None: jelly frame or none if
            flow is empty.

    """
    return self.to_stream_frame()
flow_for_type(logical_type)

Return flow based on logical type requested.

Args: logical_type (jelly.LogicalStreamType): logical type requested.

Raises: NotImplementedError: if logical type not supported.

Returns: type[FrameFlow]: FrameFlow for respective logical type.

Source code in pyjelly/serialize/flows.py
def flow_for_type(logical_type: jelly.LogicalStreamType) -> type[FrameFlow]:
    """
    Return flow based on logical type requested.

    Args:
        logical_type (jelly.LogicalStreamType): logical type requested.

    Raises:
        NotImplementedError: if logical type not supported.

    Returns:
        type[FrameFlow]: FrameFlow for respective logical type.

    """
    try:
        return FLOW_DISPATCH[logical_type]
    except KeyError:
        msg = (
            "unsupported logical stream type: "
            f"{jelly.LogicalStreamType.Name(logical_type)}"
        )
        raise NotImplementedError(msg) from None
lookup

Classes:

Name Description
Lookup

Fixed-size 1-based string-to-index mapping with LRU eviction.

LookupEncoder

Shared base for RDF lookup encoders using Jelly compression.

Lookup(max_size)

Fixed-size 1-based string-to-index mapping with LRU eviction.

  • Assigns incrementing indices starting from 1.
  • After reaching the maximum size, reuses indices by evicting the least-recently-used entries.
  • Index 0 is reserved for delta encoding in Jelly streams.

To check if a key exists, call .make_last_to_evict(key) and catch KeyError. If KeyError is raised, the key can be inserted with .insert(key).

Parameters:

Name Type Description Default
max_size int

Maximum number of entries. Zero disables lookup.

required
Source code in pyjelly/serialize/lookup.py
def __init__(self, max_size: int) -> None:
    self.data = OrderedDict[str, int]()
    self.max_size = max_size
    self._evicting = False
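The eviction behaviour described above can be sketched on top of OrderedDict. This is a minimal illustration under stated assumptions, not the library's implementation: `ToyLookup` and `touch` are hypothetical names, and a `max_size` of 0 is not handled here.

```python
from collections import OrderedDict


class ToyLookup:
    """Hypothetical LRU table mapping strings to 1-based indices."""

    def __init__(self, max_size: int) -> None:
        self.data: OrderedDict[str, int] = OrderedDict()
        self.max_size = max_size

    def touch(self, key: str) -> None:
        # Marks the key as most recently used; raises KeyError if absent.
        self.data.move_to_end(key)

    def insert(self, key: str) -> int:
        if len(self.data) < self.max_size:
            index = len(self.data) + 1  # next unused index
        else:
            # Evict the least-recently-used entry and take over its index.
            _, index = self.data.popitem(last=False)
        self.data[key] = index
        return index


lookup = ToyLookup(max_size=2)
first = lookup.insert("a")
second = lookup.insert("b")
lookup.touch("a")  # "b" is now the least recently used entry
third = lookup.insert("c")  # evicts "b" and reuses its index
```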
LookupEncoder(*, lookup_size)

Shared base for RDF lookup encoders using Jelly compression.

Tracks the last assigned and last reused index.

Parameters:

Name Type Description Default
lookup_size int

Maximum lookup size.

required

Methods:

Name Description
encode_entry_index

Get or assign the index to use in an entry.

Source code in pyjelly/serialize/lookup.py
def __init__(self, *, lookup_size: int) -> None:
    self.lookup = Lookup(max_size=lookup_size)
    self.last_assigned_index = 0
    self.last_reused_index = 0
encode_entry_index(key)

Get or assign the index to use in an entry.

Returns:

Type Description
int or None
  • 0 if the new index is sequential (last_assigned_index + 1)
  • actual assigned/reused index otherwise
  • None if the key already exists
If the return value is None, the entry is already in the lookup and does not
need to be emitted. Any integer value (including 0) means the entry is new
and should be emitted.
Source code in pyjelly/serialize/lookup.py
def encode_entry_index(self, key: str) -> int | None:
    """
    Get or assign the index to use in an entry.

    Returns
    -------
    int or None
        - 0 if the new index is sequential (`last_assigned_index + 1`)
        - actual assigned/reused index otherwise
        - None if the key already exists

    If the return value is None, the entry is already in the lookup and does not
    need to be emitted. Any integer value (including 0) means the entry is new
    and should be emitted.

    """
    try:
        self.lookup.make_last_to_evict(key)
        return None  # noqa: TRY300
    except KeyError:
        previous_index = self.last_assigned_index
        index = self.lookup.insert(key)
        self.last_assigned_index = index
        if index == previous_index + 1:
            return 0
        return index
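The three possible return values (0, an explicit index, or None) are easiest to see on a short key sequence. The sketch below reimplements the convention on a small OrderedDict-based table. `ToyLookupEncoder` and `_insert` are hypothetical, and the eviction policy is an assumption based on the LRU description of Lookup.

```python
from collections import OrderedDict
from typing import Optional


class ToyLookupEncoder:
    """Hypothetical encoder illustrating the 0 / index / None convention."""

    def __init__(self, lookup_size: int) -> None:
        self.data: OrderedDict[str, int] = OrderedDict()
        self.lookup_size = lookup_size
        self.last_assigned_index = 0

    def _insert(self, key: str) -> int:
        if len(self.data) < self.lookup_size:
            index = len(self.data) + 1
        else:
            # Table is full: reuse the index of the least-recently-used key.
            _, index = self.data.popitem(last=False)
        self.data[key] = index
        return index

    def encode_entry_index(self, key: str) -> Optional[int]:
        if key in self.data:
            self.data.move_to_end(key)
            return None  # already known, no entry row needed
        previous = self.last_assigned_index
        index = self._insert(key)
        self.last_assigned_index = index
        # 0 signals "previous index + 1"; otherwise send the index itself.
        return 0 if index == previous + 1 else index


encoder = ToyLookupEncoder(lookup_size=2)
results = [encoder.encode_entry_index(key) for key in ["a", "b", "a", "c", "d"]]
```

With a table of size two, the first two keys get sequential indices (0), the repeated key needs no entry (None), and the last two keys reuse evicted slots, so their indices are sent explicitly.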
streams

Classes:

Name Description
Stream
TripleStream
QuadStream
GraphStream

Functions:

Name Description
stream_for_type

Give a Stream based on physical type specified.

Stream(*, encoder, options=None)

Methods:

Name Description
infer_flow

Return flow based on the stream options provided.

enroll

Initialize start of the stream.

stream_options

Encode and append stream options row to the current flow.

namespace_declaration

Add namespace declaration to jelly stream.

for_rdflib

Initialize stream with RDFLib encoder.

Source code in pyjelly/serialize/streams.py
def __init__(
    self,
    *,
    encoder: TermEncoder,
    options: SerializerOptions | None = None,
) -> None:
    self.encoder = encoder
    if options is None:
        options = SerializerOptions()
    self.options = options
    flow = options.flow
    if flow is None:
        flow = self.infer_flow()
    self.flow = flow
    self.repeated_terms = dict.fromkeys(Slot)
    self.enrolled = False
    self.stream_types = StreamTypes(
        physical_type=self.physical_type,
        logical_type=self.flow.logical_type,
    )
infer_flow()

Return flow based on the stream options provided.

Returns: FrameFlow: initialised FrameFlow object.

Source code in pyjelly/serialize/streams.py
def infer_flow(self) -> FrameFlow:
    """
    Return flow based on the stream options provided.

    Returns:
        FrameFlow: initialised FrameFlow object.

    """
    flow: FrameFlow
    if self.options.params.delimited:
        if self.options.logical_type != jelly.LOGICAL_STREAM_TYPE_UNSPECIFIED:
            flow_class = flow_for_type(self.options.logical_type)
        else:
            flow_class = self.default_delimited_flow_class

        if self.options.logical_type in (
            jelly.LOGICAL_STREAM_TYPE_FLAT_TRIPLES,
            jelly.LOGICAL_STREAM_TYPE_FLAT_QUADS,
        ):
            flow = flow_class(frame_size=self.options.frame_size)  # type: ignore[call-overload]
        else:
            flow = flow_class()
    else:
        flow = ManualFrameFlow(logical_type=self.options.logical_type)
    return flow
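The branching in `infer_flow` can be followed more easily with stand-in types. The sketch below mirrors the decision order of the source above. `Logical`, `FlatFlow`, `GraphsFlow`, `ManualFlow`, and the default frame size of 250 are all hypothetical and chosen only for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class Logical(Enum):
    """Hypothetical stand-in for the Jelly logical stream types."""

    UNSPECIFIED = 0
    FLAT_TRIPLES = 1
    GRAPHS = 2


@dataclass
class FlatFlow:
    frame_size: int = 250  # assumed default, for illustration only


@dataclass
class GraphsFlow:
    pass


@dataclass
class ManualFlow:
    logical_type: Logical


FLOW_FOR_TYPE = {Logical.FLAT_TRIPLES: FlatFlow, Logical.GRAPHS: GraphsFlow}


def infer_flow(*, delimited: bool, logical_type: Logical, frame_size: int):
    if not delimited:
        # Non-delimited output: the caller decides when frames are cut.
        return ManualFlow(logical_type=logical_type)
    if logical_type != Logical.UNSPECIFIED:
        flow_class = FLOW_FOR_TYPE[logical_type]
    else:
        flow_class = FlatFlow  # plays the role of default_delimited_flow_class
    if logical_type == Logical.FLAT_TRIPLES:
        # Only an explicitly flat logical type receives the configured frame size.
        return flow_class(frame_size=frame_size)
    return flow_class()


manual = infer_flow(delimited=False, logical_type=Logical.UNSPECIFIED, frame_size=10)
flat = infer_flow(delimited=True, logical_type=Logical.FLAT_TRIPLES, frame_size=10)
graphs = infer_flow(delimited=True, logical_type=Logical.GRAPHS, frame_size=10)
default = infer_flow(delimited=True, logical_type=Logical.UNSPECIFIED, frame_size=10)
```

Note that, as in the source, an unspecified logical type falls back to the default flow class constructed without arguments, so the configured frame size is not passed on in that branch.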
enroll()

Initialize start of the stream.

Source code in pyjelly/serialize/streams.py
def enroll(self) -> None:
    """Initialize start of the stream."""
    if not self.enrolled:
        self.stream_options()
        self.enrolled = True
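Because `enroll` is guarded by the `enrolled` flag, calling it more than once emits the options row only once. A minimal sketch of that behaviour, using a hypothetical `ToyStream` that records rows in a plain list instead of a flow:

```python
class ToyStream:
    """Hypothetical stream that records emitted rows in a list."""

    def __init__(self) -> None:
        self.rows = []
        self.enrolled = False

    def stream_options(self) -> None:
        self.rows.append("options")

    def enroll(self) -> None:
        # The flag makes repeated calls harmless.
        if not self.enrolled:
            self.stream_options()
            self.enrolled = True


stream = ToyStream()
stream.enroll()
stream.enroll()  # no second options row
print(stream.rows)
```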
stream_options()

Encode and append stream options row to the current flow.

Source code in pyjelly/serialize/streams.py
def stream_options(self) -> None:
    """Encode and append stream options row to the current flow."""
    self.flow.append(
        encode_options(
            stream_types=self.stream_types,
            params=self.options.params,
            lookup_preset=self.options.lookup_preset,
        )
    )
namespace_declaration(name, iri)

Add namespace declaration to jelly stream.

Args:
    name (str): namespace prefix label
    iri (str): namespace iri

Source code in pyjelly/serialize/streams.py
def namespace_declaration(self, name: str, iri: str) -> None:
    """
    Add namespace declaration to jelly stream.

    Args:
        name (str): namespace prefix label
        iri (str): namespace iri

    """
    rows = encode_namespace_declaration(
        name=name,
        value=iri,
        term_encoder=self.encoder,
    )
    self.flow.extend(rows)
for_rdflib(options=None)

Initialize stream with RDFLib encoder.

Args: options (SerializerOptions | None, optional): Stream options. Defaults to None.

Raises: TypeError: if called on the abstract Stream base class instead of a subclass for a specific physical type.

Returns: Stream: initialized stream with RDFLib encoder.

Source code in pyjelly/serialize/streams.py
@classmethod
def for_rdflib(cls, options: SerializerOptions | None = None) -> Stream:
    """
    Initialize stream with RDFLib encoder.

    Args:
        options (SerializerOptions | None, optional): Stream options.
            Defaults to None.

    Raises:
        TypeError: if called on the abstract Stream base class instead of a
            subclass for a specific physical type.

    Returns:
        Stream: initialized stream with RDFLib encoder.

    """
    if cls is Stream:
        msg = "Stream is an abstract base class, use a subclass instead"
        raise TypeError(msg)
    from pyjelly.integrations.rdflib.serialize import RDFLibTermEncoder

    lookup_preset: LookupPreset | None = None
    if options is not None:
        lookup_preset = options.lookup_preset
    return cls(
        encoder=RDFLibTermEncoder(lookup_preset=lookup_preset),
        options=options,
    )
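The guard at the top of `for_rdflib` is a common pattern for factory classmethods on abstract bases. The sketch below reproduces it with hypothetical names (`BaseStream`, `ToyTripleStream`, `ToyEncoder`, `with_toy_encoder`), so it runs without pyjelly or RDFLib installed.

```python
from __future__ import annotations


class ToyEncoder:
    """Hypothetical encoder standing in for RDFLibTermEncoder."""


class BaseStream:
    def __init__(self, *, encoder: ToyEncoder, options: dict | None = None) -> None:
        self.encoder = encoder
        self.options = options

    @classmethod
    def with_toy_encoder(cls, options: dict | None = None) -> BaseStream:
        # Refuse to build the base class itself; only subclasses are usable.
        if cls is BaseStream:
            msg = "BaseStream is an abstract base class, use a subclass instead"
            raise TypeError(msg)
        return cls(encoder=ToyEncoder(), options=options)


class ToyTripleStream(BaseStream):
    pass


stream = ToyTripleStream.with_toy_encoder()

try:
    BaseStream.with_toy_encoder()
except TypeError as error:
    base_error = str(error)
else:
    base_error = None
```

Because the factory uses `cls`, each subclass gets an instance of its own type without overriding the method.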
TripleStream(*, encoder, options=None)

Bases: Stream

Methods:

Name Description
triple

Process one triple to Protobuf messages.

triple(terms)

Process one triple to Protobuf messages.

Note: Adds new rows to the current flow and returns StreamFrame if frame size conditions are met.

Args: terms (Iterable[object]): RDF terms to encode.

Returns: jelly.RdfStreamFrame | None: a stream frame if the flow supports frame slicing and has accumulated a full frame, otherwise None.

Source code in pyjelly/serialize/streams.py
def triple(self, terms: Iterable[object]) -> jelly.RdfStreamFrame | None:
    """
    Process one triple to Protobuf messages.

    Note:
        Adds new rows to the current flow and returns StreamFrame if
        frame size conditions are met.

    Args:
        terms (Iterable[object]): RDF terms to encode.

    Returns:
        jelly.RdfStreamFrame | None: a stream frame if the flow supports
            frame slicing and has accumulated a full frame, otherwise None.

    """
    new_rows = encode_triple(
        terms,
        term_encoder=self.encoder,
        repeated_terms=self.repeated_terms,
    )
    self.flow.extend(new_rows)
    return self.flow.frame_from_bounds()
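The "returns a frame only when the frame is full" behaviour can be sketched with a size-bounded buffer. `BoundedFlow` below is a hypothetical stand-in. It assumes that fullness is measured by the number of buffered rows and that each triple contributes exactly one row, which is a simplification.

```python
from __future__ import annotations


class BoundedFlow:
    """Hypothetical flow that cuts a frame once enough rows are buffered."""

    def __init__(self, frame_size: int) -> None:
        self.frame_size = frame_size
        self.rows: list[str] = []

    def extend(self, rows: list[str]) -> None:
        self.rows.extend(rows)

    def frame_from_bounds(self) -> list[str] | None:
        if len(self.rows) >= self.frame_size:
            frame, self.rows = self.rows, []
            return frame
        return None  # not enough rows yet


flow = BoundedFlow(frame_size=2)
frames = []
for number in range(5):
    flow.extend([f"triple-{number}"])
    frame = flow.frame_from_bounds()
    if frame is not None:
        frames.append(frame)

print(frames)
print(flow.rows)
```

Callers therefore need to handle the None case on most calls, and rows left in the buffer after the last triple still have to be flushed by other means.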
QuadStream(*, encoder, options=None)

Bases: Stream

Methods:

Name Description
quad

Process one quad to Protobuf messages.

quad(terms)

Process one quad to Protobuf messages.

Args: terms (Iterable[object]): terms to encode.

Returns: jelly.RdfStreamFrame | None: a stream frame if the flow supports frame slicing and has accumulated a full frame, otherwise None.

Source code in pyjelly/serialize/streams.py
def quad(self, terms: Iterable[object]) -> jelly.RdfStreamFrame | None:
    """
    Process one quad to Protobuf messages.

    Args:
        terms (Iterable[object]): terms to encode.

    Returns:
        jelly.RdfStreamFrame | None: a stream frame if the flow supports
            frame slicing and has accumulated a full frame, otherwise None.

    """
    new_rows = encode_quad(
        terms,
        term_encoder=self.encoder,
        repeated_terms=self.repeated_terms,
    )
    self.flow.extend(new_rows)
    return self.flow.frame_from_bounds()
GraphStream(*, encoder, options=None)

Bases: TripleStream

Methods:

Name Description
graph

Process one graph into a sequence of jelly frames.

graph(graph_id, graph)

Process one graph into a sequence of jelly frames.

Args:
    graph_id (object): graph identifier (blank node, literal, IRI, or the default graph)
    graph (Iterable[Iterable[object]]): iterable of triples (the graph's content)

Yields: jelly.RdfStreamFrame: Jelly frames.

Source code in pyjelly/serialize/streams.py
def graph(
    self,
    graph_id: object,
    graph: Iterable[Iterable[object]],
) -> Generator[jelly.RdfStreamFrame]:
    """
    Process one graph into a sequence of jelly frames.

    Args:
        graph_id (object): graph identifier (blank node, literal, IRI,
            or the default graph)
        graph (Iterable[Iterable[object]]): iterable of triples (the graph's content)

    Yields:
        jelly.RdfStreamFrame: Jelly frames.

    """
    [*graph_rows], graph_node = self.encoder.encode_any(graph_id, Slot.graph)
    kw_name = f"{Slot.graph}_{self.encoder.TERM_ONEOF_NAMES[type(graph_node)]}"
    kws: dict[Any, Any] = {kw_name: graph_node}
    start_row = jelly.RdfStreamRow(graph_start=jelly.RdfGraphStart(**kws))
    graph_rows.append(start_row)
    self.flow.extend(graph_rows)
    for triple in graph:
        if frame := self.triple(triple):  # has frame slicing inside
            yield frame
    end_row = jelly.RdfStreamRow(graph_end=jelly.RdfGraphEnd())
    self.flow.append(end_row)
    if frame := self.flow.frame_from_bounds():
        yield frame
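The shape of `graph` (a start row, the triples, an end row, with frames yielded whenever one is ready) can be sketched as a generator over a shared buffer. The tuples used as rows, the `emit_graph` helper, and the row-count frame bound are hypothetical simplifications, not pyjelly's message types.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator


def emit_graph(
    buffer: list[tuple],
    graph_id: str,
    triples: Iterable[tuple],
    frame_size: int,
) -> Iterator[list[tuple]]:
    """Wrap triples in start/end rows and yield frames as the buffer fills."""

    def cut() -> list[tuple] | None:
        if len(buffer) >= frame_size:
            frame = buffer.copy()
            buffer.clear()
            return frame
        return None

    buffer.append(("graph_start", graph_id))
    for triple in triples:
        buffer.append(("triple", triple))
        if frame := cut():
            yield frame
    buffer.append(("graph_end",))
    if frame := cut():
        yield frame


buffer: list[tuple] = []
triples = [("s", "p", "o1"), ("s", "p", "o2")]
frames = list(emit_graph(buffer, "g1", triples, frame_size=3))
print(frames)
print(buffer)
```

In this run one frame is yielded while the graph is still open, and the end row stays in the buffer until a later frame is cut, which shows that a single graph may span several frames under these assumptions.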
stream_for_type(physical_type)

Return the Stream subclass for the specified physical type.

Args: physical_type (jelly.PhysicalStreamType): jelly stream physical type.

Raises: NotImplementedError: if no stream for requested physical type is available.

Returns: type[Stream]: the Stream subclass registered for that physical type.

Source code in pyjelly/serialize/streams.py
def stream_for_type(physical_type: jelly.PhysicalStreamType) -> type[Stream]:
    """
    Return the Stream subclass for the specified physical type.

    Args:
        physical_type (jelly.PhysicalStreamType): jelly stream physical type.

    Raises:
        NotImplementedError: if no stream for requested physical type is available.

    Returns:
        type[Stream]: the Stream subclass registered for that physical type.

    """
    try:
        stream_cls = STREAM_DISPATCH[physical_type]
    except KeyError:
        msg = (
            "no stream class for physical type "
            f"{jelly.PhysicalStreamType.Name(physical_type)}"
        )
        raise NotImplementedError(msg) from None
    return stream_cls
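The lookup in `stream_for_type` is a plain dictionary dispatch that turns a missing key into `NotImplementedError`. A self-contained sketch of the same pattern follows. The `Physical` enumeration, the toy stream classes, and the `TOY_DISPATCH` table are hypothetical and do not reflect the contents of `STREAM_DISPATCH`.

```python
from enum import Enum


class Physical(Enum):
    """Hypothetical stand-in for jelly.PhysicalStreamType."""

    UNSPECIFIED = 0
    TRIPLES = 1
    QUADS = 2


class ToyTripleStream:
    pass


class ToyQuadStream:
    pass


TOY_DISPATCH = {
    Physical.TRIPLES: ToyTripleStream,
    Physical.QUADS: ToyQuadStream,
}


def toy_stream_for_type(physical_type: Physical) -> type:
    try:
        return TOY_DISPATCH[physical_type]
    except KeyError:
        msg = f"no stream class for physical type {physical_type.name}"
        # "from None" hides the internal KeyError from the traceback.
        raise NotImplementedError(msg) from None


triples_cls = toy_stream_for_type(Physical.TRIPLES)

try:
    toy_stream_for_type(Physical.UNSPECIFIED)
except NotImplementedError as error:
    missing_message = str(error)
else:
    missing_message = None
```

The function returns a class rather than an instance, so the caller still chooses how to construct the stream.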