API reference

pyjelly

Modules:

Name Description
errors
integrations
jelly
options
parse
serialize

errors

Classes:

Name Description
JellyConformanceError

Raised when Jelly conformance is violated.

JellyAssertionError

Raised when a recommended assertion from the specification fails.

JellyNotImplementedError

Raised when a future feature is not yet implemented.

JellyConformanceError

Bases: Exception

Raised when Jelly conformance is violated.

JellyAssertionError

Bases: AssertionError

Raised when a recommended assertion from the specification fails.

JellyNotImplementedError

Bases: NotImplementedError

Raised when a future feature is not yet implemented.
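
Because each error derives from a different built-in, the `except` clause that catches it differs. The sketch below uses local stand-in classes with the same names and bases as documented above (they are not imported from pyjelly, and `classify` is a hypothetical helper) to show which built-in handler applies to each.

```python
# Stand-in definitions mirroring the documented bases; in real code these
# would come from the pyjelly errors module instead.
class JellyConformanceError(Exception):
    """Raised when Jelly conformance is violated."""


class JellyAssertionError(AssertionError):
    """Raised when a recommended assertion from the specification fails."""


class JellyNotImplementedError(NotImplementedError):
    """Raised when a future feature is not yet implemented."""


def classify(exc: Exception) -> str:
    """Return the name of the first built-in handler that catches exc."""
    try:
        raise exc
    except AssertionError:
        return "AssertionError"
    except NotImplementedError:
        return "NotImplementedError"
    except Exception:
        return "Exception"


handlers = {
    type(e).__name__: classify(e)
    for e in (
        JellyConformanceError("bad stream"),
        JellyAssertionError("recommended check failed"),
        JellyNotImplementedError("not yet supported"),
    )
}
```

A handler written as `except NotImplementedError` will therefore also catch `JellyNotImplementedError`, while `JellyConformanceError` is only caught by `except Exception` or by naming the class itself.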

integrations

Modules:

Name Description
generic
rdflib
generic

Modules:

Name Description
generic_sink
parse
serialize
generic_sink

Classes:

Name Description
BlankNode

Class for blank nodes, storing BN's identifier as a string.

IRI

Class for IRIs, storing IRI as a string.

Literal

Class for literals.

Triple

Class for RDF triples.

Quad

Class for RDF quads.

Prefix

Class for generic namespace declaration.

GenericStatementSink
BlankNode(identifier)

Class for blank nodes, storing BN's identifier as a string.

Source code in pyjelly/integrations/generic/generic_sink.py
def __init__(self, identifier: str) -> None:
    self._identifier: str = identifier
IRI(iri)

Class for IRIs, storing IRI as a string.

Source code in pyjelly/integrations/generic/generic_sink.py
def __init__(self, iri: str) -> None:
    self._iri: str = iri
Literal(lex, langtag=None, datatype=None)

Class for literals.

Notes: A literal consists of a lexical form plus an optional language tag and an optional datatype. All parts of the literal are stored as strings.

Source code in pyjelly/integrations/generic/generic_sink.py
def __init__(
    self, lex: str, langtag: str | None = None, datatype: str | None = None
) -> None:
    self._lex: str = lex
    self._langtag: str | None = langtag
    self._datatype: str | None = datatype
Triple

Bases: NamedTuple

Class for RDF triples.

Quad

Bases: NamedTuple

Class for RDF quads.

Prefix

Bases: NamedTuple

Class for generic namespace declaration.

GenericStatementSink(identifier=DefaultGraph)

Notes: _store preserves the order of statements.

Args: identifier (str, optional): Identifier for a sink. Defaults to DefaultGraph.

Attributes:

Name Type Description
is_triples_sink bool

Check if the sink contains triples or quads.

Source code in pyjelly/integrations/generic/generic_sink.py
def __init__(self, identifier: GraphName = DefaultGraph) -> None:
    """
    Initialize statements storage, namespaces dictionary, and parser.

    Notes:
        _store preserves the order of statements.

    Args:
        identifier (str, optional): Identifier for a sink.
            Defaults to DefaultGraph.

    """
    self._store: deque[Triple | Quad] = deque()
    self._namespaces: dict[str, IRI] = {}
    self._identifier = identifier
is_triples_sink

Check if the sink contains triples or quads.

Returns: bool: True if a statement has length 3, i.e. the sink holds triples rather than quads.
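
The length-based check can be illustrated with a small stand-in. `MiniSink` below is a hypothetical class, not the real `GenericStatementSink`; the `s`/`p`/`o`/`g` field names follow the attribute accesses visible in the serializer source, and inspecting only the first stored statement (and returning False for an empty store) are assumptions of this sketch.

```python
from collections import deque
from typing import NamedTuple


class Triple(NamedTuple):
    s: str
    p: str
    o: str


class Quad(NamedTuple):
    s: str
    p: str
    o: str
    g: str


class MiniSink:
    """Hypothetical stand-in for GenericStatementSink."""

    def __init__(self) -> None:
        # A deque keeps statements in insertion order.
        self._store: deque = deque()

    def add(self, statement: tuple) -> None:
        self._store.append(statement)

    @property
    def is_triples_sink(self) -> bool:
        # Assumption: look at the first statement only; empty store -> False.
        return len(self._store) > 0 and len(self._store[0]) == 3


triples_sink = MiniSink()
triples_sink.add(Triple("s", "p", "o"))

quads_sink = MiniSink()
quads_sink.add(Quad("s", "p", "o", "g"))
```

Since a NamedTuple reports its field count through `len()`, no separate type tag is needed to tell a triple from a quad.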

parse

Classes:

Name Description
GenericStatementSinkAdapter

Implement Adapter for generic statements.

GenericTriplesAdapter

Triples adapted implementation for GenericStatementSink.

GenericQuadsAdapter

Extends GenericQuadsBaseAdapter for QUADS physical type.

GenericGraphsAdapter

Extends GenericQuadsBaseAdapter for GRAPHS physical type.

Functions:

Name Description
parse_triples_stream

Parse flat triple stream.

parse_quads_stream

Parse flat quads stream.

parse_jelly_grouped

Take a jelly file and return generators of generic statements sinks.

parse_jelly_to_graph

Add statements from Generator to GenericStatementSink.

parse_jelly_flat

Parse jelly file with FLAT logical type into a Generator of stream events.

GenericStatementSinkAdapter(options, parsing_mode=ParsingMode.FLAT)

Bases: Adapter

Implement Adapter for generic statements.

Notes: Returns custom RDF terms expected by GenericStatementSink, handles namespace declarations, and quoted triples.

Args: Adapter (type): base Adapter class

Source code in pyjelly/parse/decode.py
def __init__(
    self, options: ParserOptions, parsing_mode: ParsingMode = ParsingMode.FLAT
) -> None:
    self.options = options
    self.parsing_mode = parsing_mode
GenericTriplesAdapter(options)

Bases: GenericStatementSinkAdapter

Triples adapted implementation for GenericStatementSink.

Args: GenericStatementSinkAdapter (type): base GenericStatementSink adapter implementation that handles terms and namespaces.

Source code in pyjelly/integrations/generic/parse.py
def __init__(
    self,
    options: ParserOptions,
) -> None:
    super().__init__(options=options)
GenericQuadsAdapter(options)

Bases: GenericQuadsBaseAdapter

Extends GenericQuadsBaseAdapter for QUADS physical type.

Args: GenericQuadsBaseAdapter (type): quads adapter that handles base quads processing.

Source code in pyjelly/integrations/generic/parse.py
def __init__(self, options: ParserOptions) -> None:
    super().__init__(options=options)
GenericGraphsAdapter(options)

Bases: GenericQuadsBaseAdapter

Extends GenericQuadsBaseAdapter for GRAPHS physical type.

Notes: introduces graph start/end, checks if graph exists.

Args: GenericQuadsBaseAdapter (type): quads adapter that handles base quads processing.

Raises: JellyConformanceError: raised if graph start message was not received.

Source code in pyjelly/integrations/generic/parse.py
def __init__(
    self,
    options: ParserOptions,
) -> None:
    super().__init__(options=options)
    self._graph_id = None
parse_triples_stream(frames, options)

Parse flat triple stream.

Args:
    frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames.
    options (ParserOptions): stream options.

Yields: Generator[Iterable[Triple | Prefix]]: Generator of iterables of Triple or Prefix objects, one iterable per frame.

Source code in pyjelly/integrations/generic/parse.py
def parse_triples_stream(
    frames: Iterable[jelly.RdfStreamFrame],
    options: ParserOptions,
) -> Generator[Iterable[Triple | Prefix]]:
    """
    Parse flat triple stream.

    Args:
        frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames
        options (ParserOptions): stream options

    Yields:
        Generator[Iterable[Triple | Prefix]]:
            Generator of iterables of Triple or Prefix objects,
            one iterable per frame.

    """
    adapter = GenericTriplesAdapter(options)
    decoder = Decoder(adapter=adapter)
    for frame in frames:
        yield decoder.iter_rows(frame)
    return
parse_quads_stream(frames, options)

Parse flat quads stream.

Args:
    frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames.
    options (ParserOptions): stream options.

Yields: Generator[Iterable[Quad | Prefix]]: Generator of iterables of Quad or Prefix objects, one iterable per frame.

Source code in pyjelly/integrations/generic/parse.py
def parse_quads_stream(
    frames: Iterable[jelly.RdfStreamFrame],
    options: ParserOptions,
) -> Generator[Iterable[Quad | Prefix]]:
    """
    Parse flat quads stream.

    Args:
        frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames
        options (ParserOptions): stream options

    Yields:
        Generator[Iterable[Quad | Prefix]]:
            Generator of iterables of Quad or Prefix objects,
            one iterable per frame.

    """
    adapter_class: type[GenericQuadsBaseAdapter]
    if options.stream_types.physical_type == jelly.PHYSICAL_STREAM_TYPE_QUADS:
        adapter_class = GenericQuadsAdapter
    else:
        adapter_class = GenericGraphsAdapter
    adapter = adapter_class(options=options)
    decoder = Decoder(adapter=adapter)
    for frame in frames:
        yield decoder.iter_rows(frame)
    return
parse_jelly_grouped(inp, sink_factory=lambda: GenericStatementSink(), *, logical_type_strict=False)

Take a jelly file and return generators of generic statements sinks.

Yields one generic statements sink per frame.

Args:
    inp (IO[bytes]): input Jelly buffered binary stream.
    sink_factory (Callable): callable that constructs a statement sink. By default, creates an empty in-memory GenericStatementSink.
    logical_type_strict (bool): If True, validate the logical type in stream options and require a grouped logical type. Otherwise, only the physical type is used to route parsing.

Raises:
    JellyConformanceError: if logical_type_strict is True and the stream types are missing or declare an unspecified or flat logical type.
    NotImplementedError: if the physical type is not implemented.

Yields: Generator[GenericStatementSink]: one GenericStatementSink per frame, regardless of stream type.

Source code in pyjelly/integrations/generic/parse.py
def parse_jelly_grouped(
    inp: IO[bytes],
    sink_factory: Callable[[], GenericStatementSink] = lambda: GenericStatementSink(),
    *,
    logical_type_strict: bool = False,
) -> Generator[GenericStatementSink]:
    """
    Take a jelly file and return generators of generic statements sinks.

    Yields one generic statements sink per frame.

    Args:
        inp (IO[bytes]): input jelly buffered binary stream
        sink_factory (Callable): lambda to construct a statement sink.
            By default, creates an empty in-memory GenericStatementSink.
        logical_type_strict (bool): If True, validate the *logical* type
            in stream options and require a grouped logical type.
            Otherwise, only the physical type is used to route parsing.

    Raises:
        NotImplementedError: is raised if a physical type is not implemented

    Yields:
        Generator[GenericStatementSink]:
            returns generators for GenericStatementSink, regardless of stream type.

    """
    options, frames = get_options_and_frames(inp)

    st = getattr(options, "stream_types", None)
    if logical_type_strict and (
        st is None
        or st.logical_type == jelly.LOGICAL_STREAM_TYPE_UNSPECIFIED
        or st.flat
    ):
        lt_name = (
            "UNSPECIFIED"
            if st is None
            else jelly.LogicalStreamType.Name(st.logical_type)
        )

        msg = (
            "strict logical type check requires options.stream_types"
            if st is None
            else f"expected GROUPED logical type, got {lt_name}"
        )
        raise JellyConformanceError(msg)

    if options.stream_types.physical_type == jelly.PHYSICAL_STREAM_TYPE_TRIPLES:
        for graph in parse_triples_stream(
            frames=frames,
            options=options,
        ):
            sink = sink_factory()
            for graph_item in graph:
                if isinstance(graph_item, Prefix):
                    sink.bind(graph_item.prefix, graph_item.iri)
                else:
                    sink.add(graph_item)
            yield sink
        return
    elif options.stream_types.physical_type in (
        jelly.PHYSICAL_STREAM_TYPE_QUADS,
        jelly.PHYSICAL_STREAM_TYPE_GRAPHS,
    ):
        for dataset in parse_quads_stream(
            frames=frames,
            options=options,
        ):
            sink = sink_factory()
            for item in dataset:
                if isinstance(item, Prefix):
                    sink.bind(item.prefix, item.iri)
                else:
                    sink.add(item)
            yield sink
        return

    physical_type_name = jelly.PhysicalStreamType.Name(
        options.stream_types.physical_type
    )
    msg = f"the stream type {physical_type_name} is not supported "
    raise NotImplementedError(msg)
parse_jelly_to_graph(inp, sink_factory=lambda: GenericStatementSink())

Add statements from Generator to GenericStatementSink.

Args:
    inp (IO[bytes]): input Jelly stream.
    sink_factory (Callable[[], GenericStatementSink]): factory that creates the statement sink. By default, creates an empty in-memory GenericStatementSink. The sink makes no distinction between datasets and graphs; both use the same underlying data structures.

Returns: GenericStatementSink: GenericStatementSink with statements.

Source code in pyjelly/integrations/generic/parse.py
def parse_jelly_to_graph(
    inp: IO[bytes],
    sink_factory: Callable[[], GenericStatementSink] = lambda: GenericStatementSink(),
) -> GenericStatementSink:
    """
    Add statements from Generator to GenericStatementSink.

    Args:
        inp (IO[bytes]): input jelly stream.
        sink_factory (Callable[[], GenericStatementSink]): factory to create
            statement sink.
            By default creates an empty in-memory GenericStatementSink.
            Has no division for datasets/graphs,
            utilizes the same underlying data structures.

    Returns:
        GenericStatementSink: GenericStatementSink with statements.

    """
    options, frames = get_options_and_frames(inp)
    sink = sink_factory()

    for item in parse_jelly_flat(inp=inp, frames=frames, options=options):
        if isinstance(item, Prefix):
            sink.bind(item.prefix, item.iri)
        else:
            sink.add(item)
    return sink
parse_jelly_flat(inp, frames=None, options=None, *, logical_type_strict=False)

Parse jelly file with FLAT logical type into a Generator of stream events.

Args:
    inp (IO[bytes]): input Jelly buffered binary stream.
    frames (Iterable[jelly.RdfStreamFrame] | None): Jelly frames, if already read.
    options (ParserOptions | None): stream options, if already read.
    logical_type_strict (bool): If True, validate the logical type in stream options and require FLAT (TRIPLES/QUADS). Otherwise, only the physical type is used to route parsing.

Raises:
    JellyConformanceError: if logical_type_strict is True and the stream types are missing or the logical type is not flat.
    NotImplementedError: if the physical type is not supported.

Yields: Generator[Statement | Prefix]: Generator of stream events

Source code in pyjelly/integrations/generic/parse.py
def parse_jelly_flat(
    inp: IO[bytes],
    frames: Iterable[jelly.RdfStreamFrame] | None = None,
    options: ParserOptions | None = None,
    *,
    logical_type_strict: bool = False,
) -> Generator[Statement | Prefix]:
    """
    Parse jelly file with FLAT logical type into a Generator of stream events.

    Args:
        inp (IO[bytes]): input jelly buffered binary stream.
        frames (Iterable[jelly.RdfStreamFrame | None):
            jelly frames if read before.
        options (ParserOptions | None): stream options
            if read before.
        logical_type_strict (bool): If True, validate the *logical* type
            in stream options and require FLAT (TRIPLES/QUADS).
            Otherwise, only the physical type is used to route parsing.

    Raises:
        NotImplementedError: if physical type is not supported

    Yields:
        Generator[Statement | Prefix]: Generator of stream events

    """
    if frames is None or options is None:
        options, frames = get_options_and_frames(inp)

    st = getattr(options, "stream_types", None)
    if logical_type_strict and (st is None or not st.flat):
        lt_name = (
            "UNSPECIFIED"
            if st is None
            else jelly.LogicalStreamType.Name(st.logical_type)
        )

        msg = (
            "strict logical type check requires options.stream_types"
            if st is None
            else f"expected FLAT logical type (TRIPLES/QUADS), got {lt_name}"
        )
        raise JellyConformanceError(msg)

    if options.stream_types.physical_type == jelly.PHYSICAL_STREAM_TYPE_TRIPLES:
        for triples in parse_triples_stream(frames=frames, options=options):
            yield from triples
        return
    if options.stream_types.physical_type in (
        jelly.PHYSICAL_STREAM_TYPE_QUADS,
        jelly.PHYSICAL_STREAM_TYPE_GRAPHS,
    ):
        for quads in parse_quads_stream(
            frames=frames,
            options=options,
        ):
            yield from quads
        return
    physical_type_name = jelly.PhysicalStreamType.Name(
        options.stream_types.physical_type
    )
    msg = f"the stream type {physical_type_name} is not supported "
    raise NotImplementedError(msg)
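
The difference between the flat and grouped parsers comes down to what happens to the per-frame iterables. The sketch below uses plain lists of strings as stand-ins for frames and decoded rows; `rows_per_frame`, `flat`, and `grouped` are hypothetical names that only mirror the control flow shown above, not pyjelly functions.

```python
from typing import Iterable, Iterator, List

Frame = List[str]  # stand-in: a "frame" here is just a list of decoded rows


def rows_per_frame(frames: Iterable[Frame]) -> Iterator[Iterator[str]]:
    """Like the *_stream parsers: yield one iterable of rows per frame."""
    for frame in frames:
        yield iter(frame)


def flat(frames: Iterable[Frame]) -> Iterator[str]:
    """Like parse_jelly_flat: frame boundaries disappear."""
    for rows in rows_per_frame(frames):
        yield from rows


def grouped(frames: Iterable[Frame]) -> Iterator[List[str]]:
    """Like parse_jelly_grouped: one container per frame."""
    for rows in rows_per_frame(frames):
        yield list(rows)


frames = [["t1", "t2"], ["t3"]]
flat_rows = list(flat(frames))
grouped_rows = list(grouped(frames))
```

Here `flat_rows` is a single sequence of three rows, while `grouped_rows` keeps two containers, one per input frame.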
serialize

Classes:

Name Description
GenericSinkTermEncoder

Functions:

Name Description
triples_stream_frames

Serialize a GenericStatementSink into frames using physical type triples stream.

quads_stream_frames

Serialize a GenericStatementSink into jelly frames using physical type quads stream.

graphs_stream_frames

Serialize a GenericStatementSink into jelly frames as a stream of graphs.

split_to_graphs

Split a generator of quads to graphs.

guess_options

Guess the serializer options based on the store type.

guess_stream

Return an appropriate stream implementation for the given options.

grouped_stream_to_frames

Transform multiple GenericStatementSinks into Jelly frames.

grouped_stream_to_file

Write stream of GenericStatementSink to a binary file.

flat_stream_to_frames

Serialize a stream of raw GenericStatementSink's triples or quads into Jelly frames.

flat_stream_to_file

Write Triple or Quad events to a binary file.

GenericSinkTermEncoder(lookup_preset=None)

Bases: TermEncoder

Methods:

Name Description
encode_spo

Encode term based on its GenericSink object.

encode_graph

Encode graph term based on its GenericSink object.

Source code in pyjelly/serialize/encode.py
def __init__(
    self,
    lookup_preset: options.LookupPreset | None = None,
) -> None:
    if lookup_preset is None:
        lookup_preset = options.LookupPreset()
    self.lookup_preset = lookup_preset
    self.names = LookupEncoder(lookup_size=lookup_preset.max_names)
    self.prefixes = LookupEncoder(lookup_size=lookup_preset.max_prefixes)
    self.datatypes = LookupEncoder(lookup_size=lookup_preset.max_datatypes)
encode_spo(term, slot, statement)

Encode term based on its GenericSink object.

Args:
    term (object): term to encode.
    slot (Slot): its place in the statement.
    statement (Statement): Triple/Quad/GraphStart message to fill with terms.

Returns: Rows: encoded extra rows

Source code in pyjelly/integrations/generic/serialize.py
def encode_spo(self, term: object, slot: Slot, statement: Statement) -> Rows:
    """
    Encode term based on its GenericSink object.

    Args:
        term (object): term to encode
        slot (Slot): its place in statement.
        statement (Statement): Triple/Quad/GraphStart message to fill with terms.

    Returns:
        Rows: encoded extra rows

    """
    if isinstance(term, IRI):
        iri = self.get_iri_field(statement, slot)
        return self.encode_iri(term._iri, iri)

    if isinstance(term, Literal):
        literal = self.get_literal_field(statement, slot)
        return self.encode_literal(
            lex=term._lex,
            language=term._langtag,
            datatype=term._datatype,
            literal=literal,
        )

    if isinstance(term, BlankNode):
        self.set_bnode_field(
            statement,
            slot,
            term._identifier,
        )
        return ()

    if isinstance(term, Triple):
        quoted_statement = self.get_triple_field(statement, slot)
        return self.encode_quoted_triple(term, quoted_statement)

    return super().encode_spo(term, slot, statement)  # error if not handled
encode_graph(term, statement)

Encode graph term based on its GenericSink object.

Args:
    term (object): term to encode.
    statement (HasGraph): Quad/GraphStart message to fill g_{} in.

Returns: Rows: encoded extra rows

Source code in pyjelly/integrations/generic/serialize.py
def encode_graph(self, term: object, statement: HasGraph) -> Rows:
    """
    Encode graph term based on its GenericSink object.

    Args:
        term (object): term to encode
        statement (HasGraph): Quad/GraphStart message to fill g_{} in.

    Returns:
        Rows: encoded extra rows

    """
    if term == DefaultGraph:
        return self.encode_default_graph(statement.g_default_graph)
    if isinstance(term, IRI):
        return self.encode_iri(term._iri, statement.g_iri)

    if isinstance(term, Literal):
        return self.encode_literal(
            lex=term._lex,
            language=term._langtag,
            datatype=term._datatype,
            literal=statement.g_literal,
        )

    if isinstance(term, BlankNode):
        statement.g_bnode = term._identifier
        return ()
    return super().encode_graph(term, statement)  # error if not handled
triples_stream_frames(stream, data)

Serialize a GenericStatementSink into frames using physical type triples stream.

Args:
    stream (TripleStream): stream that specifies triples processing.
    data (GenericStatementSink | Generator[Triple]): GenericStatementSink/Statements to serialize.

Yields: Generator[jelly.RdfStreamFrame]: jelly frames.

Source code in pyjelly/integrations/generic/serialize.py
@stream_frames.register(TripleStream)
def triples_stream_frames(
    stream: TripleStream,
    data: GenericStatementSink | Generator[Triple],
) -> Generator[jelly.RdfStreamFrame]:
    """
    Serialize a GenericStatementSink into frames using physical type triples stream.

    Args:
        stream (TripleStream): stream that specifies triples processing
        data (GenericStatementSink | Generator[Triple]):
            GenericStatementSink/Statements to serialize.

    Yields:
        Generator[jelly.RdfStreamFrame]: jelly frames.

    """
    stream.enroll()
    if (
        isinstance(data, GenericStatementSink)
        and stream.options.params.namespace_declarations
    ):
        namespace_declarations(data, stream)

    graphs = (data,)
    for graph in graphs:
        for terms in graph:
            if frame := stream.triple(terms):
                yield frame
        if frame := stream.flow.frame_from_graph():
            yield frame
    if stream.stream_types.flat and (frame := stream.flow.to_stream_frame()):
        yield frame
quads_stream_frames(stream, data)

Serialize a GenericStatementSink into jelly frames using physical type quads stream.

Args:
    stream (QuadStream): stream that specifies quads processing.
    data (GenericStatementSink | Generator[Quad]): Dataset to serialize.

Yields: Generator[jelly.RdfStreamFrame]: jelly frames

Source code in pyjelly/integrations/generic/serialize.py
@stream_frames.register(QuadStream)
def quads_stream_frames(
    stream: QuadStream,
    data: GenericStatementSink | Generator[Quad],
) -> Generator[jelly.RdfStreamFrame]:
    """
    Serialize a GenericStatementSink into jelly frames using physical type quads stream.

    Args:
        stream (QuadStream): stream that specifies quads processing
        data (GenericStatementSink | Generator[Quad]): Dataset to serialize.

    Yields:
        Generator[jelly.RdfStreamFrame]: jelly frames

    """
    stream.enroll()
    if stream.options.params.namespace_declarations:
        namespace_declarations(data, stream)  # type: ignore[arg-type]

    iterator: Generator[Quad]
    if isinstance(data, GenericStatementSink):
        iterator = cast(Generator[Quad], data.store)
    else:
        iterator = data

    for terms in iterator:
        if frame := stream.quad(terms):
            yield frame
    if frame := stream.flow.frame_from_dataset():
        yield frame
    if stream.stream_types.flat and (frame := stream.flow.to_stream_frame()):
        yield frame
graphs_stream_frames(stream, data)

Serialize a GenericStatementSink into jelly frames as a stream of graphs.

Notes: If the flow is of DatasetsFrameFlow type, the whole dataset will be encoded into one frame. Graphs are generated from the GenericStatementSink by iterating over statements and yielding one new GenericStatementSink per sequence of consecutive quads with the same g term.

Args:
    stream (GraphStream): stream that specifies graphs processing.
    data (GenericStatementSink | Generator[Quad]): Dataset to serialize.

Yields: Generator[jelly.RdfStreamFrame]: jelly frames

Source code in pyjelly/integrations/generic/serialize.py
@stream_frames.register(GraphStream)
def graphs_stream_frames(
    stream: GraphStream,
    data: GenericStatementSink | Generator[Quad],
) -> Generator[jelly.RdfStreamFrame]:
    """
    Serialize a GenericStatementSink into jelly frames as a stream of graphs.

    Notes:
        If flow of DatasetsFrameFlow type, the whole dataset
        will be encoded into one frame.
        Graphs are generated from the GenericStatementSink by
        iterating over statements and yielding one new GenericStatementSink
        per a sequence of quads with the same g term.

    Args:
        stream (GraphStream): stream that specifies graphs processing
        data (GenericStatementSink | Generator[Quad]): Dataset to serialize.

    Yields:
        Generator[jelly.RdfStreamFrame]: jelly frames

    """
    stream.enroll()
    if stream.options.params.namespace_declarations:
        namespace_declarations(data, stream)  # type: ignore[arg-type]

    statements: Generator[Quad]
    if isinstance(data, GenericStatementSink):
        statements = cast(Generator[Quad], data.store)
        graphs = split_to_graphs(statements)
    elif iter(data):
        statements = data
        graphs = split_to_graphs(statements)

    for graph in graphs:
        yield from stream.graph(graph_id=graph.identifier, graph=graph)

    if frame := stream.flow.frame_from_dataset():
        yield frame
    if stream.stream_types.flat and (frame := stream.flow.to_stream_frame()):
        yield frame
split_to_graphs(data)

Split a generator of quads to graphs.

Notes: A new graph is generated by iterating over statements and yielding one new GenericStatementSink per sequence of consecutive quads with the same g term.

Args: data (Generator[Quad]): generator of quads

Yields: Generator[GenericStatementSink]: generator of GenericStatementSinks, each having triples in store and identifier set.

Source code in pyjelly/integrations/generic/serialize.py
def split_to_graphs(data: Generator[Quad]) -> Generator[GenericStatementSink]:
    """
    Split a generator of quads to graphs.

    Notes:
        New graph is generated by
        iterating over statements and yielding one new GenericStatementSink
        per a sequence of quads with the same g term.

    Args:
        data (Generator[Quad]): generator of quads

    Yields:
        Generator[GenericStatementSink]: generator of GenericStatementSinks,
        each having triples in store and identifier set.

    """
    current_g: GraphName | None = None
    current_sink: GenericStatementSink | None = None
    for statement in data:
        if current_g != statement.g:
            if current_sink is not None:
                yield current_sink

            current_g = statement.g
            current_sink = GenericStatementSink(identifier=current_g)

        assert current_sink is not None
        current_sink.add(Triple(statement.s, statement.p, statement.o))

    if current_sink is not None:
        yield current_sink
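
One consequence of the loop above is that only consecutive quads are grouped: if quads of the same graph are interleaved with another graph, that graph is emitted more than once. A minimal sketch of the same run-splitting behavior, using `itertools.groupby` and a local stand-in `Quad` (neither `split_runs` nor this `Quad` is part of pyjelly):

```python
from itertools import groupby
from typing import Iterable, Iterator, List, NamedTuple, Tuple


class Quad(NamedTuple):
    s: str
    p: str
    o: str
    g: str


def split_runs(
    quads: Iterable[Quad],
) -> Iterator[Tuple[str, List[Tuple[str, str, str]]]]:
    """Yield (graph, triples) for each run of consecutive quads sharing g."""
    for g, run in groupby(quads, key=lambda q: q.g):
        yield g, [(q.s, q.p, q.o) for q in run]


quads = [
    Quad("s1", "p", "o", "g1"),
    Quad("s2", "p", "o", "g1"),
    Quad("s3", "p", "o", "g2"),
    Quad("s4", "p", "o", "g1"),  # same graph as the first run, but not adjacent
]
runs = list(split_runs(quads))
graph_order = [g for g, _ in runs]
```

With this input, `g1` appears twice in `graph_order`. Sorting or otherwise clustering quads by graph before serialization would avoid the repeated graph, at the cost of buffering the input.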
guess_options(sink)

Guess the serializer options based on the store type.

Source code in pyjelly/integrations/generic/serialize.py
def guess_options(sink: GenericStatementSink) -> SerializerOptions:
    """Guess the serializer options based on the store type."""
    logical_type = (
        jelly.LOGICAL_STREAM_TYPE_FLAT_TRIPLES
        if sink.is_triples_sink
        else jelly.LOGICAL_STREAM_TYPE_FLAT_QUADS
    )
    # Generic sink supports both RDF-star and generalized statements by default
    # as it can handle any term types including quoted triples and generalized RDF terms
    params = StreamParameters(generalized_statements=True, rdf_star=True)
    return SerializerOptions(logical_type=logical_type, params=params)
guess_stream(options, sink)

Return an appropriate stream implementation for the given options.

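Notes: the base logical type is logical_type % 10. If the base logical type is GRAPHS, or sink.is_triples_sink is true, a TripleStream is initialized (so a quads sink with a GRAPHS base type still gets a TripleStream); otherwise a QuadStream is initialized.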

Source code in pyjelly/integrations/generic/serialize.py
def guess_stream(options: SerializerOptions, sink: GenericStatementSink) -> Stream:
    """
    Return an appropriate stream implementation for the given options.

    Notes: if base(!) logical type is GRAPHS and sink.is_triples_sink is false,
        initializes TripleStream
    """
    stream_cls: type[Stream]
    if (
        options.logical_type % 10
    ) != jelly.LOGICAL_STREAM_TYPE_GRAPHS and not sink.is_triples_sink:
        stream_cls = QuadStream
    else:
        stream_cls = TripleStream
    if options is not None:
        lookup_preset = options.lookup_preset
    return stream_cls(
        encoder=GenericSinkTermEncoder(lookup_preset=lookup_preset),
        options=options,
    )
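
The `% 10` expression reduces a logical stream type to its base type. The sketch below reproduces only that branch; the enum numbers are written from memory of the Jelly protobuf definition and should be checked against the installed version, and `pick_stream` is a hypothetical helper that returns a class name rather than constructing a stream.

```python
# Assumed values of the Jelly LogicalStreamType enum (verify before relying on them).
LOGICAL_TYPES = {
    "FLAT_TRIPLES": 1,
    "FLAT_QUADS": 2,
    "GRAPHS": 3,
    "DATASETS": 4,
    "SUBJECT_GRAPHS": 13,
    "NAMED_GRAPHS": 14,
    "TIMESTAMPED_NAMED_GRAPHS": 114,
}
GRAPHS = LOGICAL_TYPES["GRAPHS"]


def pick_stream(logical_type: int, is_triples_sink: bool) -> str:
    """Mirror the branch in guess_stream, returning the class name only."""
    if (logical_type % 10) != GRAPHS and not is_triples_sink:
        return "QuadStream"
    return "TripleStream"


choices = {
    name: pick_stream(value, is_triples_sink=False)
    for name, value in LOGICAL_TYPES.items()
}
```

Under these assumed values, a quads sink is routed to `TripleStream` only for `GRAPHS` and `SUBJECT_GRAPHS`, whose last digit is 3; every other logical type yields `QuadStream` for a quads sink.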
grouped_stream_to_frames(sink_generator, options=None)

Transform multiple GenericStatementSinks into Jelly frames.

Notes: One frame is produced per GenericStatementSink. If options are not provided, they are guessed from the first sink.

Args:
    sink_generator (Generator[GenericStatementSink]): Generator of GenericStatementSink to transform.
    options (SerializerOptions | None, optional): stream options to use. If None, options are guessed based on the sink store type. Defaults to None.

Yields: Generator[jelly.RdfStreamFrame]: produced Jelly frames

Source code in pyjelly/integrations/generic/serialize.py
def grouped_stream_to_frames(
    sink_generator: Generator[GenericStatementSink],
    options: SerializerOptions | None = None,
) -> Generator[jelly.RdfStreamFrame]:
    """
    Transform multiple GenericStatementSinks into Jelly frames.

    Notes:
        One frame per GenericStatementSink.

    Note: options are guessed if not provided.

    Args:
        sink_generator (Generator[GenericStatementSink]): Generator of
            GenericStatementSink to transform.
        options (SerializerOptions | None, optional): stream options to use.
            Options are guessed based on the sink store type. Defaults to None.

    Yields:
        Generator[jelly.RdfStreamFrame]: produced Jelly frames

    """
    stream = None
    for sink in sink_generator:
        if not stream:
            if options is None:
                options = guess_options(sink)
            stream = guess_stream(options, sink)
        yield from stream_frames(stream, sink)
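
The loop above creates the stream lazily, from the first sink it sees, and reuses it for every later sink. A small sketch of that pattern, with strings standing in for sinks and options (`frames_for` is a hypothetical name, and the `guessed:` prefix is just a marker for this illustration):

```python
from typing import Iterable, Iterator, Optional, Tuple


def frames_for(
    sinks: Iterable[str], options: Optional[str] = None
) -> Iterator[Tuple[str, str]]:
    """Yield (options, sink) pairs; options are fixed by the first sink seen."""
    stream_options: Optional[str] = None
    for sink in sinks:
        if stream_options is None:
            # Guess only once, from the first sink, unless options were given.
            stream_options = options if options is not None else f"guessed:{sink}"
        yield stream_options, sink


guessed = list(frames_for(["triples", "quads"]))
explicit = list(frames_for(["triples", "quads"], options="explicit"))
```

In `guessed`, the second sink is processed with options derived from the first one, which is why a generator mixing triples sinks and quads sinks is best given explicit options.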
grouped_stream_to_file(stream, output_file, **kwargs)

Write stream of GenericStatementSink to a binary file.

Args:
    stream (Generator[GenericStatementSink]): Generator of GenericStatementSink to serialize.
    output_file (IO[bytes]): output buffered writer.
    **kwargs (Any): options to pass to stream.

Source code in pyjelly/integrations/generic/serialize.py
def grouped_stream_to_file(
    stream: Generator[GenericStatementSink],
    output_file: IO[bytes],
    **kwargs: Any,
) -> None:
    """
    Write stream of GenericStatementSink to a binary file.

    Args:
        stream (Generator[GenericStatementSink]): Generator of
            GenericStatementSink to serialize.
        output_file (IO[bytes]): output buffered writer.
        **kwargs (Any): options to pass to stream.

    """
    for frame in grouped_stream_to_frames(stream, **kwargs):
        write_delimited(frame, output_file)
flat_stream_to_frames(statements, options=None)

Serialize a stream of raw GenericStatementSink's triples or quads into Jelly frames.

Args:
    statements (Generator[Triple | Quad]): s/p/o triples or s/p/o/g quads to serialize.
    options (SerializerOptions | None, optional): if omitted, guessed based on the first tuple.

Yields: Generator[jelly.RdfStreamFrame]: generated frames.

Source code in pyjelly/integrations/generic/serialize.py
def flat_stream_to_frames(
    statements: Generator[Triple | Quad],
    options: SerializerOptions | None = None,
) -> Generator[jelly.RdfStreamFrame]:
    """
    Serialize a stream of raw GenericStatementSink's triples or quads into Jelly frames.

    Args:
        statements (Generator[Triple | Quad]):
          s/p/o triples or s/p/o/g quads to serialize.
        options (SerializerOptions | None, optional):
            if omitted, guessed based on the first tuple.

    Yields:
        Generator[jelly.RdfStreamFrame]: generated frames.

    """
    first = next(statements, None)
    if first is None:
        return

    sink = GenericStatementSink()
    sink.add(first)
    if options is None:
        options = guess_options(sink)
    stream = guess_stream(options, sink)

    combined: Generator[Triple | Quad] | GenericStatementSink = (
        item for item in chain([first], statements)
    )

    yield from stream_frames(stream, combined)
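The listing above consumes the first statement to guess options and then puts it back in front of the remaining ones. A minimal, library-free sketch of that peek-and-replay pattern follows; `peek_first` and the sample tuples are hypothetical names used only for illustration and are not part of pyjelly.

```python
from __future__ import annotations

from collections.abc import Iterator
from itertools import chain


def peek_first(items: Iterator[tuple]) -> tuple[tuple | None, Iterator[tuple]]:
    """Return the first item (or None) and an iterator that still yields it."""
    first = next(items, None)
    if first is None:
        # Empty input: nothing to inspect and nothing to replay.
        return None, iter(())
    # Put the consumed item back in front of the remaining ones.
    return first, chain([first], items)


statements = iter([("s1", "p1", "o1"), ("s2", "p2", "o2")])
first, replay = peek_first(statements)
replayed = list(replay)
```

Because the input is a one-shot generator, skipping the `chain` step would silently drop the first statement from the output.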
flat_stream_to_file(statements, output_file, options=None)

Write Triple or Quad events to a binary file.

Args: statements (Generator[Triple | Quad]): statements to serialize. output_file (IO[bytes]): output buffered writer. options (SerializerOptions | None, optional): stream options.

Source code in pyjelly/integrations/generic/serialize.py
def flat_stream_to_file(
    statements: Generator[Triple | Quad],
    output_file: IO[bytes],
    options: SerializerOptions | None = None,
) -> None:
    """
    Write Triple or Quad events to a binary file.

    Args:
        statements (Generator[Triple | Quad]): statements to serialize.
        output_file (IO[bytes]): output buffered writer.
        options (SerializerOptions | None, optional): stream options.

    """
    for frame in flat_stream_to_frames(statements, options):
        write_delimited(frame, output_file)
rdflib

Modules:

Name Description
parse
serialize

Functions:

Name Description
register_extension_to_rdflib

Make rdflib.util.guess_format discover Jelly format.

register_extension_to_rdflib(extension='.jelly')

Make rdflib.util.guess_format discover Jelly format.

>>> rdflib.util.guess_format("foo.jelly")
>>> register_extension_to_rdflib()
>>> rdflib.util.guess_format("foo.jelly")
'jelly'

Source code in pyjelly/integrations/rdflib/__init__.py
def register_extension_to_rdflib(extension: str = ".jelly") -> None:
    """
    Make [rdflib.util.guess_format][] discover Jelly format.

    >>> rdflib.util.guess_format("foo.jelly")
    >>> register_extension_to_rdflib()
    >>> rdflib.util.guess_format("foo.jelly")
    'jelly'
    """
    rdflib.util.SUFFIX_FORMAT_MAP[extension.removeprefix(".")] = "jelly"
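The registration above stores the suffix without its leading dot as a key in a suffix-to-format mapping. The sketch below mimics that with a plain dictionary so it runs without rdflib; `suffix_format_map`, `register_extension` and `guess_format` are simplified stand-ins, not the real rdflib objects.

```python
from __future__ import annotations

# Stand-in for rdflib.util.SUFFIX_FORMAT_MAP: keys are suffixes without the dot.
suffix_format_map: dict[str, str] = {"ttl": "turtle"}


def register_extension(extension: str = ".jelly") -> None:
    """Map the extension (with or without a leading dot) to the 'jelly' format."""
    suffix_format_map[extension.removeprefix(".")] = "jelly"


def guess_format(path: str) -> str | None:
    """Simplified lookup by the text after the last dot (hypothetical helper)."""
    _, dot, suffix = path.rpartition(".")
    return suffix_format_map.get(suffix.lower()) if dot else None


before = guess_format("foo.jelly")  # not registered yet
register_extension()
after = guess_format("foo.jelly")
```

Thanks to `str.removeprefix`, passing `"jelly"` and `".jelly"` registers the same key.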
parse

Classes:

Name Description
Triple

Describe RDFLib triple.

Quad

Describe RDFLib quad.

Prefix

Describe an RDF prefix (i.e., a namespace declaration).

RDFLibAdapter

Base RDFLib adapter class, extended by the triples and quads implementations.

RDFLibTriplesAdapter

Triples adapter RDFLib implementation.

RDFLibQuadsAdapter

Extended RDFLib adapter for the QUADS physical type.

RDFLibGraphsAdapter

Extension of RDFLibQuadsBaseAdapter for the GRAPHS physical type.

RDFLibJellyParser

Functions:

Name Description
parse_triples_stream

Parse flat triple stream.

parse_quads_stream

Parse flat quads stream.

parse_jelly_grouped

Take jelly file and return generators based on the detected physical type.

parse_jelly_to_graph

Add statements from Generator to provided Graph/Dataset.

parse_jelly_flat

Parse jelly file with FLAT logical type into a Generator of stream events.

Triple

Bases: tuple[Node, Node, Node]

Describe RDFLib triple.

Args: tuple (Node, Node, Node): s/p/o tuple of RDFLib Nodes.

Returns: Triple: triple as tuple.

Quad

Bases: tuple[Node, Node, Node, GraphName]

Describe RDFLib quad.

Args: tuple (Node, Node, Node, GraphName): s/p/o/g as a tuple of RDFLib nodes and a GraphName.

Returns: Quad: quad as tuple.

Prefix

Bases: tuple[str, URIRef]

Describe an RDF prefix (i.e., a namespace declaration).

Args: tuple (str, rdflib.URIRef): expects the prefix as a string and the full namespace URI as an rdflib.URIRef.

Returns: Prefix: prefix as tuple(prefix, iri).

RDFLibAdapter(options, parsing_mode=ParsingMode.FLAT)

Bases: Adapter

Base RDFLib adapter class, extended by the triples and quads implementations.

Args: Adapter (): abstract adapter class

Source code in pyjelly/parse/decode.py
def __init__(
    self, options: ParserOptions, parsing_mode: ParsingMode = ParsingMode.FLAT
) -> None:
    self.options = options
    self.parsing_mode = parsing_mode
RDFLibTriplesAdapter(options)

Bases: RDFLibAdapter

Triples adapter RDFLib implementation.

Notes: returns triples and namespace declarations as soon as it receives them.

Source code in pyjelly/integrations/rdflib/parse.py
def __init__(
    self,
    options: ParserOptions,
) -> None:
    super().__init__(options=options)
RDFLibQuadsAdapter(options)

Bases: RDFLibQuadsBaseAdapter

Extended RDFLib adapter for the QUADS physical type.

Args: RDFLibQuadsBaseAdapter (RDFLibAdapter): base quads adapter (shared with graphs physical type)

Source code in pyjelly/integrations/rdflib/parse.py
def __init__(self, options: ParserOptions) -> None:
    super().__init__(options=options)
RDFLibGraphsAdapter(options)

Bases: RDFLibQuadsBaseAdapter

Extension of RDFLibQuadsBaseAdapter for the GRAPHS physical type.

Notes: introduces graph start/end, checks if graph exists.

Args: RDFLibQuadsBaseAdapter (RDFLibAdapter): base adapter for quads management.

Raises: JellyConformanceError: if no graph_start was encountered

Source code in pyjelly/integrations/rdflib/parse.py
def __init__(
    self,
    options: ParserOptions,
) -> None:
    super().__init__(options=options)
    self._graph_id = None
RDFLibJellyParser

Bases: Parser

Methods:

Name Description
parse

Parse jelly file into provided RDFLib Graph.

parse(source, sink)

Parse jelly file into provided RDFLib Graph.

Args: source (InputSource): jelly file as buffered binary stream InputSource obj sink (Graph): RDFLib Graph

Raises: TypeError: raises error if invalid input

Source code in pyjelly/integrations/rdflib/parse.py
def parse(self, source: InputSource, sink: Graph) -> None:
    """
    Parse jelly file into provided RDFLib Graph.

    Args:
        source (InputSource): jelly file as buffered binary stream InputSource obj
        sink (Graph): RDFLib Graph

    Raises:
        TypeError: raises error if invalid input

    """
    inp = source.getByteStream()  # type: ignore[no-untyped-call]
    if inp is None:
        msg = "expected source to be a stream of bytes"
        raise TypeError(msg)
    parse_jelly_to_graph(
        inp,
        graph_factory=lambda: Graph(store=sink.store, identifier=sink.identifier),
        dataset_factory=lambda: Dataset(store=sink.store),
    )
parse_triples_stream(frames, options)

Parse flat triple stream.

Args: frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames options (ParserOptions): stream options

Yields: Generator[Iterable[Triple | Prefix]]: Generator of iterables of Triple or Prefix objects, one iterable per frame.

Source code in pyjelly/integrations/rdflib/parse.py
def parse_triples_stream(
    frames: Iterable[jelly.RdfStreamFrame],
    options: ParserOptions,
) -> Generator[Iterable[Triple | Prefix]]:
    """
    Parse flat triple stream.

    Args:
        frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames
        options (ParserOptions): stream options

    Yields:
        Generator[Iterable[Triple | Prefix]]:
            Generator of iterables of Triple or Prefix objects,
            one iterable per frame.

    """
    adapter = RDFLibTriplesAdapter(options)
    decoder = Decoder(adapter=adapter)
    for frame in frames:
        yield decoder.iter_rows(frame)
    return
parse_quads_stream(frames, options)

Parse flat quads stream.

Args: frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames options (ParserOptions): stream options

Yields: Generator[Iterable[Quad | Prefix]]: Generator of iterables of Quad or Prefix objects, one iterable per frame.

Source code in pyjelly/integrations/rdflib/parse.py
def parse_quads_stream(
    frames: Iterable[jelly.RdfStreamFrame],
    options: ParserOptions,
) -> Generator[Iterable[Quad | Prefix]]:
    """
    Parse flat quads stream.

    Args:
        frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames
        options (ParserOptions): stream options

    Yields:
        Generator[Iterable[Quad | Prefix]]:
            Generator of iterables of Quad or Prefix objects,
            one iterable per frame.

    """
    adapter_class: type[RDFLibQuadsBaseAdapter]
    if options.stream_types.physical_type == jelly.PHYSICAL_STREAM_TYPE_QUADS:
        adapter_class = RDFLibQuadsAdapter
    else:
        adapter_class = RDFLibGraphsAdapter
    adapter = adapter_class(options=options)
    decoder = Decoder(adapter=adapter)
    for frame in frames:
        yield decoder.iter_rows(frame)
    return
parse_jelly_grouped(inp, graph_factory=lambda: Graph(), dataset_factory=lambda: Dataset(), *, logical_type_strict=False)

Take jelly file and return generators based on the detected physical type.

Yields one graph/dataset per frame.

Args: inp (IO[bytes]): input jelly buffered binary stream graph_factory (Callable): lambda to construct a Graph. By default creates an empty in-memory Graph, but you can pass something else here. dataset_factory (Callable): lambda to construct a Dataset. By default creates an empty in-memory Dataset, but you can pass something else here. logical_type_strict (bool): If True, validate the logical type in stream options and require a grouped logical type. Otherwise, only the physical type is used to route parsing.

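Raises: NotImplementedError: if the physical type is not supported. JellyConformanceError: if logical_type_strict is True and the stream options do not declare a grouped logical type.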

Yields: Generator[Graph] | Generator[Dataset]: returns generators for graphs/datasets based on the type of input

Source code in pyjelly/integrations/rdflib/parse.py
def parse_jelly_grouped(
    inp: IO[bytes],
    graph_factory: Callable[[], Graph] = lambda: Graph(),
    dataset_factory: Callable[[], Dataset] = lambda: Dataset(),
    *,
    logical_type_strict: bool = False,
) -> Generator[Graph] | Generator[Dataset]:
    """
    Take jelly file and return generators based on the detected physical type.

    Yields one graph/dataset per frame.

    Args:
        inp (IO[bytes]): input jelly buffered binary stream
        graph_factory (Callable): lambda to construct a Graph.
            By default creates an empty in-memory Graph,
            but you can pass something else here.
        dataset_factory (Callable): lambda to construct a Dataset.
            By default creates an empty in-memory Dataset,
            but you can pass something else here.
        logical_type_strict (bool): If True, validate the *logical* type in
            stream options and require a grouped logical type. Otherwise, only the
            physical type is used to route parsing.



    Raises:
        NotImplementedError: is raised if a physical type is not implemented

    Yields:
        Generator[Graph] | Generator[Dataset]:
            returns generators for graphs/datasets based on the type of input

    """
    options, frames = get_options_and_frames(inp)

    st = getattr(options, "stream_types", None)
    if logical_type_strict and (
        st is None
        or st.logical_type == jelly.LOGICAL_STREAM_TYPE_UNSPECIFIED
        or st.flat
    ):
        lt_name = (
            "UNSPECIFIED"
            if st is None
            else jelly.LogicalStreamType.Name(st.logical_type)
        )

        msg = (
            "strict logical type check requires options.stream_types"
            if st is None
            else f"expected GROUPED logical type, got {lt_name}"
        )
        raise JellyConformanceError(msg)

    if options.stream_types.physical_type == jelly.PHYSICAL_STREAM_TYPE_TRIPLES:
        for graph in parse_triples_stream(
            frames=frames,
            options=options,
        ):
            sink = graph_factory()
            for graph_item in graph:
                if isinstance(graph_item, Prefix):
                    sink.bind(graph_item.prefix, graph_item.iri)
                else:
                    sink.add(graph_item)
            yield sink
        return
    elif options.stream_types.physical_type in (
        jelly.PHYSICAL_STREAM_TYPE_QUADS,
        jelly.PHYSICAL_STREAM_TYPE_GRAPHS,
    ):
        for dataset in parse_quads_stream(
            frames=frames,
            options=options,
        ):
            sink = dataset_factory()
            for item in dataset:
                if isinstance(item, Prefix):
                    sink.bind(item.prefix, item.iri)
                else:
                    s, p, o, graph_name = item
                    context = sink.get_context(graph_name)
                    sink.add((s, p, o, context))
            yield sink
        return

    physical_type_name = jelly.PhysicalStreamType.Name(
        options.stream_types.physical_type
    )
    msg = f"the stream type {physical_type_name} is not supported "
    raise NotImplementedError(msg)
parse_jelly_to_graph(inp, graph_factory=lambda: Graph(), dataset_factory=lambda: Dataset())

Add statements from Generator to provided Graph/Dataset.

Args: inp (IO[bytes]): input jelly stream. graph_factory (Callable[[], Graph]): factory to create Graph. By default creates an empty in-memory Graph, but you can pass something else here. dataset_factory (Callable[[], Dataset]): factory to create Dataset. By default creates an empty in-memory Dataset, but you can pass something else here.

Returns: Dataset | Graph: Dataset or Graph with statements.

Source code in pyjelly/integrations/rdflib/parse.py
def parse_jelly_to_graph(
    inp: IO[bytes],
    graph_factory: Callable[[], Graph] = lambda: Graph(),
    dataset_factory: Callable[[], Dataset] = lambda: Dataset(),
) -> Graph | Dataset:
    """
    Add statements from Generator to provided Graph/Dataset.

    Args:
        inp (IO[bytes]): input jelly stream.
        graph_factory (Callable[[], Graph]): factory to create Graph.
            By default creates an empty in-memory Graph,
            but you can pass something else here.
        dataset_factory (Callable[[], Dataset]): factory to create Dataset.
            By default creates an empty in-memory Dataset,
            but you can pass something else here.

    Returns:
        Dataset | Graph: Dataset or Graph with statements.

    """
    options, frames = get_options_and_frames(inp)

    if options.stream_types.physical_type == jelly.PHYSICAL_STREAM_TYPE_TRIPLES:
        sink = graph_factory()
    if options.stream_types.physical_type in (
        jelly.PHYSICAL_STREAM_TYPE_QUADS,
        jelly.PHYSICAL_STREAM_TYPE_GRAPHS,
    ):
        quad_sink = dataset_factory()
        sink = quad_sink

    for item in parse_jelly_flat(inp=inp, frames=frames, options=options):
        if isinstance(item, Prefix):
            sink.bind(item.prefix, item.iri)
        if isinstance(item, Triple):
            sink.add(item)
        if isinstance(item, Quad):
            s, p, o, graph_name = item
            context = quad_sink.get_context(graph_name)
            quad_sink.add((s, p, o, context))
    return sink
parse_jelly_flat(inp, frames=None, options=None, *, logical_type_strict=False)

Parse jelly file with FLAT logical type into a Generator of stream events.

Args: inp (IO[bytes]): input jelly buffered binary stream. frames (Iterable[jelly.RdfStreamFrame] | None): jelly frames if read before. options (ParserOptions | None): stream options if read before. logical_type_strict (bool): If True, validate the logical type in stream options and require FLAT_(TRIPLES|QUADS). Otherwise, only the physical type is used to route parsing.

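Raises: NotImplementedError: if the physical type is not supported. JellyConformanceError: if logical_type_strict is True and the stream options do not declare a flat logical type.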

Yields: Generator[Statement | Prefix]: Generator of stream events

Source code in pyjelly/integrations/rdflib/parse.py
def parse_jelly_flat(
    inp: IO[bytes],
    frames: Iterable[jelly.RdfStreamFrame] | None = None,
    options: ParserOptions | None = None,
    *,
    logical_type_strict: bool = False,
) -> Generator[Statement | Prefix]:
    """
    Parse jelly file with FLAT logical type into a Generator of stream events.

    Args:
        inp (IO[bytes]): input jelly buffered binary stream.
        frames (Iterable[jelly.RdfStreamFrame] | None):
            jelly frames if read before.
        options (ParserOptions | None): stream options
            if read before.
        logical_type_strict (bool): If True, validate the *logical* type in
            stream options and require FLAT_(TRIPLES|QUADS). Otherwise, only the
            physical type is used to route parsing.


    Raises:
        NotImplementedError: if physical type is not supported

    Yields:
        Generator[Statement | Prefix]: Generator of stream events

    """
    if frames is None or options is None:
        options, frames = get_options_and_frames(inp)

    st = getattr(options, "stream_types", None)
    if logical_type_strict and (st is None or not st.flat):
        lt_name = (
            "UNSPECIFIED"
            if st is None
            else jelly.LogicalStreamType.Name(st.logical_type)
        )
        msg = (
            "strict logical type check requires options.stream_types"
            if st is None
            else f"expected FLAT logical type (TRIPLES/QUADS), got {lt_name}"
        )
        raise JellyConformanceError(msg)

    if options.stream_types.physical_type == jelly.PHYSICAL_STREAM_TYPE_TRIPLES:
        for triples in parse_triples_stream(frames=frames, options=options):
            yield from triples
        return
    if options.stream_types.physical_type in (
        jelly.PHYSICAL_STREAM_TYPE_QUADS,
        jelly.PHYSICAL_STREAM_TYPE_GRAPHS,
    ):
        for quads in parse_quads_stream(
            frames=frames,
            options=options,
        ):
            yield from quads
        return
    physical_type_name = jelly.PhysicalStreamType.Name(
        options.stream_types.physical_type
    )
    msg = f"the stream type {physical_type_name} is not supported "
    raise NotImplementedError(msg)
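parse_jelly_grouped and parse_jelly_flat consume the same per-frame iterables and differ only in what they do with them: the grouped variant builds one container per frame, the flat variant yields every statement in order. The sketch below shows that contrast with plain lists standing in for decoded frames; `decode_frames`, `grouped` and `flat` are hypothetical names, not pyjelly functions.

```python
from collections.abc import Iterable, Iterator

Statement = tuple[str, str, str]
Frame = list[Statement]  # hypothetical stand-in for one decoded Jelly frame


def decode_frames(frames: Iterable[Frame]) -> Iterator[Iterator[Statement]]:
    """Yield one iterable of statements per frame."""
    for frame in frames:
        yield iter(frame)


def grouped(frames: Iterable[Frame]) -> Iterator[set[Statement]]:
    """Build a fresh container per frame (one 'graph' per frame)."""
    for rows in decode_frames(frames):
        yield set(rows)


def flat(frames: Iterable[Frame]) -> Iterator[Statement]:
    """Ignore frame boundaries and yield statements one by one."""
    for rows in decode_frames(frames):
        yield from rows


frames = [[("a", "p", "b")], [("c", "p", "d"), ("e", "p", "f")]]
grouped_result = list(grouped(frames))
flat_result = list(flat(frames))
```

In this sketch two frames give two containers in the grouped case and three statements in the flat case.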
serialize

Classes:

Name Description
RDFLibTermEncoder
RDFLibJellySerializer

RDFLib serializer for writing graphs in Jelly RDF stream format.

Functions:

Name Description
triples_stream_frames

Serialize a Graph/Dataset into jelly frames.

quads_stream_frames

Serialize a Dataset into jelly frames.

graphs_stream_frames

Serialize a Dataset into jelly frames as a stream of graphs.

guess_options

Guess the serializer options based on the store type.

guess_stream

Return an appropriate stream implementation for the given options.

grouped_stream_to_frames

Transform Graphs/Datasets into Jelly frames, one frame per Graph/Dataset.

grouped_stream_to_file

Write stream of Graphs/Datasets to a binary file.

flat_stream_to_frames

Serialize a stream of raw triples or quads into Jelly frames.

flat_stream_to_file

Write Triple or Quad events to a binary file in Jelly flat format.

RDFLibTermEncoder(lookup_preset=None)

Bases: TermEncoder

Methods:

Name Description
encode_spo

Encode s/p/o term based on its RDFLib object.

encode_graph

Encode graph name term based on its RDFLib object.

Source code in pyjelly/serialize/encode.py
def __init__(
    self,
    lookup_preset: options.LookupPreset | None = None,
) -> None:
    if lookup_preset is None:
        lookup_preset = options.LookupPreset()
    self.lookup_preset = lookup_preset
    self.names = LookupEncoder(lookup_size=lookup_preset.max_names)
    self.prefixes = LookupEncoder(lookup_size=lookup_preset.max_prefixes)
    self.datatypes = LookupEncoder(lookup_size=lookup_preset.max_datatypes)
encode_spo(term, slot, statement)

Encode s/p/o term based on its RDFLib object.

Args: term (object): term to encode slot (Slot): its place in statement. statement (Statement): Triple/Quad message to fill with s/p/o terms.

Returns: Rows: encoded extra rows

Source code in pyjelly/integrations/rdflib/serialize.py
def encode_spo(self, term: object, slot: Slot, statement: Statement) -> Rows:
    """
    Encode s/p/o term based on its RDFLib object.

    Args:
        term (object): term to encode
        slot (Slot): its place in statement.
        statement (Statement): Triple/Quad message to fill with s/p/o terms.

    Returns:
        Rows: encoded extra rows

    """
    if isinstance(term, rdflib.URIRef):
        iri = self.get_iri_field(statement, slot)
        return self.encode_iri(term, iri)

    if isinstance(term, rdflib.Literal):
        literal = self.get_literal_field(statement, slot)
        return self.encode_literal(
            lex=str(term),
            language=term.language,
            # `datatype` is cast to `str` explicitly because
            # `URIRef.__eq__` overrides `str.__eq__` in an incompatible manner
            datatype=term.datatype and str(term.datatype),
            literal=literal,
        )

    if isinstance(term, rdflib.BNode):
        self.set_bnode_field(statement, slot, str(term))
        return ()

    return super().encode_spo(term, slot, statement)  # error if not handled
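The expression `term.datatype and str(term.datatype)` in the listing above keeps a missing datatype as None while converting a present one to a plain built-in string. A small sketch of that idiom, with a hypothetical `Tagged` class standing in for a URIRef-like str subclass:

```python
from __future__ import annotations


class Tagged(str):
    """Hypothetical str subclass standing in for a URIRef-like term."""


def to_plain_str(value: str | None) -> str | None:
    # `None and ...` short-circuits to None, so a missing datatype stays None;
    # a non-empty value is converted to an exact built-in str.
    return value and str(value)


missing = to_plain_str(None)
plain = to_plain_str(Tagged("http://example.org/datatype"))
```

After the conversion, comparisons use the ordinary `str.__eq__` rather than any equality the subclass defines.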
encode_graph(term, statement)

Encode graph name term based on its RDFLib object.

Args: term (object): term to encode statement (HasGraph): Quad/GraphStart message to fill g_{} in.

Returns: Rows: encoded extra rows

Source code in pyjelly/integrations/rdflib/serialize.py
def encode_graph(self, term: object, statement: HasGraph) -> Rows:
    """
    Encode graph name term based on its RDFLib object.

    Args:
        term (object): term to encode
        statement (HasGraph): Quad/GraphStart message to fill g_{} in.

    Returns:
        Rows: encoded extra rows

    """
    if term == DATASET_DEFAULT_GRAPH_ID:
        return self.encode_default_graph(statement.g_default_graph)

    if isinstance(term, rdflib.URIRef):
        return self.encode_iri(term, statement.g_iri)

    if isinstance(term, rdflib.BNode):
        statement.g_bnode = str(term)
        return ()
    return super().encode_graph(term, statement)  # error if not handled
RDFLibJellySerializer(store)

Bases: Serializer

RDFLib serializer for writing graphs in Jelly RDF stream format.

Handles streaming RDF terms into Jelly frames using internal encoders. Supports only graphs and datasets (not quoted graphs).

Methods:

Name Description
serialize

Serialize self.store content to Jelly format.

Source code in pyjelly/integrations/rdflib/serialize.py
def __init__(self, store: Graph) -> None:
    if isinstance(store, QuotedGraph):
        msg = "N3 format is not supported"
        raise NotImplementedError(msg)
    super().__init__(store)
serialize(out, /, *, stream=None, options=None, **unused)

Serialize self.store content to Jelly format.

Args: out (IO[bytes]): output buffered writer. stream (Stream | None, optional): Jelly stream object. Defaults to None. options (SerializerOptions | None, optional): Serializer options if defined beforehand, e.g., read from a separate file. Defaults to None. **unused (Any): unused args for RDFLib serialize.

Source code in pyjelly/integrations/rdflib/serialize.py
@override
def serialize(  # type: ignore[override]
    self,
    out: IO[bytes],
    /,
    *,
    stream: Stream | None = None,
    options: SerializerOptions | None = None,
    **unused: Any,
) -> None:
    """
    Serialize self.store content to Jelly format.

    Args:
        out (IO[bytes]): output buffered writer
        stream (Stream | None, optional): Jelly stream object. Defaults to None.
        options (SerializerOptions | None, optional): Serializer options
            if defined beforehand, e.g., read from a separate file.
            Defaults to None.
        **unused(Any): unused args for RDFLib serialize

    """
    if options is None:
        options = guess_options(self.store)
    if stream is None:
        stream = guess_stream(options, self.store)
    write = write_delimited if stream.options.params.delimited else write_single
    for stream_frame in stream_frames(stream, self.store):
        write(stream_frame, out)
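The serializer picks between a delimited and a single-frame writer. The helpers themselves are not shown on this page, so the sketch below only assumes that delimited output follows the usual Protocol Buffers convention: each message is preceded by its byte length encoded as a varint. All function names here are hypothetical.

```python
from __future__ import annotations

import io
from collections.abc import Iterator
from typing import BinaryIO


def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer as a base-128 varint."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def write_length_delimited(payload: bytes, out: BinaryIO) -> None:
    """Write the payload size as a varint, then the payload itself."""
    out.write(encode_varint(len(payload)))
    out.write(payload)


def read_length_delimited(inp: BinaryIO) -> Iterator[bytes]:
    """Yield payloads until the stream ends (truncation just stops iteration)."""
    while True:
        shift = 0
        size = 0
        while True:
            raw = inp.read(1)
            if not raw:
                return
            size |= (raw[0] & 0x7F) << shift
            if not raw[0] & 0x80:
                break
            shift += 7
        yield inp.read(size)


frames = [b"first frame", b"x" * 300]
buffer = io.BytesIO()
for payload in frames:
    write_length_delimited(payload, buffer)
buffer.seek(0)
decoded = list(read_length_delimited(buffer))
```

The length prefix is what lets several frames share one file. A single undelimited frame has no such boundary marker.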
triples_stream_frames(stream, data)

Serialize a Graph/Dataset into jelly frames.

Args: stream (TripleStream): stream that specifies triples processing data (Graph | Dataset | Generator[Triple]): Graph/Dataset/Statements to serialize.

Notes: if a Dataset is given, its graphs are unpacked and iterated over; if the flow is GraphsFrameFlow, a frame is emitted per graph.

Yields: Generator[jelly.RdfStreamFrame]: jelly frames.

Source code in pyjelly/integrations/rdflib/serialize.py
@stream_frames.register(TripleStream)
def triples_stream_frames(
    stream: TripleStream,
    data: Graph | Dataset | Generator[Triple],
) -> Generator[jelly.RdfStreamFrame]:
    """
    Serialize a Graph/Dataset into jelly frames.

    Args:
        stream (TripleStream): stream that specifies triples processing
        data (Graph | Dataset | Generator[Triple]):
            Graph/Dataset/Statements to serialize.

    Notes:
        if Dataset is given, its graphs are unpacked and iterated over
        if flow is GraphsFrameFlow, emits a frame per graph.

    Yields:
        Generator[jelly.RdfStreamFrame]: jelly frames.

    """
    stream.enroll()
    if isinstance(data, Graph) and stream.options.params.namespace_declarations:
        namespace_declarations(data, stream)

    graphs = (data,) if not isinstance(data, Dataset) else data.graphs()
    for graph in graphs:
        for terms in graph:
            if frame := stream.triple(terms):
                yield frame
        if frame := stream.flow.frame_from_graph():
            yield frame
    if stream.stream_types.flat and (frame := stream.flow.to_stream_frame()):
        yield frame
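The `@stream_frames.register(TripleStream)` decorator suggests that stream_frames dispatches on the type of its first argument, in the style of `functools.singledispatch`. That is an assumption, since the definition of stream_frames is not shown here. A minimal sketch of such dispatch with placeholder stream classes:

```python
from functools import singledispatch


class Stream:
    """Placeholder base class (not the pyjelly Stream)."""


class TripleStream(Stream):
    pass


class QuadStream(Stream):
    pass


@singledispatch
def stream_frames(stream, data):
    # Fallback for stream types without a registered handler.
    raise NotImplementedError(f"no handler for {type(stream).__name__}")


@stream_frames.register(TripleStream)
def _(stream, data):
    for item in data:
        yield ("triple", item)


@stream_frames.register(QuadStream)
def _(stream, data):
    for item in data:
        yield ("quad", item)


triple_frames = list(stream_frames(TripleStream(), ["a", "b"]))
quad_frames = list(stream_frames(QuadStream(), ["c"]))
```

Callers invoke the one generic name, and the handler is selected from the concrete stream class.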
quads_stream_frames(stream, data)

Serialize a Dataset into jelly frames.

Notes: Emits one frame per dataset if flow is of DatasetsFrameFlow.

Args: stream (QuadStream): stream that specifies quads processing data (Dataset | Generator[Quad]): Dataset to serialize.

Yields: Generator[jelly.RdfStreamFrame]: jelly frames

Source code in pyjelly/integrations/rdflib/serialize.py
@stream_frames.register(QuadStream)
def quads_stream_frames(
    stream: QuadStream,
    data: Dataset | Generator[Quad],
) -> Generator[jelly.RdfStreamFrame]:
    """
    Serialize a Dataset into jelly frames.

    Notes:
        Emits one frame per dataset if flow is of DatasetsFrameFlow.

    Args:
        stream (QuadStream): stream that specifies quads processing
        data (Dataset | Generator[Quad]): Dataset to serialize.

    Yields:
        Generator[jelly.RdfStreamFrame]: jelly frames

    """
    stream.enroll()
    if stream.options.params.namespace_declarations:
        namespace_declarations(data, stream)  # type: ignore[arg-type]

    iterator: Generator[Quad, None, None]
    if isinstance(data, Dataset):
        iterator = cast(Generator[Quad, None, None], data.quads())
    else:
        iterator = data

    for terms in iterator:
        if frame := stream.quad(terms):
            yield frame
    if frame := stream.flow.frame_from_dataset():
        yield frame
    if stream.stream_types.flat and (frame := stream.flow.to_stream_frame()):
        yield frame
graphs_stream_frames(stream, data)

Serialize a Dataset into jelly frames as a stream of graphs.

Notes: If the flow is of the DatasetsFrameFlow type, the whole dataset is encoded into one frame.

Args: stream (GraphStream): stream that specifies graphs processing data (Dataset | Generator[Quad]): Dataset to serialize.

Yields: Generator[jelly.RdfStreamFrame]: jelly frames

Source code in pyjelly/integrations/rdflib/serialize.py
@stream_frames.register(GraphStream)
def graphs_stream_frames(
    stream: GraphStream,
    data: Dataset | Generator[Quad],
) -> Generator[jelly.RdfStreamFrame]:
    """
    Serialize a Dataset into jelly frames as a stream of graphs.

    Notes:
        If flow of DatasetsFrameFlow type, the whole dataset
        will be encoded into one frame.

    Args:
        stream (GraphStream): stream that specifies graphs processing
        data (Dataset | Generator[Quad]): Dataset to serialize.

    Yields:
        Generator[jelly.RdfStreamFrame]: jelly frames

    """
    stream.enroll()
    if stream.options.params.namespace_declarations:
        namespace_declarations(data, stream)  # type: ignore[arg-type]

    if isinstance(data, Dataset):
        graphs = data.graphs()
    else:
        ds = Dataset()
        for quad in data:
            ctx = ds.get_context(quad.g)
            ctx.add((quad.s, quad.p, quad.o))
        graphs = ds.graphs()

    for graph in graphs:
        yield from stream.graph(graph_id=graph.identifier, graph=graph)

    if frame := stream.flow.frame_from_dataset():
        yield frame
    if stream.stream_types.flat and (frame := stream.flow.to_stream_frame()):
        yield frame
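When graphs_stream_frames receives a generator of quads rather than a Dataset, the listing above first collects every quad into a temporary Dataset, grouped by graph name, and only then emits graphs. The sketch below shows the same grouping with a dictionary; the `Quad` tuple and `group_by_graph` are simplified stand-ins for illustration.

```python
from __future__ import annotations

from collections import defaultdict
from collections.abc import Iterable
from typing import NamedTuple


class Quad(NamedTuple):
    s: str
    p: str
    o: str
    g: str


def group_by_graph(quads: Iterable[Quad]) -> dict[str, list[tuple[str, str, str]]]:
    """Collect triples under their graph name, keeping first-seen graph order."""
    graphs: defaultdict[str, list[tuple[str, str, str]]] = defaultdict(list)
    for quad in quads:
        graphs[quad.g].append((quad.s, quad.p, quad.o))
    return dict(graphs)


quads = (
    Quad(*fields)
    for fields in [
        ("s1", "p", "o1", "g1"),
        ("s2", "p", "o2", "g2"),
        ("s3", "p", "o3", "g1"),
    ]
)
by_graph = group_by_graph(quads)
```

One consequence is that the whole generator is read into memory before the first frame is produced.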
guess_options(sink)

Guess the serializer options based on the store type.

>>> guess_options(Graph()).logical_type
1
>>> guess_options(Dataset()).logical_type
2

Source code in pyjelly/integrations/rdflib/serialize.py
def guess_options(sink: Graph | Dataset) -> SerializerOptions:
    """
    Guess the serializer options based on the store type.

    >>> guess_options(Graph()).logical_type
    1
    >>> guess_options(Dataset()).logical_type
    2
    """
    logical_type = (
        jelly.LOGICAL_STREAM_TYPE_FLAT_QUADS
        if isinstance(sink, Dataset)
        else jelly.LOGICAL_STREAM_TYPE_FLAT_TRIPLES
    )
    # RDFLib doesn't support RDF-star and generalized statements by default
    # as it requires specific handling for quoted triples and non-standard RDF terms
    params = StreamParameters(generalized_statements=False, rdf_star=False)
    return SerializerOptions(logical_type=logical_type, params=params)
guess_stream(options, sink)

Return an appropriate stream implementation for the given options.

Notes: if the base logical type is GRAPHS and a Dataset is given, a TripleStream is initialized.

>>> graph_ser = RDFLibJellySerializer(Graph())
>>> ds_ser = RDFLibJellySerializer(Dataset())

>>> type(guess_stream(guess_options(graph_ser.store), graph_ser.store))
<class 'pyjelly.serialize.streams.TripleStream'>
>>> type(guess_stream(guess_options(ds_ser.store), ds_ser.store))
<class 'pyjelly.serialize.streams.QuadStream'>

Source code in pyjelly/integrations/rdflib/serialize.py
def guess_stream(options: SerializerOptions, sink: Graph | Dataset) -> Stream:
    """
    Return an appropriate stream implementation for the given options.

    Notes: if base(!) logical type is GRAPHS and Dataset is given,
        initializes TripleStream

    >>> graph_ser = RDFLibJellySerializer(Graph())
    >>> ds_ser = RDFLibJellySerializer(Dataset())

    >>> type(guess_stream(guess_options(graph_ser.store), graph_ser.store))
    <class 'pyjelly.serialize.streams.TripleStream'>
    >>> type(guess_stream(guess_options(ds_ser.store), ds_ser.store))
    <class 'pyjelly.serialize.streams.QuadStream'>
    """
    stream_cls: type[Stream]
    if (options.logical_type % 10) != jelly.LOGICAL_STREAM_TYPE_GRAPHS and isinstance(
        sink, Dataset
    ):
        stream_cls = QuadStream
    else:
        stream_cls = TripleStream
    return stream_cls.for_rdflib(options=options)
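The `options.logical_type % 10` test in the listing reduces a logical stream type to its base type by keeping the last decimal digit. The sketch below assumes the enum values below; only 1 and 2 are confirmed by the doctests on this page, the rest follow my reading of the Jelly specification and should be treated as assumptions. `pick_stream` is a hypothetical helper that returns class names as strings.

```python
# Assumed numeric values of Jelly logical stream types (see lead-in).
LOGICAL_TYPES = {
    "FLAT_TRIPLES": 1,
    "FLAT_QUADS": 2,
    "GRAPHS": 3,
    "DATASETS": 4,
    "SUBJECT_GRAPHS": 13,
    "NAMED_GRAPHS": 14,
    "TIMESTAMPED_NAMED_GRAPHS": 114,
}


def base_type(logical_type: int) -> int:
    """Keep the last decimal digit, which identifies the base logical type."""
    return logical_type % 10


def pick_stream(logical_type: int, is_dataset: bool) -> str:
    """Mirror the branch above: quads only for a Dataset with a non-GRAPHS base."""
    if base_type(logical_type) != LOGICAL_TYPES["GRAPHS"] and is_dataset:
        return "QuadStream"
    return "TripleStream"


subject_graphs_base = base_type(LOGICAL_TYPES["SUBJECT_GRAPHS"])
dataset_flat_quads = pick_stream(LOGICAL_TYPES["FLAT_QUADS"], is_dataset=True)
dataset_graphs = pick_stream(LOGICAL_TYPES["GRAPHS"], is_dataset=True)
```

Under these assumed values, SUBJECT_GRAPHS shares the GRAPHS base, so a Dataset serialized with it would also get a TripleStream.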
grouped_stream_to_frames(sink_generator, options=None)

Transform Graphs/Datasets into Jelly frames, one frame per Graph/Dataset.

Note: options are guessed if not provided.

Args: sink_generator (Generator[Graph] | Generator[Dataset]): Generator of Graphs/Datasets to transform. options (SerializerOptions | None, optional): stream options to use. If omitted, options are guessed from the store type of the first sink. Defaults to None.

Yields: Generator[jelly.RdfStreamFrame]: produced Jelly frames

Source code in pyjelly/integrations/rdflib/serialize.py
def grouped_stream_to_frames(
    sink_generator: Generator[Graph] | Generator[Dataset],
    options: SerializerOptions | None = None,
) -> Generator[jelly.RdfStreamFrame]:
    """
    Transform Graphs/Datasets into Jelly frames, one frame per Graph/Dataset.

    Note: options are guessed if not provided.

    Args:
        sink_generator (Generator[Graph] | Generator[Dataset]): Generator of
            Graphs/Dataset to transform.
        options (SerializerOptions | None, optional): stream options to use.
            Options are guessed based on the sink store type. Defaults to None.

    Yields:
        Generator[jelly.RdfStreamFrame]: produced Jelly frames

    """
    stream = None
    for sink in sink_generator:
        if not stream:
            if options is None:
                options = guess_options(sink)
            stream = guess_stream(options, sink)
        yield from stream_frames(stream, sink)
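In the listing above, the stream is created lazily from the first sink and then reused for every later one, so guessed options reflect only the first element. A library-free sketch of that behaviour, where sets stand in for Graphs and dicts for Datasets, and `guess_kind` and `grouped_frames` are hypothetical names:

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator


def guess_kind(sink: object) -> str:
    """Hypothetical guess: dicts stand in for Datasets, anything else for Graphs."""
    return "quads" if isinstance(sink, dict) else "triples"


def grouped_frames(
    sinks: Iterable[set | dict], kind: str | None = None
) -> Iterator[tuple[str, int]]:
    """Yield one (kind, size) pair per sink; the kind is decided only once."""
    for sink in sinks:
        if kind is None:
            kind = guess_kind(sink)  # guessed from the first sink only
        yield (kind, len(sink))


guessed = list(grouped_frames(iter([{1, 2}, {3}])))
explicit = list(grouped_frames(iter([{1, 2}]), kind="quads"))
```

Passing options explicitly bypasses the guess, which matters when the first sink is not representative of the rest.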
grouped_stream_to_file(stream, output_file, **kwargs)

Write stream of Graphs/Datasets to a binary file.

Args: stream (Generator[Graph] | Generator[Dataset]): Generator of Graphs/Datasets to transform. output_file (IO[bytes]): output buffered writer. **kwargs (Any): options to pass to stream.

Source code in pyjelly/integrations/rdflib/serialize.py
def grouped_stream_to_file(
    stream: Generator[Graph] | Generator[Dataset],
    output_file: IO[bytes],
    **kwargs: Any,
) -> None:
    """
    Write stream of Graphs/Datasets to a binary file.

    Args:
        stream (Generator[Graph] | Generator[Dataset]): Generator of
            Graphs/Dataset to transform.
        output_file (IO[bytes]): output buffered writer.
        **kwargs (Any): options to pass to stream.

    """
    for frame in grouped_stream_to_frames(stream, **kwargs):
        write_delimited(frame, output_file)
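The source above does not show how write_delimited lays out each frame. As a rough sketch, assuming it follows the usual Protocol Buffers length-delimited convention (a base-128 varint byte count followed by the message bytes), the framing could look like this. The helpers encode_varint, write_length_delimited, and read_length_delimited are hypothetical names used for illustration and are not part of pyjelly.

```python
import io
from collections.abc import Iterator
from typing import IO


def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    out = bytearray()
    while True:
        low_bits = value & 0x7F
        value >>= 7
        if value:
            out.append(low_bits | 0x80)  # continuation bit: more bytes follow
        else:
            out.append(low_bits)
            return bytes(out)


def write_length_delimited(payload: bytes, output: IO[bytes]) -> None:
    """Write the payload size as a varint, then the payload itself."""
    output.write(encode_varint(len(payload)))
    output.write(payload)


def read_length_delimited(source: IO[bytes]) -> Iterator[bytes]:
    """Yield payloads written by write_length_delimited until a clean EOF."""
    while True:
        size = 0
        shift = 0
        while True:
            raw = source.read(1)
            if not raw:
                if shift:
                    raise EOFError("truncated length prefix")
                return  # EOF exactly between two messages
            size |= (raw[0] & 0x7F) << shift
            shift += 7
            if not raw[0] & 0x80:
                break
        payload = source.read(size)
        if len(payload) != size:
            raise EOFError("truncated payload")
        yield payload


# Stand-ins for serialized frames (assumption: real frames are protobuf bytes).
buffer = io.BytesIO()
messages = [b"first frame", b"", b"x" * 300]
for message in messages:
    write_length_delimited(message, buffer)
buffer.seek(0)
decoded = list(read_length_delimited(buffer))
```

Because every message carries its own size, a reader can consume frames one at a time without loading the whole file.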
flat_stream_to_frames(statements, options=None)

Serialize a stream of raw triples or quads into Jelly frames.

Args:
    statements (Generator[Triple | Quad]): s/p/o triples or s/p/o/g quads to serialize.
    options (SerializerOptions | None, optional): if omitted, guessed based on the first tuple.

Yields: Generator[jelly.RdfStreamFrame]: generated frames.

Source code in pyjelly/integrations/rdflib/serialize.py
def flat_stream_to_frames(
    statements: Generator[Triple | Quad],
    options: SerializerOptions | None = None,
) -> Generator[jelly.RdfStreamFrame]:
    """
    Serialize a stream of raw triples or quads into Jelly frames.

    Args:
        statements (Generator[Triple | Quad]):
          s/p/o triples or s/p/o/g quads to serialize.
        options (SerializerOptions | None, optional):
            if omitted, guessed based on the first tuple.

    Yields:
        Generator[jelly.RdfStreamFrame]: generated frames.

    """
    first = next(statements, None)
    if first is None:
        return

    sink = Dataset() if len(first) == QUAD_ARITY else Graph()
    if options is None:
        options = guess_options(sink)
    stream = guess_stream(options, sink)

    combined: Generator[Triple | Quad] | Graph = (
        item for item in chain([first], statements)
    )

    yield from stream_frames(stream, combined)
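flat_stream_to_frames calls next() on its argument, so it expects an iterator or generator rather than a list. It inspects the first statement to decide between triples and quads and then puts that statement back in front of the rest. A minimal, library-free sketch of that peek-and-restore pattern follows; peek_statement_kind is a hypothetical helper, and QUAD_ARITY = 4 is an assumed value (the constant's definition is not shown above).

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from itertools import chain

QUAD_ARITY = 4  # assumed: subject, predicate, object, graph


def peek_statement_kind(
    statements: Iterable[tuple],
) -> tuple[str | None, Iterator[tuple]]:
    """Classify a stream by its first tuple without losing that tuple."""
    iterator = iter(statements)
    first = next(iterator, None)
    if first is None:
        # Empty input: nothing to classify, nothing to replay.
        return None, iter(())
    kind = "quads" if len(first) == QUAD_ARITY else "triples"
    # chain() replays the consumed item ahead of the untouched remainder.
    return kind, chain([first], iterator)


triples = (("s", "p", f"o{i}") for i in range(3))
kind, restored = peek_statement_kind(triples)
restored_list = list(restored)

quad_kind, _ = peek_statement_kind(iter([("s", "p", "o", "g")]))
empty_kind, empty_rest = peek_statement_kind(iter(()))
```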
flat_stream_to_file(statements, output_file, options=None)

Write Triple or Quad events to a binary file in Jelly flat format.

Args:
    statements (Generator[Triple | Quad]): statements to serialize.
    output_file (IO[bytes]): output buffered writer.
    options (SerializerOptions | None, optional): stream options.

Source code in pyjelly/integrations/rdflib/serialize.py
def flat_stream_to_file(
    statements: Generator[Triple | Quad],
    output_file: IO[bytes],
    options: SerializerOptions | None = None,
) -> None:
    """
    Write Triple or Quad events to a binary file in Jelly flat format.

    Args:
        statements (Generator[Triple | Quad]): statements to serialize.
        output_file (IO[bytes]): output buffered writer.
        options (SerializerOptions | None, optional): stream options.

    """
    for frame in flat_stream_to_frames(statements, options):
        write_delimited(frame, output_file)

jelly

Modules:

Name Description
rdf_pb2

Generated protocol buffer code.

rdf_pb2

Generated protocol buffer code.

options

Classes:

Name Description
StreamTypes

Functions:

Name Description
register_mimetypes

Associate files that have Jelly extension with Jelly MIME types.

Attributes:

Name Type Description
INTEGRATION_SIDE_EFFECTS bool

Whether to allow integration module imports to trigger side effects.

INTEGRATION_SIDE_EFFECTS = True

Whether to allow integration module imports to trigger side effects.

These side effects are cheap and may include populating some registries for guessing the defaults for external integrations that work with Jelly.

StreamTypes(physical_type=jelly.PHYSICAL_STREAM_TYPE_UNSPECIFIED, logical_type=jelly.LOGICAL_STREAM_TYPE_UNSPECIFIED)

Methods:

Name Description
__repr__

Return the representation of StreamTypes.

__repr__()

Return the representation of StreamTypes.

>>> repr(StreamTypes(9999, 8888))
'StreamTypes(9999, 8888)'

Source code in pyjelly/options.py
def __repr__(self) -> str:
    """
    Return the representation of StreamTypes.

    >>> repr(StreamTypes(9999, 8888))
    'StreamTypes(9999, 8888)'
    """
    with suppress(ValueError):
        physical_type_name = jelly.PhysicalStreamType.Name(self.physical_type)
        logical_type_name = jelly.LogicalStreamType.Name(self.logical_type)
        return f"StreamTypes({physical_type_name}, {logical_type_name})"
    return f"StreamTypes({self.physical_type}, {self.logical_type})"
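The __repr__ above relies on contextlib.suppress: if either enum name lookup raises ValueError, the with block is abandoned and the numeric fallback runs. The sketch below reproduces that control flow with standard-library enums. PhysicalKind, LogicalKind, and describe are hypothetical stand-ins, not the generated protobuf enums.

```python
from contextlib import suppress
from enum import IntEnum


class PhysicalKind(IntEnum):  # hypothetical stand-in enum
    UNSPECIFIED = 0
    TRIPLES = 1


class LogicalKind(IntEnum):  # hypothetical stand-in enum
    UNSPECIFIED = 0
    FLAT_TRIPLES = 1


def describe(physical: int, logical: int) -> str:
    """Prefer symbolic names; fall back to raw numbers if any lookup fails."""
    with suppress(ValueError):
        physical_name = PhysicalKind(physical).name
        logical_name = LogicalKind(logical).name  # ValueError for unknown values
        return f"StreamKinds({physical_name}, {logical_name})"
    # Reached only when one of the lookups above raised ValueError.
    return f"StreamKinds({physical}, {logical})"


known = describe(1, 1)
partly_unknown = describe(1, 8888)
```

Note that a single unknown value makes both fields fall back to numbers, which matches the structure of the method above.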
register_mimetypes(extension='.jelly')

Associate files that have Jelly extension with Jelly MIME types.

>>> register_mimetypes()
>>> mimetypes.guess_type("out.jelly")
('application/x-jelly-rdf', None)

Source code in pyjelly/options.py
def register_mimetypes(extension: str = ".jelly") -> None:
    """
    Associate files that have Jelly extension with Jelly MIME types.

    >>> register_mimetypes()
    >>> mimetypes.guess_type("out.jelly")
    ('application/x-jelly-rdf', None)
    """
    for mimetype in MIMETYPES:
        mimetypes.add_type(mimetype, extension)
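The effect of register_mimetypes can be reproduced with the standard library alone. This sketch registers a single MIME type by hand; the value application/x-jelly-rdf is taken from the doctest above, while the full contents of MIMETYPES are not shown in this reference and are not assumed here.

```python
import mimetypes

# Register the type shown in the doctest for the default ".jelly" extension.
mimetypes.add_type("application/x-jelly-rdf", ".jelly")

# guess_type returns a (type, encoding) pair; encoding is None for plain files.
mime_type, encoding = mimetypes.guess_type("dataset.jelly")
```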

parse

Modules:

Name Description
decode
ioutils
lookup
decode

Classes:

Name Description
ParsingMode

Specifies how jelly frames should be treated.

Decoder

Functions:

Name Description
options_from_frame

Fill stream options based on the options row.

ParsingMode

Bases: Enum

Specifies how jelly frames should be treated.

Modes:
    FLAT: Yield all frames as one Graph or Dataset.
    GROUPED: Yield one Graph/Dataset per frame (grouped parsing).

Decoder(adapter)

Initializes the decoder with lookup tables of preset sizes, an integration-dependent adapter, and an empty dictionary of repeated terms.

Args: adapter (Adapter): integration-dependent adapter that specifies how terms are converted to concrete objects and how framing, namespace declarations, and graph/dataset construction are handled.

Methods:

Name Description
iter_rows

Iterate through rows in the frame.

decode_row

Decode a row based on its type.

ingest_prefix_entry

Update prefix lookup table based on the table entry.

ingest_name_entry

Update name lookup table based on the table entry.

ingest_datatype_entry

Update datatype lookup table based on the table entry.

decode_term

Decode a term based on its type: IRI/literal/BN/default graph.

decode_iri

Decode RdfIri message to IRI using a custom adapter.

decode_bnode

Decode string message to blank node (BN) using a custom adapter.

decode_literal

Decode RdfLiteral to literal based on custom adapter implementation.

decode_statement

Decode a triple/quad message.

Source code in pyjelly/parse/decode.py
def __init__(self, adapter: Adapter) -> None:
    """
    Initialize decoder.

    Initializes decoder with a lookup tables with preset sizes,
    integration-dependent adapter and empty repeated terms dictionary.

    Args:
        adapter (Adapter): integration-dependent adapter that specifies terms
        conversion to specific objects, framing,
        namespace declarations, and graphs/datasets forming.

    """
    self.adapter = adapter
    self.names = LookupDecoder(lookup_size=self.options.lookup_preset.max_names)
    self.prefixes = LookupDecoder(
        lookup_size=self.options.lookup_preset.max_prefixes
    )
    self.datatypes = LookupDecoder(
        lookup_size=self.options.lookup_preset.max_datatypes
    )
    self.repeated_terms: dict[str, jelly.RdfIri | str | jelly.RdfLiteral] = {}
iter_rows(frame)

Iterate through rows in the frame.

Args: frame (jelly.RdfStreamFrame): jelly frame

Yields: Iterator[Any]: decoded rows

Source code in pyjelly/parse/decode.py
def iter_rows(self, frame: jelly.RdfStreamFrame) -> Iterator[Any]:
    """
    Iterate through rows in the frame.

    Args:
        frame (jelly.RdfStreamFrame): jelly frame
    Yields:
        Iterator[Any]: decoded rows

    """
    for row_owner in frame.rows:
        row = getattr(row_owner, row_owner.WhichOneof("row"))
        decoded_row = self.decode_row(row)
        if isinstance(
            row, (jelly.RdfTriple, jelly.RdfQuad, jelly.RdfNamespaceDeclaration)
        ):
            yield decoded_row
decode_row(row)

Decode a row based on its type.

Notes: uses custom adapters to decode triples/quads, namespace declarations, and graph start/end rows.

Args: row (Any): protobuf row message

Raises: TypeError: if no handler is registered for this type of protobuf message

Returns: Any | None: the decoded row, i.e. the result of calling the handler registered for the row type

Source code in pyjelly/parse/decode.py
def decode_row(self, row: Any) -> Any | None:
    """
    Decode a row based on its type.

    Notes: uses custom adapters to decode triples/quads, namespace declarations,
           graph start/end.

    Args:
        row (Any): protobuf row message

    Raises:
        TypeError: raises error if this type of protobuf message does not have
                   a respective handler

    Returns:
        Any | None: decoded row -
                    result from calling decode_row (row type appropriate handler)

    """
    try:
        decode_row = self.row_handlers[type(row)]
    except KeyError:
        msg = f"decoder not implemented for {type(row)}"
        raise TypeError(msg) from None
    return decode_row(self, row)
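decode_row looks up a handler by the exact type of the row and converts a missing entry into a TypeError. The following self-contained sketch shows that dispatch-table pattern; TripleRow, OptionsRow, and MiniDecoder are hypothetical stand-ins for the protobuf messages and the real Decoder.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable, ClassVar


@dataclass
class TripleRow:  # hypothetical stand-in for a protobuf triple message
    terms: tuple[str, str, str]


@dataclass
class OptionsRow:  # hypothetical stand-in for a stream options message
    stream_name: str


class MiniDecoder:
    def __init__(self) -> None:
        self.stream_name: str | None = None

    def handle_triple(self, row: TripleRow) -> tuple[str, str, str]:
        return row.terms

    def handle_options(self, row: OptionsRow) -> None:
        # Rows like this only update decoder state and produce no output.
        self.stream_name = row.stream_name

    # Plain functions keyed by message type; called below as handler(self, row).
    row_handlers: ClassVar[dict[type, Callable[..., Any]]] = {
        TripleRow: handle_triple,
        OptionsRow: handle_options,
    }

    def decode_row(self, row: Any) -> Any | None:
        try:
            handler = self.row_handlers[type(row)]
        except KeyError:
            msg = f"decoder not implemented for {type(row)}"
            raise TypeError(msg) from None
        return handler(self, row)


decoder = MiniDecoder()
options_result = decoder.decode_row(OptionsRow("demo"))
triple_result = decoder.decode_row(TripleRow(("s", "p", "o")))
```

Keying on type(row) means subclasses are not matched automatically; each concrete message type needs its own entry.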
ingest_prefix_entry(entry)

Update prefix lookup table based on the table entry.

Args: entry (jelly.RdfPrefixEntry): prefix message, containing id and value

Source code in pyjelly/parse/decode.py
def ingest_prefix_entry(self, entry: jelly.RdfPrefixEntry) -> None:
    """
    Update prefix lookup table based on the table entry.

    Args:
        entry (jelly.RdfPrefixEntry): prefix message, containing id and value

    """
    self.prefixes.assign_entry(index=entry.id, value=entry.value)
ingest_name_entry(entry)

Update name lookup table based on the table entry.

Args: entry (jelly.RdfNameEntry): name message, containing id and value

Source code in pyjelly/parse/decode.py
def ingest_name_entry(self, entry: jelly.RdfNameEntry) -> None:
    """
    Update name lookup table based on the table entry.

    Args:
        entry (jelly.RdfNameEntry): name message, containing id and value

    """
    self.names.assign_entry(index=entry.id, value=entry.value)
ingest_datatype_entry(entry)

Update datatype lookup table based on the table entry.

Args: entry (jelly.RdfDatatypeEntry): datatype message, containing id and value

Source code in pyjelly/parse/decode.py
def ingest_datatype_entry(self, entry: jelly.RdfDatatypeEntry) -> None:
    """
    Update datatype lookup table based on the table entry.

    Args:
        entry (jelly.RdfDatatypeEntry): datatype message, containing id and value

    """
    self.datatypes.assign_entry(index=entry.id, value=entry.value)
decode_term(term)

Decode a term based on its type: IRI/literal/BN/default graph.

Notes: requires a custom adapter with implemented methods for terms decoding.

Args: term (Any): IRI/literal/BN(string)/Default graph message

Raises: TypeError: if no handler is registered for the term type

Returns: Any: decoded term (currently, rdflib objects, e.g., rdflib.term.URIRef)

Source code in pyjelly/parse/decode.py
def decode_term(self, term: Any) -> Any:
    """
    Decode a term based on its type: IRI/literal/BN/default graph.

    Notes: requires a custom adapter with implemented methods for terms decoding.

    Args:
        term (Any): IRI/literal/BN(string)/Default graph message

    Raises:
        TypeError: raises error if no handler for the term is found

    Returns:
        Any: decoded term (currently, rdflib objects, e.g., rdflib.term.URIRef)

    """
    try:
        decode_term = self.term_handlers[type(term)]
    except KeyError:
        msg = f"decoder not implemented for {type(term)}"
        raise TypeError(msg) from None
    return decode_term(self, term)
decode_iri(iri)

Decode RdfIri message to IRI using a custom adapter.

Args: iri (jelly.RdfIri): RdfIri message

Returns: Any: IRI, based on adapter implementation, e.g., rdflib.term.URIRef

Source code in pyjelly/parse/decode.py
def decode_iri(self, iri: jelly.RdfIri) -> Any:
    """
    Decode RdfIri message to IRI using a custom adapter.

    Args:
        iri (jelly.RdfIri): RdfIri message

    Returns:
        Any: IRI, based on adapter implementation, e.g., rdflib.term.URIRef

    """
    name = self.names.decode_name_term_index(iri.name_id)
    prefix = self.prefixes.decode_prefix_term_index(iri.prefix_id)
    return self.adapter.iri(iri=prefix + name)
decode_bnode(bnode)

Decode string message to blank node (BN) using a custom adapter.

Args: bnode (str): blank node id

Returns: Any: blank node object from the custom adapter

Source code in pyjelly/parse/decode.py
def decode_bnode(self, bnode: str) -> Any:
    """
    Decode string message to blank node (BN) using a custom adapter.

    Args:
        bnode (str): blank node id

    Returns:
        Any: blank node object from the custom adapter

    """
    return self.adapter.bnode(bnode)
decode_literal(literal)

Decode RdfLiteral to literal based on custom adapter implementation.

Notes: the language tag is used if present; otherwise the datatype is decoded only when the datatype table has a non-zero size and the datatype field is set.

Args: literal (jelly.RdfLiteral): RdfLiteral message

Returns: Any: literal returned by the custom adapter

Source code in pyjelly/parse/decode.py
def decode_literal(self, literal: jelly.RdfLiteral) -> Any:
    """
    Decode RdfLiteral to literal based on custom adapter implementation.

    Notes: checks for langtag existence;
           for datatype checks for non-zero table size and datatype field presence

    Args:
        literal (jelly.RdfLiteral): RdfLiteral message

    Returns:
        Any: literal returned by the custom adapter

    """
    language = datatype = None
    if literal.langtag:
        language = literal.langtag
    elif self.datatypes.lookup_size and literal.HasField("datatype"):
        datatype = self.datatypes.decode_datatype_term_index(literal.datatype)
    return self.adapter.literal(
        lex=literal.lex,
        language=language,
        datatype=datatype,
    )
decode_statement(statement, oneofs)

Decode a triple/quad message.

Notes: also updates repeated terms dictionary

Args:
    statement (jelly.RdfTriple | jelly.RdfQuad): triple/quad message
    oneofs (Sequence[str]): term slots s/p/o, plus g for quads

Raises: ValueError: if a missing repeated term is encountered

Returns: Any: a list of decoded terms

Source code in pyjelly/parse/decode.py
def decode_statement(
    self,
    statement: jelly.RdfTriple | jelly.RdfQuad,
    oneofs: Sequence[str],
) -> Any:
    """
    Decode a triple/quad message.

    Notes: also updates repeated terms dictionary

    Args:
        statement (jelly.RdfTriple | jelly.RdfQuad): triple/quad message
        oneofs (Sequence[str]): terms s/p/o/g(if quads)

    Raises:
        ValueError: if a missing repeated term is encountered

    Returns:
        Any: a list of decoded terms

    """
    terms = []
    for oneof in oneofs:
        field = statement.WhichOneof(oneof)
        if field:
            jelly_term = getattr(statement, field)
            decoded_term = self.decode_term(jelly_term)
            self.repeated_terms[oneof] = decoded_term
        else:
            decoded_term = self.repeated_terms[oneof]
            if decoded_term is None:
                msg = f"missing repeated term {oneof}"
                raise ValueError(msg)
        terms.append(decoded_term)
    return terms
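decode_statement implements repeated-term compression: a slot left unset in the message means "same value as in the previous statement". The sketch below shows the idea on plain tuples, using None for an omitted term where the real messages use an unset oneof. expand_repeated is a hypothetical helper, and raising ValueError for a slot that was never set is this sketch's choice.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator

SLOTS = ("subject", "predicate", "object")


def expand_repeated(
    statements: Iterable[tuple[str | None, ...]],
) -> Iterator[tuple[str, ...]]:
    """Fill omitted (None) terms with the last value seen in the same slot."""
    last_seen: dict[str, str] = {}
    for statement in statements:
        terms = []
        for slot, term in zip(SLOTS, statement):
            if term is not None:
                last_seen[slot] = term  # remember for later statements
            elif slot not in last_seen:
                msg = f"missing repeated term {slot}"
                raise ValueError(msg)
            terms.append(last_seen[slot])
        yield tuple(terms)


compressed = [
    ("alice", "knows", "bob"),
    (None, None, "carol"),  # same subject and predicate as before
    ("dave", None, None),   # same predicate and object as before
]
expanded = list(expand_repeated(compressed))
```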
options_from_frame(frame, *, delimited)

Fill stream options based on the options row.

Notes: generalized_statements, rdf_star, and namespace declarations are set to false by default

Args:
    frame (jelly.RdfStreamFrame): first non-empty frame from the stream
    delimited (bool): derived delimited flag

Returns: ParserOptions: filled options with types/lookups/stream parameters information

Source code in pyjelly/parse/decode.py
def options_from_frame(
    frame: jelly.RdfStreamFrame,
    *,
    delimited: bool,
) -> ParserOptions:
    """
    Fill stream options based on the options row.

    Notes:
        generalized_statements, rdf_star, and namespace declarations
        are set to false by default

    Args:
        frame (jelly.RdfStreamFrame): first non-empty frame from the stream
        delimited (bool): derived delimited flag

    Returns:
        ParserOptions: filled options with types/lookups/stream parameters information

    """
    row = frame.rows[0]
    options = row.options
    nd = getattr(options, "namespace_declarations", False) or (
        options.version >= MAX_VERSION
    )
    return ParserOptions(
        stream_types=StreamTypes(
            physical_type=options.physical_type,
            logical_type=options.logical_type,
        ),
        lookup_preset=LookupPreset(
            max_names=options.max_name_table_size,
            max_prefixes=options.max_prefix_table_size,
            max_datatypes=options.max_datatype_table_size,
        ),
        params=StreamParameters(
            stream_name=options.stream_name,
            generalized_statements=options.generalized_statements,
            rdf_star=options.rdf_star,
            version=options.version,
            delimited=delimited,
            namespace_declarations=nd,
        ),
    )
ioutils

Functions:

Name Description
delimited_jelly_hint

Detect whether a Jelly file is delimited from its first 3 bytes.

get_options_and_frames

Return stream options and frames from the buffered binary stream.

delimited_jelly_hint(header)

Detect whether a Jelly file is delimited from its first 3 bytes.

Truth table (notation: 0A = 0x0A, NN = not 0x0A, ?? = don't care):

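| Byte 1 | Byte 2 | Byte 3 | Result                                   |
|--------|--------|--------|------------------------------------------|
| NN     | ??     | ??     | Delimited                                |
| 0A     | NN     | ??     | Non-delimited                            |
| 0A     | 0A     | NN     | Delimited (size = 10)                    |
| 0A     | 0A     | 0A     | Non-delimited (stream options size = 10) |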

>>> delimited_jelly_hint(bytes([0x00, 0x00, 0x00]))
True

>>> delimited_jelly_hint(bytes([0x00, 0x00, 0x0A]))
True

>>> delimited_jelly_hint(bytes([0x00, 0x0A, 0x00]))
True

>>> delimited_jelly_hint(bytes([0x00, 0x0A, 0x0A]))
True

>>> delimited_jelly_hint(bytes([0x0A, 0x00, 0x00]))
False

>>> delimited_jelly_hint(bytes([0x0A, 0x00, 0x0A]))
False

>>> delimited_jelly_hint(bytes([0x0A, 0x0A, 0x00]))
True

>>> delimited_jelly_hint(bytes([0x0A, 0x0A, 0x0A]))
False

Source code in pyjelly/parse/ioutils.py
def delimited_jelly_hint(header: bytes) -> bool:
    """
    Detect whether a Jelly file is delimited from its first 3 bytes.

    Truth table (notation: `0A` = `0x0A`, `NN` = `not 0x0A`, `??` = _don't care_):

    | Byte 1 | Byte 2 | Byte 3 | Result                                   |
    |--------|--------|--------|------------------------------------------|
    | `NN`   |  `??`  |  `??`  | Delimited                                |
    | `0A`   |  `NN`  |  `??`  | Non-delimited                            |
    | `0A`   |  `0A`  |  `NN`  | Delimited (size = 10)                    |
    | `0A`   |  `0A`  |  `0A`  | Non-delimited (stream options size = 10) |

    >>> delimited_jelly_hint(bytes([0x00, 0x00, 0x00]))
    True

    >>> delimited_jelly_hint(bytes([0x00, 0x00, 0x0A]))
    True

    >>> delimited_jelly_hint(bytes([0x00, 0x0A, 0x00]))
    True

    >>> delimited_jelly_hint(bytes([0x00, 0x0A, 0x0A]))
    True

    >>> delimited_jelly_hint(bytes([0x0A, 0x00, 0x00]))
    False

    >>> delimited_jelly_hint(bytes([0x0A, 0x00, 0x0A]))
    False

    >>> delimited_jelly_hint(bytes([0x0A, 0x0A, 0x00]))
    True

    >>> delimited_jelly_hint(bytes([0x0A, 0x0A, 0x0A]))
    False
    """
    magic = 0x0A
    return len(header) >= 3 and (  # noqa: PLR2004
        header[0] != magic or (header[1] == magic and header[2] != magic)
    )
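To apply the hint to a stream, the first three bytes have to be inspected without consuming them. As a sketch of how that can be done for both seekable and non-seekable inputs (mirroring the approach used by get_options_and_frames in this module), consider the following. peek_header and OneWayStream are hypothetical names introduced for this example.

```python
from __future__ import annotations

import io
import os
from typing import IO


def peek_header(stream: IO[bytes], size: int = 3) -> tuple[bytes, IO[bytes]]:
    """Return the first bytes and a stream still positioned at the start."""
    if not stream.seekable():
        # Wrap in a buffer: peek() looks ahead without advancing the position.
        buffered = io.BufferedReader(stream)  # type: ignore[arg-type]
        return buffered.peek(size)[:size], buffered
    header = stream.read(size)
    stream.seek(-len(header), os.SEEK_CUR)  # rewind exactly what was read
    return header, stream


class OneWayStream(io.RawIOBase):
    """Hypothetical non-seekable source, e.g. a stand-in for a socket."""

    def __init__(self, data: bytes) -> None:
        self._data = io.BytesIO(data)

    def readable(self) -> bool:
        return True

    def readinto(self, target) -> int:
        chunk = self._data.read(len(target))
        target[: len(chunk)] = chunk
        return len(chunk)


payload = b"\x0a\x0a\x00remaining bytes"

seekable = io.BytesIO(payload)
seekable_header, seekable_stream = peek_header(seekable)

one_way_header, one_way_stream = peek_header(OneWayStream(payload))
one_way_rest = one_way_stream.read()
```

In the non-seekable case the caller must keep using the returned buffered wrapper, since the bytes that were peeked now live in its buffer.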
get_options_and_frames(inp)

Return stream options and frames from the buffered binary stream.

Args: inp (IO[bytes]): jelly buffered binary stream

Raises:
    JellyConformanceError: if the stream is delimited and contains no non-empty frames.
    JellyConformanceError: if the stream is non-delimited and its single frame contains no rows.

Returns: tuple[ParserOptions, Iterator[jelly.RdfStreamFrame]]: the parser options (stream types, lookup presets, and other stream parameters) and an iterator over the frames.

Source code in pyjelly/parse/ioutils.py
def get_options_and_frames(
    inp: IO[bytes],
) -> tuple[ParserOptions, Iterator[jelly.RdfStreamFrame]]:
    """
    Return stream options and frames from the buffered binary stream.

    Args:
        inp (IO[bytes]): jelly buffered binary stream

    Raises:
        JellyConformanceError: if no non-empty frames detected in the delimited stream
        JellyConformanceError: if non-delimited,
            error is raised if no rows are detected (empty frame)

    Returns:
        tuple[ParserOptions, Iterator[jelly.RdfStreamFrame]]: ParserOptions holds:
            stream types, lookup presets and other stream options

    """
    if not inp.seekable():
        # Input may not be seekable (e.g. a network stream) -- then we need to buffer
        # it to determine if it's delimited.
        # See also: https://github.com/Jelly-RDF/pyjelly/issues/298
        inp = io.BufferedReader(inp)  # type: ignore[arg-type]
        is_delimited = delimited_jelly_hint(inp.peek(3))
    else:
        is_delimited = delimited_jelly_hint(bytes_read := inp.read(3))
        inp.seek(-len(bytes_read), os.SEEK_CUR)

    if is_delimited:
        first_frame = None
        skipped_frames = []
        frames = frame_iterator(inp)
        for frame in frames:
            if not frame.rows:
                skipped_frames.append(frame)
            else:
                first_frame = frame
                break
        if first_frame is None:
            msg = "No non-empty frames found in the stream"
            raise JellyConformanceError(msg)

        options = options_from_frame(first_frame, delimited=True)
        return options, chain(skipped_frames, (first_frame,), frames)

    frame = parse(jelly.RdfStreamFrame, inp.read())

    if not frame.rows:
        msg = "The stream is corrupted (only contains an empty frame)"
        raise JellyConformanceError(msg)

    options = options_from_frame(frame, delimited=False)
    return options, iter((frame,))
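For delimited input, the function scans ahead to the first non-empty frame to read the options, then returns an iterator that still yields every frame in the original order, including the skipped empty ones. A library-free sketch of that scan-and-restore step follows, with frames modeled as plain lists of rows. first_non_empty is a hypothetical helper, and ValueError stands in for JellyConformanceError.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator
from itertools import chain

Frame = list  # stand-in: a frame is modeled as a list of rows


def first_non_empty(frames: Iterable[Frame]) -> tuple[Frame, Iterator[Frame]]:
    """Find the first frame with rows and return it with the full sequence."""
    iterator = iter(frames)
    skipped: list[Frame] = []
    for frame in iterator:
        if frame:
            # Replay skipped frames, then this one, then the unread remainder.
            return frame, chain(skipped, (frame,), iterator)
        skipped.append(frame)
    msg = "No non-empty frames found in the stream"
    raise ValueError(msg)


incoming = [[], [], ["options", "triple-1"], [], ["triple-2"]]
first, all_frames = first_non_empty(iter(incoming))
replayed = list(all_frames)
```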
lookup

Classes:

Name Description
LookupDecoder

Shared base for RDF lookup decoders using Jelly compression.

LookupDecoder(*, lookup_size)

Shared base for RDF lookup decoders using Jelly compression.

Tracks the last assigned and last reused index.

Parameters:

Name Type Description Default
lookup_size int

Maximum lookup size.

required
Source code in pyjelly/parse/lookup.py
def __init__(self, *, lookup_size: int) -> None:
    if lookup_size > MAX_LOOKUP_SIZE:
        msg = f"lookup size cannot be larger than {MAX_LOOKUP_SIZE}"
        raise JellyAssertionError(msg)
    self.lookup_size = lookup_size
    placeholders = (None,) * lookup_size
    self.data: deque[str | None] = deque(placeholders, maxlen=lookup_size)
    self.last_assigned_index = 0
    self.last_reused_index = 0

serialize

Modules:

Name Description
encode
flows
lookup
streams
encode

Classes:

Name Description
TermEncoder

Functions:

Name Description
split_iri

Split iri into prefix and name.

encode_spo

Encode the s/p/o of a statement.

encode_triple

Encode one triple.

encode_quad

Encode one quad.

encode_namespace_declaration

Encode namespace declaration.

encode_options

Encode stream options to ProtoBuf message.

TermEncoder(lookup_preset=None)

Methods:

Name Description
encode_iri_indices

Encode lookup indices for IRI.

encode_iri

Encode iri.

encode_default_graph

Encode default graph.

encode_literal

Encode literal.

encode_quoted_triple

Encode a quoted triple.

get_iri_field

Get IRI field directly based on slot.

get_literal_field

Get literal field directly based on slot.

set_bnode_field

Set bnode field directly based on slot.

get_triple_field

Get triple term field directly based on slot.

Source code in pyjelly/serialize/encode.py
def __init__(
    self,
    lookup_preset: options.LookupPreset | None = None,
) -> None:
    if lookup_preset is None:
        lookup_preset = options.LookupPreset()
    self.lookup_preset = lookup_preset
    self.names = LookupEncoder(lookup_size=lookup_preset.max_names)
    self.prefixes = LookupEncoder(lookup_size=lookup_preset.max_prefixes)
    self.datatypes = LookupEncoder(lookup_size=lookup_preset.max_datatypes)
encode_iri_indices(iri_string)

Encode lookup indices for IRI.

Args: iri_string (str): full iri in string format.

Returns: tuple[Rows, int, int]: additional rows (if any) and indices in prefix and name tables.

Source code in pyjelly/serialize/encode.py
def encode_iri_indices(self, iri_string: str) -> tuple[Rows, int, int]:
    """
    Encode lookup indices for IRI.

    Args:
        iri_string (str): full iri in string format.

    Returns:
        tuple[Rows, int, int]: additional rows (if any) and
            indices in prefix and name tables.

    """
    prefix, name = split_iri(iri_string)
    if self.prefixes.lookup.max_size:
        prefix_entry_index = self.prefixes.encode_entry_index(prefix)
    else:
        name = iri_string
        prefix_entry_index = None

    name_entry_index = self.names.encode_entry_index(name)
    term_rows = []

    if prefix_entry_index is not None:
        prefix_entry = jelly.RdfPrefixEntry(id=prefix_entry_index, value=prefix)
        term_rows.append(jelly.RdfStreamRow(prefix=prefix_entry))

    if name_entry_index is not None:
        name_entry = jelly.RdfNameEntry(id=name_entry_index, value=name)
        term_rows.append(jelly.RdfStreamRow(name=name_entry))

    prefix_index = self.prefixes.encode_prefix_term_index(prefix)
    name_index = self.names.encode_name_term_index(name)
    return term_rows, prefix_index, name_index
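The key behavior of encode_iri_indices is that a prefix or name entry row is emitted only the first time a value is seen; later IRIs reuse the stored index. The sketch below illustrates this with a deliberately simplified table. FirstSeenTable, split, and encode_iri are hypothetical, and the table is unbounded with no eviction and no zero-index shortcuts, which the real LookupEncoder may well handle differently.

```python
from __future__ import annotations


class FirstSeenTable:
    """Simplified stand-in for a lookup encoder (unbounded, no eviction)."""

    def __init__(self) -> None:
        self._ids: dict[str, int] = {}

    def entry_id_if_new(self, value: str) -> int | None:
        """Return a fresh id on first sight, or None if already stored."""
        if value in self._ids:
            return None
        self._ids[value] = len(self._ids) + 1  # ids start at 1 in this sketch
        return self._ids[value]

    def term_id(self, value: str) -> int:
        return self._ids[value]


def split(iri: str) -> tuple[str, str]:
    """Split at the last '#', else at the last '/', else use no prefix."""
    for sep in "#", "/":
        head, found, tail = iri.rpartition(sep)
        if found:
            return head + found, tail
    return "", iri


def encode_iri(
    iri: str, prefixes: FirstSeenTable, names: FirstSeenTable
) -> tuple[list[tuple[str, int, str]], int, int]:
    prefix, name = split(iri)
    rows: list[tuple[str, int, str]] = []
    prefix_entry = prefixes.entry_id_if_new(prefix)
    if prefix_entry is not None:
        rows.append(("prefix", prefix_entry, prefix))
    name_entry = names.entry_id_if_new(name)
    if name_entry is not None:
        rows.append(("name", name_entry, name))
    return rows, prefixes.term_id(prefix), names.term_id(name)


prefixes, names = FirstSeenTable(), FirstSeenTable()
first = encode_iri("http://example.org/alice", prefixes, names)
second = encode_iri("http://example.org/bob", prefixes, names)
third = encode_iri("http://example.org/alice", prefixes, names)
```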
encode_iri(iri_string, iri)

Encode iri.

Args: iri_string (str): full iri in string format. iri (jelly.RdfIri): iri to fill

Returns: Rows: extra rows for prefix and name tables, if any.

Source code in pyjelly/serialize/encode.py
def encode_iri(self, iri_string: str, iri: jelly.RdfIri) -> Rows:
    """
    Encode iri.

    Args:
        iri_string (str): full iri in string format.
        iri (jelly.RdfIri): iri to fill

    Returns:
        Rows: extra rows for prefix and name tables, if any.

    """
    term_rows, prefix_index, name_index = self.encode_iri_indices(iri_string)
    iri.prefix_id = prefix_index
    iri.name_id = name_index
    return term_rows
encode_default_graph(g_default_graph)

Encode default graph.

Returns: Rows: empty extra rows (for API consistency)

Source code in pyjelly/serialize/encode.py
def encode_default_graph(self, g_default_graph: jelly.RdfDefaultGraph) -> Rows:
    """
    Encode default graph.

    Returns:
        Rows: empty extra rows (for API consistency)

    """
    g_default_graph.CopyFrom(jelly.RdfDefaultGraph())
    return ()
encode_literal(*, lex, language=None, datatype=None, literal)

Encode literal.

Args:
    lex (str): lexical form/literal value.
    language (str | None, optional): langtag. Defaults to None.
    datatype (str | None, optional): datatype if it is a typed literal. Defaults to None.
    literal (jelly.RdfLiteral): literal to fill.

Raises: JellyConformanceError: if a datatype is specified while the datatype lookup table is disabled (its size is 0).

Returns: Rows: extra rows (i.e., datatype entries).

Source code in pyjelly/serialize/encode.py
def encode_literal(
    self,
    *,
    lex: str,
    language: str | None = None,
    datatype: str | None = None,
    literal: jelly.RdfLiteral,
) -> Rows:
    """
    Encode literal.

    Args:
        lex (str): lexical form/literal value
        language (str | None, optional): langtag. Defaults to None.
        datatype (str | None, optional): data type if
        it is a typed literal. Defaults to None.
        literal (jelly.RdfLiteral): literal to fill.

    Raises:
        JellyConformanceError: if datatype specified while
            datatable is not used.

    Returns:
        Rows: extra rows (i.e., datatype entries).

    """
    datatype_id = None
    term_rows: tuple[()] | tuple[jelly.RdfStreamRow] = ()

    if datatype and datatype != options.STRING_DATATYPE_IRI:
        if self.datatypes.lookup.max_size == 0:
            msg = (
                f"can't encode literal with type {datatype}: "
                "datatype lookup cannot be used if disabled "
                "(its size was set to 0)"
            )
            raise JellyConformanceError(msg)
        datatype_entry_id = self.datatypes.encode_entry_index(datatype)

        if datatype_entry_id is not None:
            entry = jelly.RdfDatatypeEntry(id=datatype_entry_id, value=datatype)
            term_rows = (jelly.RdfStreamRow(datatype=entry),)

        datatype_id = self.datatypes.encode_datatype_term_index(datatype)

    literal.lex = lex
    if language:
        literal.langtag = language
    if datatype_id:
        literal.datatype = datatype_id
    return term_rows
encode_quoted_triple(terms, quoted_statement)

Encode a quoted triple.

Notes: Although a triple, it is treated as a part of a statement. Repeated terms are not used when encoding quoted triples.

Args:
    terms (Iterable[object]): triple terms to encode.
    quoted_statement (jelly.RdfTriple): quoted triple to fill.

Returns: Rows: additional stream rows with preceding information (prefixes, names, datatypes rows, if any).

Source code in pyjelly/serialize/encode.py
def encode_quoted_triple(
    self, terms: Iterable[object], quoted_statement: jelly.RdfTriple
) -> Rows:
    """
    Encode a quoted triple.

    Notes:
        Although a triple, it is treated as a part of a statement.
        Repeated terms are not used when encoding quoted triples.

    Args:
        terms (Iterable[object]): triple terms to encode.
        quoted_statement (jelly.RdfTriple): quoted triple to fill.

    Returns:
        Rows: additional stream rows with preceding
            information (prefixes, names, datatypes rows, if any).

    """
    rows: list[jelly.RdfStreamRow] = []
    terms = iter(terms)
    extra_rows = self.encode_spo(next(terms), Slot.subject, quoted_statement)
    rows.extend(extra_rows)
    extra_rows = self.encode_spo(next(terms), Slot.predicate, quoted_statement)
    rows.extend(extra_rows)
    extra_rows = self.encode_spo(next(terms), Slot.object, quoted_statement)
    rows.extend(extra_rows)
    return rows
get_iri_field(statement, slot)

Get IRI field directly based on slot.

Source code in pyjelly/serialize/encode.py
def get_iri_field(self, statement: Statement, slot: Slot) -> jelly.RdfIri:
    """Get IRI field directly based on slot."""
    if slot == Slot.subject:
        return statement.s_iri
    if slot == Slot.predicate:
        return statement.p_iri
    return statement.o_iri
get_literal_field(statement, slot)

Get literal field directly based on slot.

Source code in pyjelly/serialize/encode.py
def get_literal_field(self, statement: Statement, slot: Slot) -> jelly.RdfLiteral:
    """Get literal field directly based on slot."""
    if slot == Slot.subject:
        return statement.s_literal
    if slot == Slot.predicate:
        return statement.p_literal
    return statement.o_literal
set_bnode_field(statement, slot, identifier)

Set bnode field directly based on slot.

Source code in pyjelly/serialize/encode.py
def set_bnode_field(
    self, statement: Statement, slot: Slot, identifier: str
) -> None:
    """Set bnode field directly based on slot."""
    if slot == Slot.subject:
        statement.s_bnode = identifier
    elif slot == Slot.predicate:
        statement.p_bnode = identifier
    else:
        statement.o_bnode = identifier
get_triple_field(statement, slot)

Get triple term field directly based on slot.

Source code in pyjelly/serialize/encode.py
def get_triple_field(self, statement: Statement, slot: Slot) -> jelly.RdfTriple:
    """Get triple term field directly based on slot."""
    if slot == Slot.subject:
        return statement.s_triple_term
    if slot == Slot.predicate:
        return statement.p_triple_term
    return statement.o_triple_term
split_iri(iri_string)

Split iri into prefix and name.

Args: iri_string (str): full iri string.

Returns: tuple[str, str]: iri's prefix and name.

Source code in pyjelly/serialize/encode.py
def split_iri(iri_string: str) -> tuple[str, str]:
    """
    Split iri into prefix and name.

    Args:
        iri_string (str): full iri string.

    Returns:
        tuple[str, str]: iri's prefix and name.

    """
    name = iri_string
    prefix = ""
    for sep in "#", "/":
        prefix, char, name = iri_string.rpartition(sep)
        if char:
            return prefix + char, name
    return prefix, name
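As a quick illustration of the splitting rule, the sketch below restates the function shown above (so that it runs without pyjelly installed) and applies it to a few made-up IRIs. The example IRIs are assumptions chosen for illustration only.

```python
def split_iri(iri_string: str) -> tuple[str, str]:
    """Restated from the listing above: split on the last '#', else the last '/'."""
    name = iri_string
    prefix = ""
    for sep in "#", "/":
        prefix, char, name = iri_string.rpartition(sep)
        if char:
            # The separator stays attached to the prefix.
            return prefix + char, name
    return prefix, name


# '#' is tried first, so a fragment wins over a path segment.
hash_split = split_iri("http://example.org/ns#name")
slash_split = split_iri("http://example.org/path/name")
# With neither separator, the prefix is empty and the name is the whole IRI.
no_sep_split = split_iri("urn:isbn:123")
```

Because "#" is checked before "/", an IRI containing both is split at its last "#".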
encode_spo(terms, term_encoder, repeated_terms, statement)

Encode the s/p/o of a statement.

Args:

  • terms (Iterator[object]): iterator for original terms to encode.
  • term_encoder (TermEncoder): encoder with lookup tables.
  • repeated_terms (list[object | None]): list of repeated terms.
  • statement (Statement): Triple/Quad to fill.

Returns: list[jelly.RdfStreamRow]: extra rows to append.

Source code in pyjelly/serialize/encode.py
def encode_spo(
    terms: Iterator[object],
    term_encoder: TermEncoder,
    repeated_terms: list[object | None],
    statement: Statement,
) -> list[jelly.RdfStreamRow]:
    """
    Encode the s/p/o of a statement.

    Args:
        terms (Iterator[object]): iterator for original terms to encode
        term_encoder (TermEncoder): encoder with lookup tables
        repeated_terms (list[object | None]): list of repeated terms.
        statement (Statement): Triple/Quad to fill.

    Returns:
        list[jelly.RdfStreamRow]: extra rows to append.

    """
    rows: list[jelly.RdfStreamRow] = []
    s = next(terms)
    if repeated_terms[Slot.subject] != s:
        extra_rows = term_encoder.encode_spo(s, Slot.subject, statement)
        rows.extend(extra_rows)
        repeated_terms[Slot.subject] = s
    p = next(terms)
    if repeated_terms[Slot.predicate] != p:
        extra_rows = term_encoder.encode_spo(p, Slot.predicate, statement)
        rows.extend(extra_rows)
        repeated_terms[Slot.predicate] = p
    o = next(terms)
    if repeated_terms[Slot.object] != o:
        extra_rows = term_encoder.encode_spo(o, Slot.object, statement)
        rows.extend(extra_rows)
        repeated_terms[Slot.object] = o
    return rows
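The repeated-term check above can be sketched in isolation. This is a minimal illustration, not pyjelly's implementation: `Slot` here is a hypothetical stand-in enum, `changed_slots` is a made-up helper, and terms are plain strings rather than encoded RDF terms. It only shows which slots would be re-encoded for each statement.

```python
from enum import IntEnum


class Slot(IntEnum):
    """Hypothetical stand-in for the slot enum used by the encoder."""

    subject = 0
    predicate = 1
    object = 2


def changed_slots(terms, repeated_terms):
    """Return names of slots whose term differs from the previous statement."""
    emitted = []
    for slot, term in zip(Slot, terms):
        if repeated_terms[slot] != term:
            emitted.append(slot.name)
            # Remember the term so the next statement can skip it if unchanged.
            repeated_terms[slot] = term
    return emitted


repeated = [None] * len(Slot)
first = changed_slots(("ex:alice", "ex:knows", "ex:bob"), repeated)
second = changed_slots(("ex:alice", "ex:knows", "ex:carol"), repeated)
```

In this sketch the second statement shares its subject and predicate with the first, so only the object slot is reported as changed.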
encode_triple(terms, term_encoder, repeated_terms)

Encode one triple.

Args:

  • terms (Iterable[object]): original terms to encode.
  • term_encoder (TermEncoder): current encoder with lookup tables.
  • repeated_terms (list[object | None]): list of repeated terms.

Returns: list[jelly.RdfStreamRow]: list of rows to add to the current flow.

Source code in pyjelly/serialize/encode.py
def encode_triple(
    terms: Iterable[object],
    term_encoder: TermEncoder,
    repeated_terms: list[object | None],
) -> list[jelly.RdfStreamRow]:
    """
    Encode one triple.

    Args:
        terms (Iterable[object]): original terms to encode
        term_encoder (TermEncoder): current encoder with lookup tables
        repeated_terms (list[object | None]): list of repeated terms.

    Returns:
        list[jelly.RdfStreamRow]: list of rows to add to the current flow.

    """
    triple = jelly.RdfTriple()
    terms = iter(terms)
    rows = encode_spo(terms, term_encoder, repeated_terms, triple)
    row = jelly.RdfStreamRow(triple=triple)
    rows.append(row)
    return rows
encode_quad(terms, term_encoder, repeated_terms)

Encode one quad.

Args:

  • terms (Iterable[object]): original terms to encode.
  • term_encoder (TermEncoder): current encoder with lookup tables.
  • repeated_terms (list[object | None]): list of repeated terms.

Returns: list[jelly.RdfStreamRow]: list of messages to append to current flow.

Source code in pyjelly/serialize/encode.py
def encode_quad(
    terms: Iterable[object],
    term_encoder: TermEncoder,
    repeated_terms: list[object | None],
) -> list[jelly.RdfStreamRow]:
    """
    Encode one quad.

    Args:
        terms (Iterable[object]): original terms to encode
        term_encoder (TermEncoder): current encoder with lookup tables
        repeated_terms (list[object | None]): list of repeated terms.

    Returns:
        list[jelly.RdfStreamRow]: list of messages to append to current flow.

    """
    terms = iter(terms)
    quad = jelly.RdfQuad()
    rows = encode_spo(terms, term_encoder, repeated_terms, quad)
    g = next(terms)
    if repeated_terms[Slot.graph] != g:
        extra_rows = term_encoder.encode_graph(g, quad)
        rows.extend(extra_rows)
        repeated_terms[Slot.graph] = g
    row = jelly.RdfStreamRow(quad=quad)
    rows.append(row)
    return rows
encode_namespace_declaration(name, value, term_encoder)

Encode namespace declaration.

Args:

  • name (str): namespace prefix label.
  • value (str): namespace IRI.
  • term_encoder (TermEncoder): current encoder.

Returns: list[jelly.RdfStreamRow]: list of messages to append to current flow.

Source code in pyjelly/serialize/encode.py
def encode_namespace_declaration(
    name: str,
    value: str,
    term_encoder: TermEncoder,
) -> list[jelly.RdfStreamRow]:
    """
    Encode namespace declaration.

    Args:
        name (str): namespace prefix label
        value (str): namespace iri
        term_encoder (TermEncoder): current encoder

    Returns:
        list[jelly.RdfStreamRow]: list of messages to append to current flow.

    """
    iri = jelly.RdfIri()
    [*rows] = term_encoder.encode_iri(value, iri=iri)
    declaration = jelly.RdfNamespaceDeclaration(name=name, value=iri)
    row = jelly.RdfStreamRow(namespace=declaration)
    rows.append(row)
    return rows
encode_options(lookup_preset, stream_types, params)

Encode stream options to ProtoBuf message.

Args:

  • lookup_preset (options.LookupPreset): lookup tables options.
  • stream_types (options.StreamTypes): physical and logical types.
  • params (options.StreamParameters): other params.

Returns: jelly.RdfStreamRow: encoded stream options row

Source code in pyjelly/serialize/encode.py
def encode_options(
    lookup_preset: options.LookupPreset,
    stream_types: options.StreamTypes,
    params: options.StreamParameters,
) -> jelly.RdfStreamRow:
    """
    Encode stream options to ProtoBuf message.

    Args:
        lookup_preset (options.LookupPreset): lookup tables options
        stream_types (options.StreamTypes): physical and logical types
        params (options.StreamParameters): other params.

    Returns:
        jelly.RdfStreamRow: encoded stream options row

    """
    return jelly.RdfStreamRow(
        options=jelly.RdfStreamOptions(
            stream_name=params.stream_name,
            physical_type=stream_types.physical_type,
            generalized_statements=params.generalized_statements,
            rdf_star=params.rdf_star,
            max_name_table_size=lookup_preset.max_names,
            max_prefix_table_size=lookup_preset.max_prefixes,
            max_datatype_table_size=lookup_preset.max_datatypes,
            logical_type=stream_types.logical_type,
            version=params.version,
        )
    )
flows

Classes:

Name Description
FrameFlow

Abstract base class for producing Jelly frames from RDF stream rows.

ManualFrameFlow

Produces frames only when manually requested (never automatically).

BoundedFrameFlow

Produce frames automatically when a fixed number of rows is reached.

GraphsFrameFlow
DatasetsFrameFlow

Functions:

Name Description
flow_for_type

Return flow based on logical type requested.

FrameFlow(initlist=None, *, logical_type=None, **__kwargs)

Bases: UserList[RdfStreamRow]

Abstract base class for producing Jelly frames from RDF stream rows.

Collects stream rows and assembles them into RdfStreamFrame objects when ready.

Allows for passing LogicalStreamType, required for logical subtypes and non-delimited streams.

Methods:

Name Description
frame_from_graph

Treat the current rows as a graph and produce a frame.

frame_from_dataset

Treat the current rows as a dataset and produce a frame.

to_stream_frame

Create stream frame from flow content.

Source code in pyjelly/serialize/flows.py
def __init__(
    self,
    initlist: Iterable[jelly.RdfStreamRow] | None = None,
    *,
    logical_type: jelly.LogicalStreamType | None = None,
    **__kwargs: Any,
) -> None:
    super().__init__(initlist)
    self.logical_type = logical_type or self.__class__.logical_type
frame_from_graph()

Treat the current rows as a graph and produce a frame.

Default implementation returns None.

Source code in pyjelly/serialize/flows.py
def frame_from_graph(self) -> jelly.RdfStreamFrame | None:
    """
    Treat the current rows as a graph and produce a frame.

    Default implementation returns None.
    """
    return None
frame_from_dataset()

Treat the current rows as a dataset and produce a frame.

Default implementation returns None.

Source code in pyjelly/serialize/flows.py
def frame_from_dataset(self) -> jelly.RdfStreamFrame | None:
    """
    Treat the current rows as a dataset and produce a frame.

    Default implementation returns None.
    """
    return None
to_stream_frame()

Create stream frame from flow content.

Notes: Clears flow content after creating the frame.

Returns: jelly.RdfStreamFrame | None: stream frame, or None if the flow is empty.

Source code in pyjelly/serialize/flows.py
def to_stream_frame(self) -> jelly.RdfStreamFrame | None:
    """
    Create stream frame from flow content.

    Notes:
        Clears flow content after creating the frame.

    Returns:
        jelly.RdfStreamFrame | None: stream frame

    """
    if not self:
        return None
    frame = jelly.RdfStreamFrame(rows=self)
    self.clear()
    return frame
ManualFrameFlow(initlist=None, *, logical_type=None, **__kwargs)

Bases: FrameFlow

Produces frames only when manually requested (never automatically).

Warning

All stream rows are kept in memory until to_stream_frame() is called. This may lead to high memory usage for large streams.

Used for non-delimited serialization.

Source code in pyjelly/serialize/flows.py
def __init__(
    self,
    initlist: Iterable[jelly.RdfStreamRow] | None = None,
    *,
    logical_type: jelly.LogicalStreamType | None = None,
    **__kwargs: Any,
) -> None:
    super().__init__(initlist)
    self.logical_type = logical_type or self.__class__.logical_type
BoundedFrameFlow(initlist=None, logical_type=None, *, frame_size=None)

Bases: FrameFlow

Produce frames automatically when a fixed number of rows is reached.

Used for delimited encoding (default mode).

Methods:

Name Description
frame_from_bounds

Emit frame from flow if full.

Source code in pyjelly/serialize/flows.py
@override
def __init__(
    self,
    initlist: Iterable[jelly.RdfStreamRow] | None = None,
    logical_type: jelly.LogicalStreamType | None = None,
    *,
    frame_size: int | None = None,
) -> None:
    super().__init__(initlist, logical_type=logical_type)
    self.frame_size = frame_size or DEFAULT_FRAME_SIZE
frame_from_bounds()

Emit frame from flow if full.

Returns: jelly.RdfStreamFrame | None: stream frame

Source code in pyjelly/serialize/flows.py
@override
def frame_from_bounds(self) -> jelly.RdfStreamFrame | None:
    """
    Emit frame from flow if full.

    Returns:
        jelly.RdfStreamFrame | None: stream frame

    """
    if len(self) >= self.frame_size:
        return self.to_stream_frame()
    return None
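The interaction between `frame_from_bounds` and `to_stream_frame` can be sketched with a plain `UserList`. This is an illustrative stand-in, not the pyjelly class: `BoundedFlowSketch` and `to_frame` are hypothetical names, rows are strings, and a "frame" is just a list copy of the buffered rows.

```python
from collections import UserList


class BoundedFlowSketch(UserList):
    """Hypothetical stand-in: buffers rows and cuts a frame at frame_size."""

    def __init__(self, initlist=None, *, frame_size=3):
        super().__init__(initlist)
        self.frame_size = frame_size

    def to_frame(self):
        # Mirrors the documented behaviour: an empty flow yields no frame,
        # and the buffer is cleared once a frame is produced.
        if not self:
            return None
        frame = list(self)
        self.clear()
        return frame

    def frame_from_bounds(self):
        if len(self) >= self.frame_size:
            return self.to_frame()
        return None


flow = BoundedFlowSketch(frame_size=3)
frames = []
for row in ["r1", "r2", "r3", "r4"]:
    flow.append(row)
    frame = flow.frame_from_bounds()
    if frame is not None:
        frames.append(frame)

# Rows that never filled a frame stay buffered until flushed explicitly.
leftover = flow.to_frame()
```

The last row does not fill a frame on its own, which is why a final explicit flush is needed in this sketch.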
GraphsFrameFlow(initlist=None, *, logical_type=None, **__kwargs)

Bases: FrameFlow

Methods:

Name Description
frame_from_graph

Emit current flow content (one graph) as jelly frame.

Source code in pyjelly/serialize/flows.py
def __init__(
    self,
    initlist: Iterable[jelly.RdfStreamRow] | None = None,
    *,
    logical_type: jelly.LogicalStreamType | None = None,
    **__kwargs: Any,
) -> None:
    super().__init__(initlist)
    self.logical_type = logical_type or self.__class__.logical_type
frame_from_graph()

Emit current flow content (one graph) as jelly frame.

Returns: jelly.RdfStreamFrame | None: jelly frame, or None if the flow is empty.

Source code in pyjelly/serialize/flows.py
def frame_from_graph(self) -> jelly.RdfStreamFrame | None:
    """
    Emit current flow content (one graph) as jelly frame.

    Returns:
        jelly.RdfStreamFrame | None: jelly frame or none if
            flow is empty.

    """
    return self.to_stream_frame()
DatasetsFrameFlow(initlist=None, *, logical_type=None, **__kwargs)

Bases: FrameFlow

Methods:

Name Description
frame_from_dataset

Emit current flow content (dataset) as jelly frame.

Source code in pyjelly/serialize/flows.py
def __init__(
    self,
    initlist: Iterable[jelly.RdfStreamRow] | None = None,
    *,
    logical_type: jelly.LogicalStreamType | None = None,
    **__kwargs: Any,
) -> None:
    super().__init__(initlist)
    self.logical_type = logical_type or self.__class__.logical_type
frame_from_dataset()

Emit current flow content (dataset) as jelly frame.

Returns: jelly.RdfStreamFrame | None: jelly frame, or None if the flow is empty.

Source code in pyjelly/serialize/flows.py
def frame_from_dataset(self) -> jelly.RdfStreamFrame | None:
    """
    Emit current flow content (dataset) as jelly frame.

    Returns:
        jelly.RdfStreamFrame | None: jelly frame or none if
            flow is empty.

    """
    return self.to_stream_frame()
flow_for_type(logical_type)

Return flow based on logical type requested.

Note: uses base logical type for subtypes (e.g., SUBJECT_GRAPHS uses the same flow as its base type GRAPHS).

Args: logical_type (jelly.LogicalStreamType): logical type requested.

Raises: NotImplementedError: if (base) logical stream type is not supported.

Returns: type[FrameFlow]: FrameFlow for respective logical type.

Source code in pyjelly/serialize/flows.py
def flow_for_type(logical_type: jelly.LogicalStreamType) -> type[FrameFlow]:
    """
    Return flow based on logical type requested.

    Note: uses base logical type for subtypes (e.g., SUBJECT_GRAPHS uses
        the same flow as its base type GRAPHS).

    Args:
        logical_type (jelly.LogicalStreamType): logical type requested.

    Raises:
        NotImplementedError: if (base) logical stream type is not supported.

    Returns:
        type[FrameFlow]: FrameFlow for respective logical type.

    """
    try:
        base_logical_type_value = logical_type % 10
        base_name = jelly.LogicalStreamType.Name(base_logical_type_value)
        return FLOW_DISPATCH[getattr(jelly.LogicalStreamType, base_name)]
    except KeyError:
        msg = (
            "unsupported logical stream type: "
            f"{jelly.LogicalStreamType.Name(logical_type)}"
        )
        raise NotImplementedError(msg) from None
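The `logical_type % 10` step above reduces a subtype to its base type by keeping the last decimal digit. The sketch below illustrates the arithmetic only. `LogicalTypeSketch` is a hypothetical enum, and its numeric values are assumptions made for illustration; they should be checked against the Jelly protobuf definition before relying on them.

```python
from enum import IntEnum


class LogicalTypeSketch(IntEnum):
    """Hypothetical enum; the numeric values are assumed, not authoritative."""

    UNSPECIFIED = 0
    FLAT_TRIPLES = 1
    FLAT_QUADS = 2
    GRAPHS = 3
    DATASETS = 4
    SUBJECT_GRAPHS = 13
    NAMED_GRAPHS = 14
    TIMESTAMPED_NAMED_GRAPHS = 114


def base_type(logical_type: int) -> LogicalTypeSketch:
    """Keep only the last decimal digit, which identifies the base type."""
    return LogicalTypeSketch(logical_type % 10)


subject_graphs_base = base_type(LogicalTypeSketch.SUBJECT_GRAPHS)
timestamped_base = base_type(LogicalTypeSketch.TIMESTAMPED_NAMED_GRAPHS)
```

Under these assumed values, a dispatch table only needs one entry per base type.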
lookup

Classes:

Name Description
Lookup

Fixed-size 1-based string-to-index mapping with LRU eviction.

LookupEncoder

Shared base for RDF lookup encoders using Jelly compression.

Lookup(max_size)

Fixed-size 1-based string-to-index mapping with LRU eviction.

  • Assigns incrementing indices starting from 1.
  • After reaching the maximum size, reuses indices freed by evicting the least-recently-used entries.
  • Index 0 is reserved for delta encoding in Jelly streams.

To check if a key exists, call .make_last_to_evict(key) and catch KeyError. If KeyError is raised, the key can be inserted with .insert(key).

Parameters:

Name Type Description Default
max_size int

Maximum number of entries. Zero disables lookup.

required
Source code in pyjelly/serialize/lookup.py
def __init__(self, max_size: int) -> None:
    self.data = OrderedDict[str, int]()
    self.max_size = max_size
    self._evicting = False
LookupEncoder(*, lookup_size)

Shared base for RDF lookup encoders using Jelly compression.

Tracks the last assigned and last reused index.

Parameters:

Name Type Description Default
lookup_size int

Maximum lookup size.

required

Methods:

Name Description
encode_entry_index

Get or assign the index to use in an entry.

Source code in pyjelly/serialize/lookup.py
def __init__(self, *, lookup_size: int) -> None:
    self.lookup = Lookup(max_size=lookup_size)
    self.last_assigned_index = 0
    self.last_reused_index = 0
encode_entry_index(key)

Get or assign the index to use in an entry.

Returns:

Type Description
int or None
  • 0 if the new index is sequential (last_assigned_index + 1)
  • actual assigned/reused index otherwise
  • None if the key already exists
If the return value is None, the entry is already in the lookup and does not
need to be emitted. Any integer value (including 0) means the entry is new
and should be emitted.
Source code in pyjelly/serialize/lookup.py
def encode_entry_index(self, key: str) -> int | None:
    """
    Get or assign the index to use in an entry.

    Returns
    -------
    int or None
        - 0 if the new index is sequential (`last_assigned_index + 1`)
        - actual assigned/reused index otherwise
        - None if the key already exists

    If the return value is None, the entry is already in the lookup and does not
    need to be emitted. Any integer value (including 0) means the entry is new
    and should be emitted.

    """
    try:
        self.lookup.make_last_to_evict(key)
        return None  # noqa: TRY300
    except KeyError:
        previous_index = self.last_assigned_index
        index = self.lookup.insert(key)
        self.last_assigned_index = index
        if index == previous_index + 1:
            return 0
        return index
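The return-value convention (0 for a sequential index, the explicit index otherwise, None for a hit) can be traced with a small stand-in. This is a sketch under stated assumptions: `LookupSketch` and `EncoderSketch` are hypothetical, and the bodies of `insert` and `make_last_to_evict` are guesses at LRU behaviour built on `OrderedDict`, since their source is not shown here. The sketch does not handle a lookup size of zero.

```python
from collections import OrderedDict


class LookupSketch:
    """Hypothetical 1-based LRU lookup; assumes max_size >= 1."""

    def __init__(self, max_size):
        self.data = OrderedDict()
        self.max_size = max_size

    def make_last_to_evict(self, key):
        # OrderedDict.move_to_end raises KeyError when the key is absent.
        self.data.move_to_end(key)

    def insert(self, key):
        if len(self.data) < self.max_size:
            index = len(self.data) + 1
        else:
            # Evict the least-recently-used entry and reuse its index.
            _, index = self.data.popitem(last=False)
        self.data[key] = index
        return index


class EncoderSketch:
    def __init__(self, lookup_size):
        self.lookup = LookupSketch(lookup_size)
        self.last_assigned_index = 0

    def encode_entry_index(self, key):
        try:
            self.lookup.make_last_to_evict(key)
            return None  # already known, nothing to emit
        except KeyError:
            previous = self.last_assigned_index
            index = self.lookup.insert(key)
            self.last_assigned_index = index
            return 0 if index == previous + 1 else index


encoder = EncoderSketch(lookup_size=2)
results = [encoder.encode_entry_index(key) for key in ["a", "b", "a", "c", "d"]]
```

In this trace the first two keys receive sequential indices, the repeated key is a hit, and the last two keys reuse evicted indices, which is why their indices are returned explicitly.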
streams

Classes:

Name Description
Stream
TripleStream
QuadStream
GraphStream

Functions:

Name Description
stream_for_type

Return the Stream class for the specified physical type.

Stream(*, encoder, options=None)

Methods:

Name Description
infer_flow

Return flow based on the stream options provided.

enroll

Initialize start of the stream.

stream_options

Encode and append stream options row to the current flow.

namespace_declaration

Add namespace declaration to jelly stream.

for_rdflib

Initialize stream with RDFLib encoder.

Source code in pyjelly/serialize/streams.py
def __init__(
    self,
    *,
    encoder: TermEncoder,
    options: SerializerOptions | None = None,
) -> None:
    self.encoder = encoder
    if options is None:
        options = SerializerOptions()
    self.options = options
    flow = options.flow
    if flow is None:
        flow = self.infer_flow()
    self.flow = flow
    self.repeated_terms = [None] * len(Slot)
    self.enrolled = False
    self.stream_types = StreamTypes(
        physical_type=self.physical_type,
        logical_type=self.flow.logical_type,
    )
infer_flow()

Return flow based on the stream options provided.

Returns: FrameFlow: initialised FrameFlow object.

Source code in pyjelly/serialize/streams.py
def infer_flow(self) -> FrameFlow:
    """
    Return flow based on the stream options provided.

    Returns:
        FrameFlow: initialised FrameFlow object.

    """
    flow: FrameFlow
    if self.options.params.delimited:
        if self.options.logical_type != jelly.LOGICAL_STREAM_TYPE_UNSPECIFIED:
            flow_class = flow_for_type(self.options.logical_type)
        else:
            flow_class = self.default_delimited_flow_class

        if self.options.logical_type in (
            jelly.LOGICAL_STREAM_TYPE_FLAT_TRIPLES,
            jelly.LOGICAL_STREAM_TYPE_FLAT_QUADS,
        ):
            flow = flow_class(
                logical_type=self.options.logical_type,
                frame_size=self.options.frame_size,
            )
        else:
            flow = flow_class(logical_type=self.options.logical_type)
    else:
        flow = ManualFrameFlow(logical_type=self.options.logical_type)
    return flow
enroll()

Initialize start of the stream.

Source code in pyjelly/serialize/streams.py
def enroll(self) -> None:
    """Initialize start of the stream."""
    if not self.enrolled:
        self.stream_options()
        self.enrolled = True
stream_options()

Encode and append stream options row to the current flow.

Source code in pyjelly/serialize/streams.py
def stream_options(self) -> None:
    """Encode and append stream options row to the current flow."""
    self.flow.append(
        encode_options(
            stream_types=self.stream_types,
            params=self.options.params,
            lookup_preset=self.options.lookup_preset,
        )
    )
namespace_declaration(name, iri)

Add namespace declaration to jelly stream.

Args:

  • name (str): namespace prefix label.
  • iri (str): namespace IRI.

Source code in pyjelly/serialize/streams.py
def namespace_declaration(self, name: str, iri: str) -> None:
    """
    Add namespace declaration to jelly stream.

    Args:
        name (str): namespace prefix label
        iri (str): namespace iri

    """
    rows = encode_namespace_declaration(
        name=name,
        value=iri,
        term_encoder=self.encoder,
    )
    self.flow.extend(rows)
for_rdflib(options=None)

Initialize stream with RDFLib encoder.

Args: options (SerializerOptions | None, optional): Stream options. Defaults to None.

Raises: TypeError: if called on the abstract Stream base class instead of a subclass for a specific physical type.

Returns: Stream: initialized stream with RDFLib encoder.

Source code in pyjelly/serialize/streams.py
@classmethod
def for_rdflib(cls, options: SerializerOptions | None = None) -> Stream:
    """
    Initialize stream with RDFLib encoder.

    Args:
        options (SerializerOptions | None, optional): Stream options.
            Defaults to None.

    Raises:
        TypeError: if called on the abstract Stream base class instead of
            a subclass for a specific physical type.

    Returns:
        Stream: initialized stream with RDFLib encoder.

    """
    if cls is Stream:
        msg = "Stream is an abstract base class, use a subclass instead"
        raise TypeError(msg)
    from pyjelly.integrations.rdflib.serialize import RDFLibTermEncoder

    lookup_preset: LookupPreset | None = None
    if options is not None:
        lookup_preset = options.lookup_preset
    return cls(
        encoder=RDFLibTermEncoder(lookup_preset=lookup_preset),
        options=options,
    )
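The `cls is Stream` guard above is a common way to keep a factory classmethod usable only from subclasses. The sketch below shows the pattern on its own; `StreamSketch`, `TripleStreamSketch`, and `create` are hypothetical names and carry none of the encoder or options handling of the real method.

```python
class StreamSketch:
    """Hypothetical base class whose factory refuses to build the base itself."""

    @classmethod
    def create(cls):
        if cls is StreamSketch:
            msg = "StreamSketch is an abstract base class, use a subclass instead"
            raise TypeError(msg)
        return cls()


class TripleStreamSketch(StreamSketch):
    """Hypothetical concrete subclass."""


# Calling the factory on a subclass returns an instance of that subclass.
stream = TripleStreamSketch.create()

# Calling it on the base class raises TypeError.
try:
    StreamSketch.create()
except TypeError as exc:
    error = str(exc)
else:
    error = None
```

Comparing with `is` rather than `issubclass` means only the base class itself is rejected, so every subclass inherits a working factory.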
TripleStream(*, encoder, options=None)

Bases: Stream

Methods:

Name Description
triple

Process one triple to Protobuf messages.

Source code in pyjelly/serialize/streams.py
def __init__(
    self,
    *,
    encoder: TermEncoder,
    options: SerializerOptions | None = None,
) -> None:
    self.encoder = encoder
    if options is None:
        options = SerializerOptions()
    self.options = options
    flow = options.flow
    if flow is None:
        flow = self.infer_flow()
    self.flow = flow
    self.repeated_terms = [None] * len(Slot)
    self.enrolled = False
    self.stream_types = StreamTypes(
        physical_type=self.physical_type,
        logical_type=self.flow.logical_type,
    )
triple(terms)

Process one triple to Protobuf messages.

Note: Adds new rows to the current flow and returns StreamFrame if frame size conditions are met.

Args: terms (Iterable[object]): RDF terms to encode.

Returns: jelly.RdfStreamFrame | None: stream frame if the flow supports frame slicing and the current flow is full, otherwise None.

Source code in pyjelly/serialize/streams.py
def triple(self, terms: Iterable[object]) -> jelly.RdfStreamFrame | None:
    """
    Process one triple to Protobuf messages.

    Note:
        Adds new rows to the current flow and returns StreamFrame if
        frame size conditions are met.

    Args:
        terms (Iterable[object]): RDF terms to encode.

    Returns:
        jelly.RdfStreamFrame | None: stream frame if
            flow supports frame slicing and current flow is full

    """
    new_rows = encode_triple(
        terms,
        term_encoder=self.encoder,
        repeated_terms=self.repeated_terms,
    )
    self.flow.extend(new_rows)
    return self.flow.frame_from_bounds()
QuadStream(*, encoder, options=None)

Bases: Stream

Methods:

Name Description
quad

Process one quad to Protobuf messages.

Source code in pyjelly/serialize/streams.py
def __init__(
    self,
    *,
    encoder: TermEncoder,
    options: SerializerOptions | None = None,
) -> None:
    self.encoder = encoder
    if options is None:
        options = SerializerOptions()
    self.options = options
    flow = options.flow
    if flow is None:
        flow = self.infer_flow()
    self.flow = flow
    self.repeated_terms = [None] * len(Slot)
    self.enrolled = False
    self.stream_types = StreamTypes(
        physical_type=self.physical_type,
        logical_type=self.flow.logical_type,
    )
quad(terms)

Process one quad to Protobuf messages.

Args: terms (Iterable[object]): terms to encode.

Returns: jelly.RdfStreamFrame | None: stream frame if the flow supports frame slicing and the current flow is full, otherwise None.

Source code in pyjelly/serialize/streams.py
def quad(self, terms: Iterable[object]) -> jelly.RdfStreamFrame | None:
    """
    Process one quad to Protobuf messages.

    Args:
        terms (Iterable[object]): terms to encode.

    Returns:
        jelly.RdfStreamFrame | None: stream frame if
            flow supports frame slicing and current flow is full

    """
    new_rows = encode_quad(
        terms,
        term_encoder=self.encoder,
        repeated_terms=self.repeated_terms,
    )
    self.flow.extend(new_rows)
    return self.flow.frame_from_bounds()
GraphStream(*, encoder, options=None)

Bases: TripleStream

Methods:

Name Description
graph

Process one graph into a sequence of jelly frames.

Source code in pyjelly/serialize/streams.py
def __init__(
    self,
    *,
    encoder: TermEncoder,
    options: SerializerOptions | None = None,
) -> None:
    self.encoder = encoder
    if options is None:
        options = SerializerOptions()
    self.options = options
    flow = options.flow
    if flow is None:
        flow = self.infer_flow()
    self.flow = flow
    self.repeated_terms = [None] * len(Slot)
    self.enrolled = False
    self.stream_types = StreamTypes(
        physical_type=self.physical_type,
        logical_type=self.flow.logical_type,
    )
graph(graph_id, graph)

Process one graph into a sequence of jelly frames.

Args:

  • graph_id (object): graph id (BN, Literal, IRI, default).
  • graph (Iterable[Iterable[object]]): iterable of triples (graph's content).

Yields: Generator[jelly.RdfStreamFrame]: jelly frames.

Source code in pyjelly/serialize/streams.py
def graph(
    self,
    graph_id: object,
    graph: Iterable[Iterable[object]],
) -> Generator[jelly.RdfStreamFrame]:
    """
    Process one graph into a sequence of jelly frames.

    Args:
        graph_id (object): graph id (BN, Literal, iri, default)
        graph (Iterable[Iterable[object]]): iterable of triples (graph's content)

    Yields:
        Generator[jelly.RdfStreamFrame]: jelly frames.

    """
    graph_start = jelly.RdfGraphStart()
    [*graph_rows] = self.encoder.encode_graph(graph_id, graph_start)
    start_row = jelly.RdfStreamRow(graph_start=graph_start)
    graph_rows.append(start_row)
    self.flow.extend(graph_rows)
    for triple in graph:
        if frame := self.triple(triple):  # has frame slicing inside
            yield frame
    end_row = jelly.RdfStreamRow(graph_end=jelly.RdfGraphEnd())
    self.flow.append(end_row)
    if frame := self.flow.frame_from_bounds():
        yield frame
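The control flow of `graph` (start row, triples with frame slicing, end row, final bounds check) can be sketched as a standalone generator. This is a simplified illustration: `graph_frames` is a hypothetical function, rows are plain tuples, each triple is assumed to produce exactly one row, and the flow is an ordinary list with a fixed `frame_size`.

```python
from collections.abc import Iterable, Iterator


def graph_frames(
    flow: list, graph_id: str, triples: Iterable[tuple], frame_size: int
) -> Iterator[list]:
    """Hypothetical sketch: buffer rows in `flow`, yield a frame whenever full."""

    def frame_if_full():
        if len(flow) >= frame_size:
            frame = list(flow)
            flow.clear()
            return frame
        return None

    flow.append(("graph_start", graph_id))
    for triple in triples:
        flow.append(("triple", triple))
        if frame := frame_if_full():
            yield frame
    flow.append(("graph_end",))
    # Only a full buffer is emitted here; anything smaller stays in the flow.
    if frame := frame_if_full():
        yield frame


flow = []
triples = [("s", "p", "o1"), ("s", "p", "o2"), ("s", "p", "o3")]
frames = list(graph_frames(flow, "ex:g", triples, frame_size=2))
```

In this sketch the graph-end row remains buffered after the generator finishes, so the caller still has to flush the flow to emit it.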
stream_for_type(physical_type)

Return the Stream class for the specified physical type.

Args: physical_type (jelly.PhysicalStreamType): jelly stream physical type.

Raises: NotImplementedError: if no stream for requested physical type is available.

Returns: type[Stream]: jelly stream

Source code in pyjelly/serialize/streams.py
def stream_for_type(physical_type: jelly.PhysicalStreamType) -> type[Stream]:
    """
    Give a Stream based on physical type specified.

    Args:
        physical_type (jelly.PhysicalStreamType): jelly stream physical type.

    Raises:
        NotImplementedError: if no stream for requested physical type is available.

    Returns:
        type[Stream]: jelly stream

    """
    try:
        stream_cls = STREAM_DISPATCH[physical_type]
    except KeyError:
        msg = (
            "no stream class for physical type "
            f"{jelly.PhysicalStreamType.Name(physical_type)}"
        )
        raise NotImplementedError(msg) from None
    return stream_cls