API reference

pyjelly

Modules:

Name Description
errors
integrations
jelly
options
parse
serialize

errors

Classes:

Name Description
JellyConformanceError

Raised when Jelly conformance is violated.

JellyAssertionError

Raised when a recommended assertion from the specification fails.

JellyNotImplementedError

Raised when a future feature is not yet implemented.

JellyConformanceError

Bases: Exception

Raised when Jelly conformance is violated.

JellyAssertionError

Bases: AssertionError

Raised when a recommended assertion from the specification fails.

JellyNotImplementedError

Bases: NotImplementedError

Raised when a future feature is not yet implemented.
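The base classes listed above decide which generic `except` clauses also catch these errors. The sketch below uses locally defined stand-ins that only mirror the documented bases (an assumption made so the snippet runs without pyjelly installed); the `classify` helper is hypothetical and not part of the library.

```python
# Stand-ins mirroring the documented base classes. In real code these names
# would be imported from pyjelly.errors instead of being defined here.
class JellyConformanceError(Exception):
    """Raised when Jelly conformance is violated."""


class JellyAssertionError(AssertionError):
    """Raised when a recommended assertion from the specification fails."""


class JellyNotImplementedError(NotImplementedError):
    """Raised when a future feature is not yet implemented."""


def classify(exc: Exception) -> str:
    """Hypothetical helper: map an exception to a coarse handling category."""
    if isinstance(exc, JellyConformanceError):
        return "invalid stream"
    if isinstance(exc, AssertionError):
        return "failed recommended check"
    if isinstance(exc, NotImplementedError):
        return "unsupported feature"
    return "other"


results = [
    classify(JellyConformanceError("bad frame")),
    classify(JellyAssertionError("soft check")),
    classify(JellyNotImplementedError("later")),
]
```

Because two of the classes derive from built-in exceptions, a handler for `AssertionError` or `NotImplementedError` catches them as well.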

integrations

Modules:

Name Description
generic
rdflib
generic

Modules:

Name Description
generic_sink
parse
serialize
generic_sink

Classes:

Name Description
BlankNode

Class for blank nodes, storing BN's identifier as a string.

IRI

Class for IRIs, storing IRI as a string.

Literal

Class for literals.

Triple

Class for RDF triples.

Quad

Class for RDF quads.

Prefix

Class for generic namespace declaration.

GenericStatementSink
BlankNode(identifier)

Class for blank nodes, storing BN's identifier as a string.

Source code in pyjelly/integrations/generic/generic_sink.py
def __init__(self, identifier: str) -> None:
    self._identifier: str = identifier
IRI(iri)

Class for IRIs, storing IRI as a string.

Source code in pyjelly/integrations/generic/generic_sink.py
def __init__(self, iri: str) -> None:
    self._iri: str = iri
Literal(lex, langtag=None, datatype=None)

Class for literals.

Notes: A literal consists of a lexical form plus an optional language tag and an optional datatype. All parts of the literal are stored as strings.

Source code in pyjelly/integrations/generic/generic_sink.py
def __init__(
    self, lex: str, langtag: str | None = None, datatype: str | None = None
) -> None:
    self._lex: str = lex
    self._langtag: str | None = langtag
    self._datatype: str | None = datatype
Triple

Bases: NamedTuple

Class for RDF triples.

Quad

Bases: NamedTuple

Class for RDF quads.

Prefix

Bases: NamedTuple

Class for generic namespace declaration.

GenericStatementSink(identifier=DefaultGraph)

Notes: _store preserves the order of statements.

Args: identifier (GraphName, optional): Identifier for the sink. Defaults to DefaultGraph.

Attributes:

Name Type Description
is_triples_sink bool

Check if the sink contains triples or quads.

Source code in pyjelly/integrations/generic/generic_sink.py
def __init__(self, identifier: GraphName = DefaultGraph) -> None:
    """
    Initialize statements storage, namespaces dictionary, and parser.

    Notes:
        _store preserves the order of statements.

    Args:
        identifier (str, optional): Identifier for a sink.
            Defaults to DefaultGraph.

    """
    self._store: deque[Triple | Quad] = deque()
    self._namespaces: dict[str, IRI] = {}
    self._identifier = identifier
is_triples_sink

Check if the sink contains triples or quads.

Returns: bool: True if the length of a statement is 3.
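To make the storage model concrete, here is a minimal sketch of a sink that keeps statements in a `deque` and namespaces in a dictionary. `MiniSink` is a hypothetical, simplified stand-in, not the library class; the NamedTuple field names (`s`, `p`, `o`, `g`) and the way `is_triples_sink` inspects the first stored statement are assumptions.

```python
from collections import deque
from typing import NamedTuple


class Triple(NamedTuple):
    # Field names are illustrative assumptions.
    s: str
    p: str
    o: str


class Quad(NamedTuple):
    s: str
    p: str
    o: str
    g: str


class MiniSink:
    """Hypothetical, simplified stand-in for GenericStatementSink."""

    def __init__(self) -> None:
        self._store: deque = deque()  # preserves insertion order
        self._namespaces: dict = {}

    def add(self, statement: tuple) -> None:
        self._store.append(statement)

    def bind(self, prefix: str, iri: str) -> None:
        self._namespaces[prefix] = iri

    @property
    def is_triples_sink(self) -> bool:
        # Assumption: decide by the length of the first stored statement.
        return len(self._store) > 0 and len(self._store[0]) == 3


sink = MiniSink()
sink.bind("ex", "http://example.org/")
sink.add(Triple("ex:a", "ex:b", "ex:c"))
sink.add(Triple("ex:a", "ex:b", "ex:d"))
```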

parse

Classes:

Name Description
GenericStatementSinkAdapter

Implement Adapter for generic statements.

GenericTriplesAdapter

Triples adapter implementation for GenericStatementSink.

GenericQuadsAdapter

Extends GenericQuadsBaseAdapter for QUADS physical type.

GenericGraphsAdapter

Extends GenericQuadsBaseAdapter for GRAPHS physical type.

Functions:

Name Description
parse_triples_stream

Parse flat triple stream.

parse_quads_stream

Parse flat quads stream.

parse_jelly_grouped

Take a Jelly file and return a generator of generic statement sinks.

parse_jelly_to_graph

Add statements from Generator to GenericStatementSink.

parse_jelly_flat

Parse a Jelly file with a FLAT logical type into a generator of stream events.

GenericStatementSinkAdapter(*args, **kwargs)

Bases: Adapter

Implement Adapter for generic statements.

Notes: Returns the custom RDF terms expected by GenericStatementSink and handles namespace declarations and quoted triples.

Args: Adapter (type): base Adapter class

GenericTriplesAdapter(options)

Bases: GenericStatementSinkAdapter

Triples adapter implementation for GenericStatementSink.

Args: GenericStatementSinkAdapter (type): base GenericStatementSink adapter implementation that handles terms and namespaces.

Source code in pyjelly/integrations/generic/parse.py
def __init__(
    self,
    options: ParserOptions,
) -> None:
    super().__init__(options=options)
GenericQuadsAdapter(options)

Bases: GenericQuadsBaseAdapter

Extends GenericQuadsBaseAdapter for QUADS physical type.

Args: GenericQuadsBaseAdapter (type): quads adapter that handles base quads processing.

Source code in pyjelly/integrations/generic/parse.py
def __init__(self, options: ParserOptions) -> None:
    super().__init__(options=options)
GenericGraphsAdapter(options)

Bases: GenericQuadsBaseAdapter

Extends GenericQuadsBaseAdapter for GRAPHS physical type.

Notes: introduces graph start/end, checks if graph exists.

Args: GenericQuadsBaseAdapter (type): quads adapter that handles base quads processing.

Raises: JellyConformanceError: raised if graph start message was not received.

Source code in pyjelly/integrations/generic/parse.py
def __init__(
    self,
    options: ParserOptions,
) -> None:
    super().__init__(options=options)
    self._graph_id = None
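One plausible reading of the graph start/end handling described above is sketched here: the adapter remembers the current graph identifier and rejects triples that arrive before any graph start. The row tuples, the `graphs_to_quads` function, and the `ConformanceError` stand-in are all hypothetical; the real adapter's row handling is not shown in this reference.

```python
from collections.abc import Iterable, Iterator


class ConformanceError(Exception):
    """Stand-in for JellyConformanceError."""


def graphs_to_quads(rows: Iterable[tuple]) -> Iterator[tuple]:
    """Turn graph-delimited triple rows into (s, p, o, graph) tuples."""
    graph_id = None  # no graph is open yet
    for row in rows:
        kind = row[0]
        if kind == "graph_start":
            graph_id = row[1]
        elif kind == "graph_end":
            graph_id = None
        elif kind == "triple":
            if graph_id is None:
                raise ConformanceError("triple received before graph start")
            s, p, o = row[1]
            yield (s, p, o, graph_id)


rows = [
    ("graph_start", "g1"),
    ("triple", ("s1", "p1", "o1")),
    ("graph_end",),
    ("graph_start", "g2"),
    ("triple", ("s2", "p2", "o2")),
    ("graph_end",),
]
quads = list(graphs_to_quads(rows))
```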
parse_triples_stream(frames, options, frame_metadata=None)

Parse flat triple stream.

Args:
    frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames.
    options (ParserOptions): stream options.
    frame_metadata (ContextVar[MutableMapping[str, bytes]] | None): context variable used for exposing frame metadata; it is set once per frame, before that frame's rows are yielded.

Yields: Iterable[Triple | Prefix]: one iterable of Triple or Prefix objects per frame.

Source code in pyjelly/integrations/generic/parse.py
def parse_triples_stream(
    frames: Iterable[jelly.RdfStreamFrame],
    options: ParserOptions,
    frame_metadata: ContextVar[MutableMapping[str, bytes]] | None = None,
) -> Generator[Iterable[Triple | Prefix]]:
    """
    Parse flat triple stream.

    Args:
        frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames
        options (ParserOptions): stream options
        frame_metadata: (ContextVar[ScalarMap[str, bytes]]): context variable
                used for extracting frame metadata

    Yields:
        Generator[Iterable[Triple | Prefix]]:
            Generator of iterables of Triple or Prefix objects,
            one iterable per frame.

    """
    adapter = GenericTriplesAdapter(options)
    decoder = Decoder(adapter=adapter)
    for frame in frames:
        if frame_metadata is not None:
            # Expose this frame's metadata; fall back to an empty mapping.
            frame_metadata.set(frame.metadata if frame.metadata else {})
        yield decoder.iter_rows(frame)
    return
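The `frame_metadata` parameter relies on the fact that a generator body runs in the context of whoever calls `next()` on it, so a value set inside the generator is visible to the consumer. A minimal sketch of that pattern with the standard library only; the `Frame` tuples and `iter_frames` are hypothetical stand-ins for real frames and for the parser function.

```python
from collections.abc import Iterable, Iterator
from contextvars import ContextVar
from typing import Optional

# Hypothetical stand-in for a decoded frame: (metadata, rows).
Frame = tuple

frame_metadata: ContextVar = ContextVar("frame_metadata")


def iter_frames(
    frames: Iterable[Frame],
    metadata_var: Optional[ContextVar] = None,
) -> Iterator[list]:
    """Yield the rows of each frame, publishing its metadata first."""
    for metadata, rows in frames:
        if metadata_var is not None:
            metadata_var.set(metadata if metadata else {})
        yield rows


frames = [({"source": b"a"}, ["t1", "t2"]), ({}, ["t3"])]
seen = []
for rows in iter_frames(frames, frame_metadata):
    # The value set inside the generator is readable here.
    seen.append((frame_metadata.get(), rows))
```

Reading the variable inside the loop body gives the metadata of the frame currently being processed.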
parse_quads_stream(frames, options, frame_metadata=None)

Parse flat quads stream.

Args:
    frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames.
    options (ParserOptions): stream options.
    frame_metadata (ContextVar[MutableMapping[str, bytes]] | None): context variable used for exposing frame metadata; it is set once per frame, before that frame's rows are yielded.

Yields: Iterable[Quad | Prefix]: one iterable of Quad or Prefix objects per frame.

Source code in pyjelly/integrations/generic/parse.py
def parse_quads_stream(
    frames: Iterable[jelly.RdfStreamFrame],
    options: ParserOptions,
    frame_metadata: ContextVar[MutableMapping[str, bytes]] | None = None,
) -> Generator[Iterable[Quad | Prefix]]:
    """
    Parse flat quads stream.

    Args:
        frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames
        options (ParserOptions): stream options
        frame_metadata: (ContextVar[ScalarMap[str, bytes]]): context variable
                used for extracting frame metadata

    Yields:
        Generator[Iterable[Quad | Prefix]]:
            Generator of iterables of Quad or Prefix objects,
            one iterable per frame.

    """
    adapter_class: type[GenericQuadsBaseAdapter]
    if options.stream_types.physical_type == jelly.PHYSICAL_STREAM_TYPE_QUADS:
        adapter_class = GenericQuadsAdapter
    else:
        adapter_class = GenericGraphsAdapter
    adapter = adapter_class(options=options)
    decoder = Decoder(adapter=adapter)
    for frame in frames:
        if frame_metadata is not None:
            # Expose this frame's metadata; fall back to an empty mapping.
            frame_metadata.set(frame.metadata if frame.metadata else {})
        yield decoder.iter_rows(frame)
    return
parse_jelly_grouped(inp, sink_factory=lambda: GenericStatementSink(), *, logical_type_strict=False, frame_metadata=None)

Take a Jelly file and return a generator of generic statement sinks.

Yields one generic statement sink per frame.

Args:
    inp (IO[bytes]): input Jelly buffered binary stream.
    sink_factory (Callable[[], GenericStatementSink]): callable that constructs a statement sink. By default, creates an empty in-memory GenericStatementSink.
    logical_type_strict (bool): If True, validate the logical type in stream options and require a grouped logical type. Otherwise, only the physical type is used to route parsing.
    frame_metadata (ContextVar[MutableMapping[str, bytes]] | None): context variable used for extracting frame metadata.

Raises:
    JellyConformanceError: if logical_type_strict is True and the stream options do not declare a grouped logical type.
    NotImplementedError: if the physical type is not supported.

Yields: GenericStatementSink: one sink per frame, regardless of stream type.

Source code in pyjelly/integrations/generic/parse.py
def parse_jelly_grouped(
    inp: IO[bytes],
    sink_factory: Callable[[], GenericStatementSink] = lambda: GenericStatementSink(),
    *,
    logical_type_strict: bool = False,
    frame_metadata: ContextVar[MutableMapping[str, bytes]] | None = None,
) -> Generator[GenericStatementSink]:
    """
    Take a jelly file and return generators of generic statements sinks.

    Yields one generic statements sink per frame.

    Args:
        inp (IO[bytes]): input jelly buffered binary stream
        sink_factory (Callable): lambda to construct a statement sink.
            By default, creates an empty in-memory GenericStatementSink.
        logical_type_strict (bool): If True, validate the *logical* type
            in stream options and require a grouped logical type.
            Otherwise, only the physical type is used to route parsing.
        frame_metadata: (ContextVar[ScalarMap[str, bytes]]): context variable
                used for extracting frame metadata

    Raises:
        NotImplementedError: is raised if a physical type is not implemented

    Yields:
        Generator[GenericStatementSink]:
            returns generators for GenericStatementSink, regardless of stream type.

    """
    options, frames = get_options_and_frames(inp)

    st = getattr(options, "stream_types", None)
    if logical_type_strict and (
        st is None
        or st.logical_type == jelly.LOGICAL_STREAM_TYPE_UNSPECIFIED
        or st.flat
    ):
        lt_name = (
            "UNSPECIFIED"
            if st is None
            else jelly.LogicalStreamType.Name(st.logical_type)
        )

        msg = (
            "strict logical type check requires options.stream_types"
            if st is None
            else f"expected GROUPED logical type, got {lt_name}"
        )
        raise JellyConformanceError(msg)

    if options.stream_types.physical_type == jelly.PHYSICAL_STREAM_TYPE_TRIPLES:
        for graph in parse_triples_stream(
            frames=frames,
            options=options,
            **{"frame_metadata": frame_metadata} if frame_metadata is not None else {},
        ):
            sink = sink_factory()
            for graph_item in graph:
                if isinstance(graph_item, Prefix):
                    sink.bind(graph_item.prefix, graph_item.iri)
                else:
                    sink.add(graph_item)
            yield sink
        return
    elif options.stream_types.physical_type in (
        jelly.PHYSICAL_STREAM_TYPE_QUADS,
        jelly.PHYSICAL_STREAM_TYPE_GRAPHS,
    ):
        for dataset in parse_quads_stream(
            frames=frames,
            options=options,
            **{"frame_metadata": frame_metadata} if frame_metadata is not None else {},
        ):
            sink = sink_factory()
            for item in dataset:
                if isinstance(item, Prefix):
                    sink.bind(item.prefix, item.iri)
                else:
                    sink.add(item)
            yield sink
        return

    physical_type_name = jelly.PhysicalStreamType.Name(
        options.stream_types.physical_type
    )
    msg = f"the stream type {physical_type_name} is not supported "
    raise NotImplementedError(msg)
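The per-frame grouping can be sketched without the library: every frame gets a fresh sink from the factory, prefixes are bound, and everything else is added as a statement. `ListSink` and `group_frames` are hypothetical stand-ins; only the `add`/`bind` calls and the `prefix`/`iri` fields of `Prefix` are taken from the listing above.

```python
from collections.abc import Callable, Iterable, Iterator
from typing import NamedTuple


class Prefix(NamedTuple):
    prefix: str
    iri: str


class ListSink:
    """Hypothetical sink exposing the add/bind methods used by the parser."""

    def __init__(self) -> None:
        self.statements: list = []
        self.namespaces: dict = {}

    def add(self, statement: tuple) -> None:
        self.statements.append(statement)

    def bind(self, prefix: str, iri: str) -> None:
        self.namespaces[prefix] = iri


def group_frames(
    frames: Iterable[Iterable[object]],
    sink_factory: Callable[[], ListSink] = ListSink,
) -> Iterator[ListSink]:
    """Yield one populated sink per frame."""
    for frame in frames:
        sink = sink_factory()  # fresh sink for every frame
        for item in frame:
            if isinstance(item, Prefix):
                sink.bind(item.prefix, item.iri)
            else:
                sink.add(item)
        yield sink


frames = [
    [Prefix("ex", "http://example.org/"), ("a", "b", "c")],
    [("d", "e", "f"), ("g", "h", "i")],
]
sinks = list(group_frames(frames))
```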
parse_jelly_to_graph(inp, sink_factory=lambda: GenericStatementSink())

Add statements from Generator to GenericStatementSink.

Args:
    inp (IO[bytes]): input Jelly stream.
    sink_factory (Callable[[], GenericStatementSink]): factory that creates the statement sink. By default, creates an empty in-memory GenericStatementSink. The sink makes no distinction between datasets and graphs; both use the same underlying data structures.

Returns: GenericStatementSink: GenericStatementSink with statements.

Source code in pyjelly/integrations/generic/parse.py
def parse_jelly_to_graph(
    inp: IO[bytes],
    sink_factory: Callable[[], GenericStatementSink] = lambda: GenericStatementSink(),
) -> GenericStatementSink:
    """
    Add statements from Generator to GenericStatementSink.

    Args:
        inp (IO[bytes]): input jelly stream.
        sink_factory (Callable[[], GenericStatementSink]): factory to create
            statement sink.
            By default creates an empty in-memory GenericStatementSink.
            Has no division for datasets/graphs,
            utilizes the same underlying data structures.

    Returns:
        GenericStatementSink: GenericStatementSink with statements.

    """
    options, frames = get_options_and_frames(inp)
    sink = sink_factory()

    for item in parse_jelly_flat(
        inp=inp, frames=frames, options=options, logical_type_strict=False
    ):
        if isinstance(item, Prefix):
            sink.bind(item.prefix, item.iri)  # type: ignore[union-attr, unused-ignore]
        else:
            sink.add(item)
    return sink
parse_jelly_flat(inp, frames=None, options=None, *, logical_type_strict=False)

Parse a Jelly file with a FLAT logical type into a generator of stream events.

Args:
    inp (IO[bytes]): input Jelly buffered binary stream.
    frames (Iterable[jelly.RdfStreamFrame] | None): Jelly frames, if already read.
    options (ParserOptions | None): stream options, if already read.
    logical_type_strict (bool): If True, validate the logical type in stream options and require FLAT (TRIPLES/QUADS). Otherwise, only the physical type is used to route parsing.

Raises:
    JellyConformanceError: if logical_type_strict is True and the stream options do not declare a FLAT logical type.
    NotImplementedError: if the physical type is not supported.

Yields: Statement | Prefix: stream events, one at a time.

Source code in pyjelly/integrations/generic/parse.py
def parse_jelly_flat(
    inp: IO[bytes],
    frames: Iterable[jelly.RdfStreamFrame] | None = None,
    options: ParserOptions | None = None,
    *,
    logical_type_strict: bool = False,
) -> Generator[Statement | Prefix]:  # type: ignore[valid-type, unused-ignore]
    """
    Parse jelly file with FLAT logical type into a Generator of stream events.

    Args:
        inp (IO[bytes]): input jelly buffered binary stream.
        frames (Iterable[jelly.RdfStreamFrame] | None):
            jelly frames if read before.
        options (ParserOptions | None): stream options
            if read before.
        logical_type_strict (bool): If True, validate the *logical* type
            in stream options and require FLAT (TRIPLES/QUADS).
            Otherwise, only the physical type is used to route parsing.

    Raises:
        NotImplementedError: if physical type is not supported

    Yields:
        Generator[Statement | Prefix]: Generator of stream events

    """
    if frames is None or options is None:
        options, frames = get_options_and_frames(inp)

    st = getattr(options, "stream_types", None)
    if logical_type_strict and (st is None or not st.flat):
        lt_name = (
            "UNSPECIFIED"
            if st is None
            else jelly.LogicalStreamType.Name(st.logical_type)
        )

        msg = (
            "strict logical type check requires options.stream_types"
            if st is None
            else f"expected FLAT logical type (TRIPLES/QUADS), got {lt_name}"
        )
        raise JellyConformanceError(msg)

    if options.stream_types.physical_type == jelly.PHYSICAL_STREAM_TYPE_TRIPLES:
        for triples in parse_triples_stream(frames=frames, options=options):
            yield from triples
        return
    if options.stream_types.physical_type in (
        jelly.PHYSICAL_STREAM_TYPE_QUADS,
        jelly.PHYSICAL_STREAM_TYPE_GRAPHS,
    ):
        for quads in parse_quads_stream(frames=frames, options=options):
            yield from quads
        return
    physical_type_name = jelly.PhysicalStreamType.Name(
        options.stream_types.physical_type
    )
    msg = f"the stream type {physical_type_name} is not supported "
    raise NotImplementedError(msg)
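Flat parsing differs from grouped parsing only in that frame boundaries are dropped. A minimal sketch of that flattening together with the optional strict check; `iter_flat`, the string logical type names, and the `ConformanceError` stand-in are hypothetical simplifications.

```python
from collections.abc import Iterable, Iterator


class ConformanceError(Exception):
    """Stand-in for JellyConformanceError."""


def iter_flat(
    frames: Iterable[Iterable[object]],
    *,
    logical_type: str = "UNSPECIFIED",
    logical_type_strict: bool = False,
) -> Iterator[object]:
    """Yield items from all frames as a single flat sequence."""
    if logical_type_strict and logical_type not in ("FLAT_TRIPLES", "FLAT_QUADS"):
        msg = f"expected FLAT logical type (TRIPLES/QUADS), got {logical_type}"
        raise ConformanceError(msg)
    for frame in frames:
        yield from frame  # frame boundaries disappear here


events = list(iter_flat([["t1", "t2"], ["t3"]]))
```

Because this is a generator function, the strict check runs when iteration starts, not when the function is called.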
serialize

Modules:

Name Description
jelly

Generated protocol buffer code.

Classes:

Name Description
BlankNode

Class for blank nodes, storing BN's identifier as a string.

GenericSinkTermEncoder
GenericStatementSink
GraphStream
IRI

Class for IRIs, storing IRI as a string.

Literal

Class for literals.

Quad

Class for RDF quads.

QuadStream
Slot
Stream
TermEncoder
Triple

Class for RDF triples.

TripleStream

Functions:

Name Description
stream_frames

Attributes:

Name Type Description
HasGraph

Union type alias: rdf_pb2.RdfQuad | rdf_pb2.RdfGraphStart.

QUAD_ARITY

Integer constant, set to 4.

Statement

Union type alias: rdf_pb2.RdfQuad | rdf_pb2.RdfTriple.

HasGraph = rdf_pb2.RdfQuad | rdf_pb2.RdfGraphStart

Union type alias (PEP 604 syntax): a value is either an RdfQuad or an RdfGraphStart message.

QUAD_ARITY = 4

Statement = rdf_pb2.RdfQuad | rdf_pb2.RdfTriple

Union type alias (PEP 604 syntax): a value is either an RdfQuad or an RdfTriple message.

GenericSinkTermEncoder(*args, **kwargs)

Bases: pyjelly.serialize.encode.TermEncoder

GraphStream(*, encoder, options=None)

Bases: TripleStream

Methods:

Name Description
graph

Process one graph into a sequence of jelly frames.

Source code in pyjelly/serialize/streams.py
def __init__(
    self,
    *,
    encoder: TermEncoder,
    options: SerializerOptions | None = None,
) -> None:
    self.encoder = encoder
    if options is None:
        options = SerializerOptions()
    self.options = options
    flow = options.flow
    if flow is None:
        flow = self.infer_flow()
    self.flow = flow
    self.repeated_terms = [None] * len(Slot)
    self.enrolled = False
    self.stream_types = StreamTypes(
        physical_type=self.physical_type,
        logical_type=self.flow.logical_type,
    )
graph(graph_id, graph)

Process one graph into a sequence of jelly frames.

Args:
    graph_id (object): graph identifier (blank node, literal, IRI, or default graph).
    graph (Iterable[Iterable[object]]): iterable of triples (the graph's content).

Yields: Generator[jelly.RdfStreamFrame]: jelly frames.

Source code in pyjelly/serialize/streams.py
def graph(
    self,
    graph_id: object,
    graph: Iterable[Iterable[object]],
) -> Generator[jelly.RdfStreamFrame]:
    """
    Process one graph into a sequence of jelly frames.

    Args:
        graph_id (object): graph id (BN, Literal, iri, default)
        graph (Iterable[Iterable[object]]): iterable of triples (graph's content)

    Yields:
        Generator[jelly.RdfStreamFrame]: jelly frames.

    """
    graph_start = jelly.RdfGraphStart()
    [*graph_rows] = self.encoder.encode_graph(graph_id, graph_start)
    start_row = jelly.RdfStreamRow(graph_start=graph_start)
    graph_rows.append(start_row)
    self.flow.extend(graph_rows)
    for triple in graph:
        if frame := self.triple(triple):  # has frame slicing inside
            yield frame
    end_row = jelly.RdfStreamRow(graph_end=jelly.RdfGraphEnd())
    self.flow.append(end_row)
    if frame := self.flow.frame_from_bounds():
        yield frame
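The row order produced by `graph` (graph start, then the triples, then graph end) can be illustrated with plain tuples. This sketch deliberately leaves out term encoding and frame slicing; `graph_rows` and the tuple-based rows are hypothetical stand-ins for `jelly.RdfStreamRow` messages.

```python
from collections.abc import Iterable, Iterator


def graph_rows(graph_id: str, triples: Iterable[tuple]) -> Iterator[tuple]:
    """Bracket the triples of one graph with start and end rows."""
    yield ("graph_start", graph_id)
    for triple in triples:
        yield ("triple", triple)
    yield ("graph_end",)


rows = list(graph_rows("g1", [("s", "p", "o1"), ("s", "p", "o2")]))
```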
QuadStream(*, encoder, options=None)

Bases: Stream

Methods:

Name Description
quad

Process one quad to Protobuf messages.

Source code in pyjelly/serialize/streams.py
def __init__(
    self,
    *,
    encoder: TermEncoder,
    options: SerializerOptions | None = None,
) -> None:
    self.encoder = encoder
    if options is None:
        options = SerializerOptions()
    self.options = options
    flow = options.flow
    if flow is None:
        flow = self.infer_flow()
    self.flow = flow
    self.repeated_terms = [None] * len(Slot)
    self.enrolled = False
    self.stream_types = StreamTypes(
        physical_type=self.physical_type,
        logical_type=self.flow.logical_type,
    )
quad(terms)

Process one quad to Protobuf messages.

Args: terms (Iterable[object]): terms to encode.

Returns: jelly.RdfStreamFrame | None: a stream frame if the flow supports frame slicing and the current flow is full; otherwise None.

Source code in pyjelly/serialize/streams.py
def quad(self, terms: Iterable[object]) -> jelly.RdfStreamFrame | None:
    """
    Process one quad to Protobuf messages.

    Args:
        terms (Iterable[object]): terms to encode.

    Returns:
        jelly.RdfStreamFrame | None: stream frame if
            flow supports frames slicing and current flow is full

    """
    new_rows = encode_quad(
        terms,
        term_encoder=self.encoder,
        repeated_terms=self.repeated_terms,
    )
    self.flow.extend(new_rows)
    return self.flow.frame_from_bounds()
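The "extend, then ask the flow for a frame" pattern used by `quad` can be sketched with a size-bounded buffer. `BoundedFlow` is a hypothetical stand-in written for illustration; the real flow classes and their slicing rules are not shown in this reference.

```python
from typing import Optional


class BoundedFlow:
    """Hypothetical buffer that releases a frame once frame_size rows are queued."""

    def __init__(self, frame_size: int) -> None:
        self.frame_size = frame_size
        self.rows: list = []

    def extend(self, rows: list) -> None:
        self.rows.extend(rows)

    def frame_from_bounds(self) -> Optional[list]:
        if len(self.rows) < self.frame_size:
            return None  # not full yet, nothing to emit
        frame, self.rows = self.rows, []
        return frame


flow = BoundedFlow(frame_size=2)
emitted = []
for quad in ["q1", "q2", "q3"]:
    flow.extend([quad])
    emitted.append(flow.frame_from_bounds())
```

Callers therefore have to handle a `None` result and flush any remaining rows at the end of the stream.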
Slot

Bases: enum.IntEnum

Attributes:

Name Type Description
graph
object
predicate
subject

graph = <Slot.graph: 3>

object = <Slot.object: 2>

predicate = <Slot.predicate: 1>

subject = <Slot.subject: 0>
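The stream constructors allocate `repeated_terms = [None] * len(Slot)`, which gives one entry per statement position. The sketch below rebuilds the enum from the values listed above and shows the indexing; the idea that each entry remembers the last term seen in that position is an assumption, not something this reference states.

```python
from enum import IntEnum


class Slot(IntEnum):
    # Values as listed in the reference above.
    subject = 0
    predicate = 1
    object = 2
    graph = 3


# One entry per slot, initially empty.
repeated_terms: list = [None] * len(Slot)

# Assumed usage: remember the most recent term for a position.
repeated_terms[Slot.subject] = "ex:alice"
```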

Stream(*, encoder, options=None)

Methods:

Name Description
infer_flow

Return flow based on the stream options provided.

enroll

Initialize start of the stream.

stream_options

Encode and append stream options row to the current flow.

namespace_declaration

Add namespace declaration to jelly stream.

for_rdflib

Initialize stream with RDFLib encoder.

Source code in pyjelly/serialize/streams.py
def __init__(
    self,
    *,
    encoder: TermEncoder,
    options: SerializerOptions | None = None,
) -> None:
    self.encoder = encoder
    if options is None:
        options = SerializerOptions()
    self.options = options
    flow = options.flow
    if flow is None:
        flow = self.infer_flow()
    self.flow = flow
    self.repeated_terms = [None] * len(Slot)
    self.enrolled = False
    self.stream_types = StreamTypes(
        physical_type=self.physical_type,
        logical_type=self.flow.logical_type,
    )
infer_flow()

Return flow based on the stream options provided.

Returns: FrameFlow: initialised FrameFlow object.

Source code in pyjelly/serialize/streams.py
def infer_flow(self) -> FrameFlow:
    """
    Return flow based on the stream options provided.

    Returns:
        FrameFlow: initialised FrameFlow object.

    """
    flow: FrameFlow
    if self.options.params.delimited:
        if self.options.logical_type != jelly.LOGICAL_STREAM_TYPE_UNSPECIFIED:
            flow_class = flow_for_type(self.options.logical_type)
        else:
            flow_class = self.default_delimited_flow_class

        if self.options.logical_type in (
            jelly.LOGICAL_STREAM_TYPE_FLAT_TRIPLES,
            jelly.LOGICAL_STREAM_TYPE_FLAT_QUADS,
        ):
            flow = flow_class(
                logical_type=self.options.logical_type,
                frame_size=self.options.frame_size,
            )
        else:
            flow = flow_class(logical_type=self.options.logical_type)
    else:
        flow = ManualFrameFlow(logical_type=self.options.logical_type)
    return flow
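The branching in `infer_flow` is easier to follow as a small decision function. This sketch returns a description of the chosen flow instead of constructing one; `pick_flow`, the string type names, and the returned labels are hypothetical simplifications of the listing above.

```python
def pick_flow(delimited: bool, logical_type: str, frame_size: int) -> tuple:
    """Return (flow description, constructor kwargs) for the given options."""
    if not delimited:
        # Non-delimited streams always use the manual flow.
        return "ManualFrameFlow", {"logical_type": logical_type}
    if logical_type != "UNSPECIFIED":
        flow_name = f"flow for {logical_type}"
    else:
        flow_name = "default delimited flow"
    kwargs = {"logical_type": logical_type}
    if logical_type in ("FLAT_TRIPLES", "FLAT_QUADS"):
        # Only flat flows receive a frame size.
        kwargs["frame_size"] = frame_size
    return flow_name, kwargs


manual = pick_flow(False, "FLAT_TRIPLES", 250)
flat = pick_flow(True, "FLAT_QUADS", 250)
default = pick_flow(True, "UNSPECIFIED", 250)
```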
enroll()

Initialize start of the stream.

Source code in pyjelly/serialize/streams.py
def enroll(self) -> None:
    """Initialize start of the stream."""
    if not self.enrolled:
        self.stream_options()
        self.enrolled = True
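The `enrolled` flag makes `enroll` idempotent: the options row is appended once, no matter how often the method is called. A minimal sketch with a hypothetical `MiniStream` that records rows in a list:

```python
class MiniStream:
    """Hypothetical stream that only tracks the enrollment state."""

    def __init__(self) -> None:
        self.rows: list = []
        self.enrolled = False

    def stream_options(self) -> None:
        self.rows.append("options")

    def enroll(self) -> None:
        if not self.enrolled:
            self.stream_options()
            self.enrolled = True


stream = MiniStream()
stream.enroll()
stream.enroll()  # second call is a no-op
```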
stream_options()

Encode and append stream options row to the current flow.

Source code in pyjelly/serialize/streams.py
def stream_options(self) -> None:
    """Encode and append stream options row to the current flow."""
    self.flow.append(
        encode_options(
            stream_types=self.stream_types,
            params=self.options.params,
            lookup_preset=self.options.lookup_preset,
        )
    )
namespace_declaration(name, iri)

Add namespace declaration to jelly stream.

Args:
    name (str): namespace prefix label.
    iri (str): namespace IRI.

Source code in pyjelly/serialize/streams.py
def namespace_declaration(self, name: str, iri: str) -> None:
    """
    Add namespace declaration to jelly stream.

    Args:
        name (str): namespace prefix label
        iri (str): namespace iri

    """
    rows = encode_namespace_declaration(
        name=name,
        value=iri,
        term_encoder=self.encoder,
    )
    self.flow.extend(rows)
for_rdflib(options=None)

Initialize stream with RDFLib encoder.

Args: options (SerializerOptions | None, optional): Stream options. Defaults to None.

Raises: TypeError: if called on the abstract Stream base class rather than on a subclass for a specific physical type.

Returns: Stream: initialized stream with RDFLib encoder.

Source code in pyjelly/serialize/streams.py
@classmethod
def for_rdflib(cls, options: SerializerOptions | None = None) -> Stream:
    """
    Initialize stream with RDFLib encoder.

    Args:
        options (SerializerOptions | None, optional): Stream options.
            Defaults to None.

    Raises:
        TypeError: if Stream is passed, and not a Stream for specific physical type.

    Returns:
        Stream: initialized stream with RDFLib encoder.

    """
    if cls is Stream:
        msg = "Stream is an abstract base class, use a subclass instead"
        raise TypeError(msg)
    from pyjelly.integrations.rdflib.serialize import (  # noqa: PLC0415
        RDFLibTermEncoder,
    )

    lookup_preset: LookupPreset | None = None
    if options is not None:
        lookup_preset = options.lookup_preset
    return cls(
        encoder=RDFLibTermEncoder(lookup_preset=lookup_preset),
        options=options,
    )
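The `cls is Stream` check lets a classmethod factory live on the base class while still refusing to build the base itself. A small sketch of the same guard, using hypothetical `Base` and `Concrete` classes in place of `Stream` and its subclasses:

```python
from __future__ import annotations


class Base:
    """Hypothetical abstract base, standing in for Stream."""

    def __init__(self, encoder: str) -> None:
        self.encoder = encoder

    @classmethod
    def with_default_encoder(cls) -> Base:
        # Refuse construction through the abstract base itself.
        if cls is Base:
            msg = "Base is an abstract base class, use a subclass instead"
            raise TypeError(msg)
        # `cls` is the subclass the method was called on.
        return cls(encoder="default")


class Concrete(Base):
    """Hypothetical concrete subclass, standing in for e.g. TripleStream."""


instance = Concrete.with_default_encoder()

try:
    Base.with_default_encoder()
except TypeError as error:
    rejected = str(error)
```

Calling the factory on a subclass returns an instance of that subclass, so one factory serves every physical stream type.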
TermEncoder(*args, **kwargs)

Methods:

Name Description
__new__

Create and return a new object. See help(type) for accurate signature.

Attributes:

Name Type Description
__doc__
__module__
__mypyc_attrs__

__doc__ = ''

__module__ = 'pyjelly.serialize.encode'

__mypyc_attrs__ = ('lookup_preset', 'names', 'prefixes', 'datatypes')

__new__(*args, **kwargs)

Create and return a new object. See help(type) for accurate signature.

Triple

Bases: NamedTuple

Class for RDF triples.

TripleStream(*, encoder, options=None)

Bases: Stream

Methods:

Name Description
triple

Process one triple to Protobuf messages.

Source code in pyjelly/serialize/streams.py
def __init__(
    self,
    *,
    encoder: TermEncoder,
    options: SerializerOptions | None = None,
) -> None:
    self.encoder = encoder
    if options is None:
        options = SerializerOptions()
    self.options = options
    flow = options.flow
    if flow is None:
        flow = self.infer_flow()
    self.flow = flow
    self.repeated_terms = [None] * len(Slot)
    self.enrolled = False
    self.stream_types = StreamTypes(
        physical_type=self.physical_type,
        logical_type=self.flow.logical_type,
    )
triple(terms)

Process one triple to Protobuf messages.

Note: Adds new rows to the current flow and returns StreamFrame if frame size conditions are met.

Args: terms (Iterable[object]): RDF terms to encode.

Returns: jelly.RdfStreamFrame | None: a stream frame if the flow supports frame slicing and the current flow is full, otherwise None

Source code in pyjelly/serialize/streams.py
def triple(self, terms: Iterable[object]) -> jelly.RdfStreamFrame | None:
    """
    Process one triple to Protobuf messages.

    Note:
        Adds new rows to the current flow and returns StreamFrame if
        frame size conditions are met.

    Args:
        terms (Iterable[object]): RDF terms to encode.

    Returns:
        jelly.RdfStreamFrame | None: stream frame if
            flow supports frames slicing and current flow is full

    """
    new_rows = encode_triple(
        terms,
        term_encoder=self.encoder,
        repeated_terms=self.repeated_terms,
    )
    self.flow.extend(new_rows)
    return self.flow.frame_from_bounds()
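The "extend, then ask the flow for a frame" step can be pictured with a toy flow that cuts a frame once a row limit is reached. `BoundedFlow` and its threshold rule are assumptions made for illustration; the actual pyjelly flow classes may decide differently.

```python
from __future__ import annotations


class BoundedFlow:
    """Hypothetical flow that cuts a frame once `frame_size` rows are buffered."""

    def __init__(self, frame_size: int) -> None:
        self.frame_size = frame_size
        self.rows: list[str] = []

    def extend(self, rows: list[str]) -> None:
        self.rows.extend(rows)

    def frame_from_bounds(self) -> list[str] | None:
        # Hand out a frame and reset the buffer only when the bound is reached.
        if len(self.rows) < self.frame_size:
            return None
        frame, self.rows = self.rows, []
        return frame


flow = BoundedFlow(frame_size=2)
frames = []
for row in ["t1", "t2", "t3"]:
    flow.extend([row])
    if (frame := flow.frame_from_bounds()) is not None:
        frames.append(frame)
```

After the loop one full frame has been produced and `"t3"` is still buffered, which is why the serializers flush the remaining rows once the input is exhausted.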
stream_frames()
jelly

Generated protocol buffer code.

rdflib

Modules:

Name Description
parse
serialize

Functions:

Name Description
register_extension_to_rdflib

Make rdflib.util.guess_format discover Jelly format.

register_extension_to_rdflib(extension='.jelly')

Make rdflib.util.guess_format discover Jelly format.

>>> rdflib.util.guess_format("foo.jelly")
>>> register_extension_to_rdflib()
>>> rdflib.util.guess_format("foo.jelly")
'jelly'

Source code in pyjelly/integrations/rdflib/__init__.py
def register_extension_to_rdflib(extension: str = ".jelly") -> None:
    """
    Make [rdflib.util.guess_format][] discover Jelly format.

    >>> rdflib.util.guess_format("foo.jelly")
    >>> register_extension_to_rdflib()
    >>> rdflib.util.guess_format("foo.jelly")
    'jelly'
    """
    rdflib.util.SUFFIX_FORMAT_MAP[extension.removeprefix(".")] = "jelly"
parse

Classes:

Name Description
Triple

Describe RDFLib triple.

Quad

Describe RDFLib quad.

Prefix

Describe RDF Prefix (i.e., namespace declaration).

RDFLibAdapter

RDFLib adapter base class, extended by the triples and quads implementations.

RDFLibTriplesAdapter

Triples adapter RDFLib implementation.

RDFLibQuadsAdapter

Extended RDFLib adapter for the QUADS physical type.

RDFLibGraphsAdapter

Extension of RDFLibQuadsBaseAdapter for the GRAPHS physical type.

RDFLibJellyParser

Functions:

Name Description
parse_triples_stream

Parse flat triple stream.

parse_quads_stream

Parse flat quads stream.

parse_jelly_grouped

Take jelly file and return generators based on the detected physical type.

parse_jelly_to_graph

Add statements from Generator to provided Graph/Dataset.

parse_jelly_flat

Parse jelly file with FLAT logical type into a Generator of stream events.

Triple

Bases: tuple[Node, Node, Node]

Describe RDFLib triple.

Args: tuple (Node, Node, Node): s/p/o tuple of RDFLib Nodes.

Returns: Triple: triple as tuple.

Quad

Bases: tuple[Node, Node, Node, GraphName]

Describe RDFLib quad.

Args: tuple (Node, Node, Node, GraphName): s/p/o/g as a tuple of RDFLib nodes and a GraphName.

Returns: Quad: quad as tuple.

Prefix

Bases: tuple[str, URIRef]

Describe RDF Prefix (i.e., namespace declaration).

Args: tuple (str, rdflib.URIRef): expects the prefix as a string and the full namespace URI as an rdflib.URIRef.

Returns: Prefix: prefix as tuple(prefix, iri).

RDFLibAdapter(*args, **kwargs)

Bases: Adapter

RDFLib adapter base class, extended by the triples and quads implementations.

Args: Adapter (): abstract adapter class

RDFLibTriplesAdapter(options)

Bases: RDFLibAdapter

Triples adapter RDFLib implementation.

Notes: returns each triple or namespace declaration as soon as it is received.

Source code in pyjelly/integrations/rdflib/parse.py
def __init__(
    self,
    options: ParserOptions,
) -> None:
    super().__init__(options=options)
RDFLibQuadsAdapter(options)

Bases: RDFLibQuadsBaseAdapter

Extended RDFLib adapter for the QUADS physical type.

Args: RDFLibQuadsBaseAdapter (RDFLibAdapter): base quads adapter (shared with graphs physical type)

Source code in pyjelly/integrations/rdflib/parse.py
def __init__(self, options: ParserOptions) -> None:
    super().__init__(options=options)
RDFLibGraphsAdapter(options)

Bases: RDFLibQuadsBaseAdapter

Extension of RDFLibQuadsBaseAdapter for the GRAPHS physical type.

Notes: introduces graph start/end, checks if graph exists.

Args: RDFLibQuadsBaseAdapter (RDFLibAdapter): base adapter for quads management.

Raises: JellyConformanceError: if no graph_start was encountered

Source code in pyjelly/integrations/rdflib/parse.py
def __init__(
    self,
    options: ParserOptions,
) -> None:
    super().__init__(options=options)
    self._graph_id = None
RDFLibJellyParser

Bases: Parser

Methods:

Name Description
parse

Parse jelly file into provided RDFLib Graph.

parse(source, sink)

Parse jelly file into provided RDFLib Graph.

Args:
    source (InputSource): jelly file as a buffered binary stream InputSource object
    sink (Graph): RDFLib Graph

Raises: TypeError: if the source does not provide a byte stream

Source code in pyjelly/integrations/rdflib/parse.py
def parse(
    self,
    source: InputSource,
    sink: Graph,
) -> None:
    """
    Parse jelly file into provided RDFLib Graph.

    Args:
        source (InputSource): jelly file as buffered binary stream InputSource obj
        sink (Graph): RDFLib Graph

    Raises:
        TypeError: raises error if invalid input

    """
    byte_stream = source.getByteStream()
    if byte_stream is None:
        msg = "expected source to be a stream of bytes"
        raise TypeError(msg)

    inp = cast(IO[bytes], byte_stream)
    if inp is None:
        msg = "expected source to be a stream of bytes"
        raise TypeError(msg)
    parse_jelly_to_graph(
        inp,
        graph_factory=lambda: Graph(store=sink.store, identifier=sink.identifier),
        dataset_factory=lambda: Dataset(store=sink.store),
    )
parse_triples_stream(frames, options, frame_metadata=None)

Parse flat triple stream.

Args:
    frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames
    options (ParserOptions): stream options
    frame_metadata (ContextVar[ScalarMap[str, bytes]]): context variable used for extracting frame metadata

Yields: Generator[Iterable[Triple | Prefix]]: Generator of iterables of Triple or Prefix objects, one iterable per frame.

Source code in pyjelly/integrations/rdflib/parse.py
def parse_triples_stream(
    frames: Iterable[jelly.RdfStreamFrame],
    options: ParserOptions,
    frame_metadata: ContextVar[MutableMapping[str, bytes]] | None = None,
) -> Generator[Iterable[Triple | Prefix]]:
    """
    Parse flat triple stream.

    Args:
        frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames
        options (ParserOptions): stream options
        frame_metadata: (ContextVar[ScalarMap[str, bytes]]): context variable
            used for extracting frame metadata

    Yields:
        Generator[Iterable[Triple | Prefix]]:
            Generator of iterables of Triple or Prefix objects,
            one iterable per frame.

    """
    adapter = RDFLibTriplesAdapter(options)
    decoder = Decoder(adapter=adapter)
    for frame in frames:
        if frame_metadata is not None:
            frame_metadata.set(
                frame.metadata
            ) if frame.metadata else frame_metadata.set({})
        yield decoder.iter_rows(frame)
    return
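The `frame_metadata` argument works because a generator body runs in the context of whoever advances it, so a value set just before `yield` is visible to the consumer while it processes that frame. The sketch below shows the pattern with the standard library only; `ToyFrame` and `iter_frames` are hypothetical stand-ins for Jelly frames and the parser function.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator, Mapping
from contextvars import ContextVar

# Hypothetical stand-in for a stream frame: (metadata, rows).
ToyFrame = tuple[Mapping[str, bytes], list[str]]


def iter_frames(
    frames: Iterable[ToyFrame],
    frame_metadata: ContextVar[Mapping[str, bytes]] | None = None,
) -> Iterator[Iterator[str]]:
    for metadata, rows in frames:
        if frame_metadata is not None:
            # Publish this frame's metadata (or an empty mapping) before yielding.
            frame_metadata.set(dict(metadata) if metadata else {})
        yield iter(rows)


current_metadata: ContextVar[Mapping[str, bytes]] = ContextVar("frame_metadata")
toy_frames: list[ToyFrame] = [({"source": b"sensor-1"}, ["t1", "t2"]), ({}, ["t3"])]

seen = []
for rows in iter_frames(toy_frames, current_metadata):
    # The generator ran in this context, so the value set above is visible here.
    seen.append((dict(current_metadata.get()), list(rows)))
```

Each frame overwrites the previous value, so the metadata has to be read before the next frame is requested.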
parse_quads_stream(frames, options, frame_metadata=None)

Parse flat quads stream.

Args:
    frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames
    options (ParserOptions): stream options
    frame_metadata (ContextVar[ScalarMap[str, bytes]]): context variable used for extracting frame metadata

Yields: Generator[Iterable[Quad | Prefix]]: Generator of iterables of Quad or Prefix objects, one iterable per frame.

Source code in pyjelly/integrations/rdflib/parse.py
def parse_quads_stream(
    frames: Iterable[jelly.RdfStreamFrame],
    options: ParserOptions,
    frame_metadata: ContextVar[MutableMapping[str, bytes]] | None = None,
) -> Generator[Iterable[Quad | Prefix]]:
    """
    Parse flat quads stream.

    Args:
        frames (Iterable[jelly.RdfStreamFrame]): iterator over stream frames
        options (ParserOptions): stream options
        frame_metadata: (ContextVar[ScalarMap[str, bytes]]): context variable
            used for extracting frame metadata

    Yields:
        Generator[Iterable[Quad | Prefix]]:
            Generator of iterables of Quad or Prefix objects,
            one iterable per frame.

    """
    adapter_class: type[RDFLibQuadsBaseAdapter]
    if options.stream_types.physical_type == jelly.PHYSICAL_STREAM_TYPE_QUADS:
        adapter_class = RDFLibQuadsAdapter
    else:
        adapter_class = RDFLibGraphsAdapter
    adapter = adapter_class(options=options)
    decoder = Decoder(adapter=adapter)
    for frame in frames:
        if frame_metadata is not None:
            frame_metadata.set(
                frame.metadata
            ) if frame.metadata else frame_metadata.set({})
        yield decoder.iter_rows(frame)
    return
parse_jelly_grouped(inp, graph_factory=lambda: Graph(), dataset_factory=lambda: Dataset(), *, logical_type_strict=False, frame_metadata=None)

Take jelly file and return generators based on the detected physical type.

Yields one graph/dataset per frame.

Args:
    inp (IO[bytes]): input jelly buffered binary stream
    graph_factory (Callable): lambda to construct a Graph. By default creates an empty in-memory Graph, but you can pass something else here.
    dataset_factory (Callable): lambda to construct a Dataset. By default creates an empty in-memory Dataset, but you can pass something else here.
    logical_type_strict (bool): If True, validate the logical type in stream options and require a grouped logical type. Otherwise, only the physical type is used to route parsing.
    frame_metadata (ContextVar[ScalarMap[str, bytes]]): context variable used for extracting frame metadata

Raises:
    NotImplementedError: if the physical type is not supported
    JellyConformanceError: if logical_type_strict is True and the logical type is missing, unspecified, or flat

Yields: Generator[Graph] | Generator[Dataset]: returns generators for graphs/datasets based on the type of input

Source code in pyjelly/integrations/rdflib/parse.py
def parse_jelly_grouped(
    inp: IO[bytes],
    graph_factory: Callable[[], Graph] = lambda: Graph(),
    dataset_factory: Callable[[], Dataset] = lambda: Dataset(),
    *,
    logical_type_strict: bool = False,
    frame_metadata: ContextVar[MutableMapping[str, bytes]] | None = None,
) -> Generator[Graph] | Generator[Dataset]:
    """
    Take jelly file and return generators based on the detected physical type.

    Yields one graph/dataset per frame.

    Args:
        inp (IO[bytes]): input jelly buffered binary stream
        graph_factory (Callable): lambda to construct a Graph.
            By default creates an empty in-memory Graph,
            but you can pass something else here.
        dataset_factory (Callable): lambda to construct a Dataset.
            By default creates an empty in-memory Dataset,
            but you can pass something else here.
        logical_type_strict (bool): If True, validate the *logical* type in
            stream options and require a grouped logical type. Otherwise, only the
            physical type is used to route parsing.
        frame_metadata: (ContextVar[ScalarMap[str, bytes]]): context variable
            used for extracting frame metadata



    Raises:
        NotImplementedError: is raised if a physical type is not implemented

    Yields:
        Generator[Graph] | Generator[Dataset]:
            returns generators for graphs/datasets based on the type of input

    """
    options, frames = get_options_and_frames(inp)

    st = getattr(options, "stream_types", None)
    if logical_type_strict and (
        st is None
        or st.logical_type == jelly.LOGICAL_STREAM_TYPE_UNSPECIFIED
        or st.flat
    ):
        lt_name = (
            "UNSPECIFIED"
            if st is None
            else jelly.LogicalStreamType.Name(st.logical_type)
        )

        msg = (
            "strict logical type check requires options.stream_types"
            if st is None
            else f"expected GROUPED logical type, got {lt_name}"
        )
        raise JellyConformanceError(msg)

    if options.stream_types.physical_type == jelly.PHYSICAL_STREAM_TYPE_TRIPLES:
        for graph in parse_triples_stream(
            frames=frames,
            options=options,
            frame_metadata=frame_metadata,
        ):
            sink = graph_factory()
            for graph_item in graph:
                if isinstance(graph_item, Prefix):
                    sink.bind(graph_item.prefix, graph_item.iri)
                else:
                    sink.add(graph_item)
            yield sink
        return
    elif options.stream_types.physical_type in (
        jelly.PHYSICAL_STREAM_TYPE_QUADS,
        jelly.PHYSICAL_STREAM_TYPE_GRAPHS,
    ):
        for dataset in parse_quads_stream(
            frames=frames, options=options, frame_metadata=frame_metadata
        ):
            sink = dataset_factory()
            for item in dataset:
                if isinstance(item, Prefix):
                    sink.bind(item.prefix, item.iri)
                else:
                    s, p, o, graph_name = item
                    context = sink.get_context(graph_name)
                    sink.add((s, p, o, context))
            yield sink
        return

    physical_type_name = jelly.PhysicalStreamType.Name(
        options.stream_types.physical_type
    )
    msg = f"the stream type {physical_type_name} is not supported "
    raise NotImplementedError(msg)
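The grouped parser creates a fresh sink from the factory for every frame, fills it, and yields it. A stripped-down sketch of that loop, with plain sets standing in for RDFLib graphs (`grouped` and `sink_factory` are illustrative names, not pyjelly API):

```python
from __future__ import annotations

from collections.abc import Callable, Iterable, Iterator


def grouped(
    frames: Iterable[Iterable[str]],
    sink_factory: Callable[[], set[str]] = lambda: set(),
) -> Iterator[set[str]]:
    for frame in frames:
        sink = sink_factory()  # one new sink per frame
        for statement in frame:
            sink.add(statement)
        yield sink


sinks = list(grouped([["a", "b"], ["c"]]))

# A custom factory can pre-populate or otherwise customise each sink.
seeded = list(grouped([["a"]], sink_factory=lambda: {"seed"}))
```

Passing a factory rather than an instance is what keeps the yielded sinks independent of each other.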
parse_jelly_to_graph(inp, graph_factory=lambda: Graph(), dataset_factory=lambda: Dataset())

Add statements from Generator to provided Graph/Dataset.

Args:
    inp (IO[bytes]): input jelly stream.
    graph_factory (Callable[[], Graph]): factory to create Graph. By default creates an empty in-memory Graph, but you can pass something else here.
    dataset_factory (Callable[[], Dataset]): factory to create Dataset. By default creates an empty in-memory Dataset, but you can pass something else here.

Returns: Dataset | Graph: Dataset or Graph with statements.

Source code in pyjelly/integrations/rdflib/parse.py
def parse_jelly_to_graph(
    inp: IO[bytes],
    graph_factory: Callable[[], Graph] = lambda: Graph(),
    dataset_factory: Callable[[], Dataset] = lambda: Dataset(),
) -> Graph | Dataset:
    """
    Add statements from Generator to provided Graph/Dataset.

    Args:
        inp (IO[bytes]): input jelly stream.
        graph_factory (Callable[[], Graph]): factory to create Graph.
            By default creates an empty in-memory Graph,
            but you can pass something else here.
        dataset_factory (Callable[[], Dataset]): factory to create Dataset.
            By default creates an empty in-memory Dataset,
            but you can pass something else here.

    Returns:
        Dataset | Graph: Dataset or Graph with statements.

    """
    options, frames = get_options_and_frames(inp)

    if options.stream_types.physical_type == jelly.PHYSICAL_STREAM_TYPE_TRIPLES:
        sink = graph_factory()
    if options.stream_types.physical_type in (
        jelly.PHYSICAL_STREAM_TYPE_QUADS,
        jelly.PHYSICAL_STREAM_TYPE_GRAPHS,
    ):
        quad_sink = dataset_factory()
        sink = quad_sink

    for item in parse_jelly_flat(inp=inp, frames=frames, options=options):
        if isinstance(item, Prefix):
            sink.bind(item.prefix, item.iri)
        if isinstance(item, Triple):
            sink.add(item)
        if isinstance(item, Quad):
            s, p, o, graph_name = item
            context = quad_sink.get_context(graph_name)
            quad_sink.add((s, p, o, context))
    return sink
parse_jelly_flat(inp, frames=None, options=None, *, logical_type_strict=False)

Parse jelly file with FLAT logical type into a Generator of stream events.

Args:
    inp (IO[bytes]): input jelly buffered binary stream.
    frames (Iterable[jelly.RdfStreamFrame] | None): jelly frames if read before.
    options (ParserOptions | None): stream options if read before.
    logical_type_strict (bool): If True, validate the logical type in stream options and require FLAT_(TRIPLES|QUADS). Otherwise, only the physical type is used to route parsing.

Raises:
    NotImplementedError: if physical type is not supported
    JellyConformanceError: if logical_type_strict is True and the logical type is missing or not flat

Yields: Generator[Statement | Prefix]: Generator of stream events

Source code in pyjelly/integrations/rdflib/parse.py
def parse_jelly_flat(
    inp: IO[bytes],
    frames: Iterable[jelly.RdfStreamFrame] | None = None,
    options: ParserOptions | None = None,
    *,
    logical_type_strict: bool = False,
) -> Generator[Statement | Prefix]:
    """
    Parse jelly file with FLAT logical type into a Generator of stream events.

    Args:
        inp (IO[bytes]): input jelly buffered binary stream.
        frames (Iterable[jelly.RdfStreamFrame] | None):
            jelly frames if read before.
        options (ParserOptions | None): stream options
            if read before.
        logical_type_strict (bool): If True, validate the *logical* type in
            stream options and require FLAT_(TRIPLES|QUADS). Otherwise, only the
            physical type is used to route parsing.

    Raises:
        NotImplementedError: if physical type is not supported

    Yields:
        Generator[Statement | Prefix]: Generator of stream events

    """
    if frames is None or options is None:
        options, frames = get_options_and_frames(inp)

    st = getattr(options, "stream_types", None)
    if logical_type_strict and (st is None or not st.flat):
        lt_name = (
            "UNSPECIFIED"
            if st is None
            else jelly.LogicalStreamType.Name(st.logical_type)
        )
        msg = (
            "strict logical type check requires options.stream_types"
            if st is None
            else f"expected FLAT logical type (TRIPLES/QUADS), got {lt_name}"
        )
        raise JellyConformanceError(msg)

    if options.stream_types.physical_type == jelly.PHYSICAL_STREAM_TYPE_TRIPLES:
        for triples in parse_triples_stream(frames=frames, options=options):
            yield from triples
        return
    if options.stream_types.physical_type in (
        jelly.PHYSICAL_STREAM_TYPE_QUADS,
        jelly.PHYSICAL_STREAM_TYPE_GRAPHS,
    ):
        for quads in parse_quads_stream(frames=frames, options=options):
            yield from quads
        return
    physical_type_name = jelly.PhysicalStreamType.Name(
        options.stream_types.physical_type
    )
    msg = f"the stream type {physical_type_name} is not supported "
    raise NotImplementedError(msg)
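Two things about the flat parser are easy to miss: per-frame iterables are flattened with `yield from`, and, because the function is a generator, the strict check only runs once iteration starts. The sketch below shows both with a hypothetical `parse_flat`; string labels and `ValueError` stand in for the Jelly enum values and `JellyConformanceError`.

```python
from __future__ import annotations

from collections.abc import Iterable, Iterator

# Hypothetical string labels standing in for the flat logical stream types.
FLAT_TYPES = {"FLAT_TRIPLES", "FLAT_QUADS"}


def parse_flat(
    frames: Iterable[Iterable[str]],
    logical_type: str,
    *,
    strict: bool = False,
) -> Iterator[str]:
    if strict and logical_type not in FLAT_TYPES:
        msg = f"expected FLAT logical type (TRIPLES/QUADS), got {logical_type}"
        raise ValueError(msg)  # stand-in for JellyConformanceError
    for frame in frames:
        # Frame boundaries disappear: the consumer sees one flat sequence.
        yield from frame


events = list(parse_flat([["t1", "t2"], ["t3"]], "FLAT_TRIPLES", strict=True))

lazy = parse_flat([["t1"]], "GRAPHS", strict=True)  # no error raised yet
try:
    next(lazy)  # the check runs on the first step of iteration
except ValueError as error:
    failure = str(error)
```

Callers that want the validation error early need to advance the generator at least once.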
serialize

Classes:

Name Description
RDFLibTermEncoder
RDFLibJellySerializer

RDFLib serializer for writing graphs in Jelly RDF stream format.

Functions:

Name Description
triples_stream_frames

Serialize a Graph/Dataset into jelly frames.

quads_stream_frames

Serialize a Dataset into jelly frames.

graphs_stream_frames

Serialize a Dataset into jelly frames as a stream of graphs.

guess_options

Guess the serializer options based on the store type.

guess_stream

Return an appropriate stream implementation for the given options.

grouped_stream_to_frames

Transform Graphs/Datasets into Jelly frames, one frame per Graph/Dataset.

grouped_stream_to_file

Write stream of Graphs/Datasets to a binary file.

flat_stream_to_frames

Serialize a stream of raw triples or quads into Jelly frames.

flat_stream_to_file

Write Triple or Quad events to a binary file in Jelly flat format.

RDFLibTermEncoder(*args, **kwargs)

Bases: TermEncoder

Methods:

Name Description
encode_spo

Encode s/p/o term based on its RDFLib object.

encode_graph

Encode graph name term based on its RDFLib object.

encode_spo(term, slot, statement)

Encode s/p/o term based on its RDFLib object.

Args:
    term (object): term to encode
    slot (Slot): its place in statement.
    statement (Statement): Triple/Quad message to fill with s/p/o terms.

Returns: Rows: encoded extra rows

Source code in pyjelly/integrations/rdflib/serialize.py
def encode_spo(self, term: object, slot: Slot, statement: Statement) -> Rows:
    """
    Encode s/p/o term based on its RDFLib object.

    Args:
        term (object): term to encode
        slot (Slot): its place in statement.
        statement (Statement): Triple/Quad message to fill with s/p/o terms.

    Returns:
        Rows: encoded extra rows

    """
    if isinstance(term, rdflib.URIRef):
        iri = self.get_iri_field(statement, slot)
        return self.encode_iri(term, iri)

    if isinstance(term, rdflib.Literal):
        literal = self.get_literal_field(statement, slot)
        return self.encode_literal(
            lex=str(term),
            language=term.language,
            # `datatype` is cast to `str` explicitly because
            # `URIRef.__eq__` overrides `str.__eq__` in an incompatible manner
            datatype=term.datatype and str(term.datatype),
            literal=literal,
        )

    if isinstance(term, rdflib.BNode):
        self.set_bnode_field(statement, slot, str(term))
        return ()

    return super().encode_spo(term, slot, statement)  # error if not handled
encode_graph(term, statement)

Encode graph name term based on its RDFLib object.

Args:
    term (object): term to encode
    statement (HasGraph): Quad/GraphStart message to fill g_{} in.

Returns: Rows: encoded extra rows

Source code in pyjelly/integrations/rdflib/serialize.py
def encode_graph(self, term: object, statement: HasGraph) -> Rows:
    """
    Encode graph name term based on its RDFLib object.

    Args:
        term (object): term to encode
        statement (HasGraph): Quad/GraphStart message to fill g_{} in.

    Returns:
        Rows: encoded extra rows

    """
    if term == DATASET_DEFAULT_GRAPH_ID:
        return self.encode_default_graph(statement.g_default_graph)

    if isinstance(term, rdflib.URIRef):
        return self.encode_iri(term, statement.g_iri)

    if isinstance(term, rdflib.BNode):
        statement.g_bnode = str(term)
        return ()
    return super().encode_graph(term, statement)  # error if not handled
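Both encoder methods follow the same shape: test the term against the types the integration knows, and hand anything else to the base class, which reports the error. A toy version of that dispatch is sketched here. All classes are hypothetical, and the `NotImplementedError` raised by the toy base is an assumption; the listings above do not show which exception pyjelly's `TermEncoder` uses.

```python
class ToyIri(str):
    """Hypothetical IRI term type."""


class ToyBlankNode(str):
    """Hypothetical blank node term type."""


class BaseEncoder:
    def encode(self, term: object) -> str:
        # Fallback for every term type no subclass has claimed.
        msg = f"unsupported term type: {type(term).__name__}"
        raise NotImplementedError(msg)


class ToyEncoder(BaseEncoder):
    def encode(self, term: object) -> str:
        if isinstance(term, ToyIri):
            return f"iri:{term}"
        if isinstance(term, ToyBlankNode):
            return f"bnode:{term}"
        return super().encode(term)  # error if not handled


encoder = ToyEncoder()
encoded = [encoder.encode(ToyIri("http://example.org/a")), encoder.encode(ToyBlankNode("b0"))]

try:
    encoder.encode(42)
except NotImplementedError as error:
    unsupported = str(error)
```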
RDFLibJellySerializer(store)

Bases: Serializer

RDFLib serializer for writing graphs in Jelly RDF stream format.

Handles streaming RDF terms into Jelly frames using internal encoders. Supports only graphs and datasets (not quoted graphs).

Methods:

Name Description
serialize

Serialize self.store content to Jelly format.

Source code in pyjelly/integrations/rdflib/serialize.py
def __init__(self, store: Graph) -> None:
    if isinstance(store, QuotedGraph):
        msg = "N3 format is not supported"
        raise NotImplementedError(msg)
    super().__init__(store)
serialize(out, /, *, stream=None, options=None, **unused)

Serialize self.store content to Jelly format.

Args:
    out (IO[bytes]): output buffered writer
    stream (Stream | None, optional): Jelly stream object. Defaults to None.
    options (SerializerOptions | None, optional): Serializer options if defined beforehand, e.g., read from a separate file. Defaults to None.
    **unused (Any): unused args for RDFLib serialize

Source code in pyjelly/integrations/rdflib/serialize.py
@override
def serialize(  # type: ignore[override]
    self,
    out: IO[bytes],
    /,
    *,
    stream: Stream | None = None,
    options: SerializerOptions | None = None,
    **unused: Any,
) -> None:
    """
    Serialize self.store content to Jelly format.

    Args:
        out (IO[bytes]): output buffered writer
        stream (Stream | None, optional): Jelly stream object. Defaults to None.
        options (SerializerOptions | None, optional): Serializer options
            if defined beforehand, e.g., read from a separate file.
            Defaults to None.
        **unused(Any): unused args for RDFLib serialize

    """
    if options is None:
        options = guess_options(self.store)
    if stream is None:
        stream = guess_stream(options, self.store)
    write = write_delimited if stream.options.params.delimited else write_single
    for stream_frame in stream_frames(stream, self.store):
        write(stream_frame, out)
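The serializer picks one of two writers depending on the `delimited` parameter. The sketch below illustrates the difference on raw bytes, assuming the delimited variant follows the usual Protocol Buffers convention of prefixing each message with its length as a varint. `toy_write_single` and `toy_write_delimited` are illustrative re-implementations, not pyjelly's `write_single` and `write_delimited`.

```python
import io
from typing import IO


def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a Protocol Buffers varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def toy_write_single(payload: bytes, out: IO[bytes]) -> None:
    # Non-delimited: the payload is written as-is.
    out.write(payload)


def toy_write_delimited(payload: bytes, out: IO[bytes]) -> None:
    # Delimited: a length prefix lets a reader split consecutive messages.
    out.write(encode_varint(len(payload)))
    out.write(payload)


delimited = True
write = toy_write_delimited if delimited else toy_write_single

buffer = io.BytesIO()
for payload in (b"abc", b"de"):
    write(payload, buffer)
```

Without the length prefix, several frames written back to back could not be told apart, so the non-delimited form suits a single frame per file.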
triples_stream_frames(stream, data)

Serialize a Graph/Dataset into jelly frames.

Args:
    stream (TripleStream): stream that specifies triples processing
    data (Graph | Dataset | Generator[Triple]): Graph/Dataset/Statements to serialize.

Notes: if a Dataset is given, its graphs are unpacked and iterated over; if the flow is GraphsFrameFlow, a frame is emitted per graph.

Yields: Generator[jelly.RdfStreamFrame]: jelly frames.

Source code in pyjelly/integrations/rdflib/serialize.py
@stream_frames.register(TripleStream)
def triples_stream_frames(
    stream: TripleStream,
    data: Graph | Dataset | Generator[Triple],
) -> Generator[jelly.RdfStreamFrame]:
    """
    Serialize a Graph/Dataset into jelly frames.

    Args:
        stream (TripleStream): stream that specifies triples processing
        data (Graph | Dataset | Generator[Triple]):
            Graph/Dataset/Statements to serialize.

    Notes:
        if Dataset is given, its graphs are unpacked and iterated over
        if flow is GraphsFrameFlow, emits a frame per graph.

    Yields:
        Generator[jelly.RdfStreamFrame]: jelly frames.

    """
    stream.enroll()
    if isinstance(data, Graph) and stream.options.params.namespace_declarations:
        namespace_declarations(data, stream)

    graphs = (data,) if not isinstance(data, Dataset) else data.graphs()
    for graph in graphs:
        for terms in graph:
            if frame := stream.triple(terms):
                yield frame
        if frame := stream.flow.frame_from_graph():
            yield frame
    if stream.stream_types.flat and (frame := stream.flow.to_stream_frame()):
        yield frame
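The `@stream_frames.register(...)` decorators indicate that `stream_frames` dispatches on the type of its first argument, in the manner of `functools.singledispatch`. A self-contained sketch of that mechanism follows; the toy stream classes and return values are hypothetical, and only the dispatch pattern is the point.

```python
from functools import singledispatch


class ToyStream:
    """Hypothetical base stream."""


class ToyTripleStream(ToyStream):
    """Hypothetical triples stream."""


class ToyQuadStream(ToyStream):
    """Hypothetical quads stream."""


@singledispatch
def toy_stream_frames(stream: object, data: list) -> list:
    # Fallback when no implementation is registered for this stream type.
    msg = f"invalid stream implementation {type(stream).__name__}"
    raise TypeError(msg)


@toy_stream_frames.register(ToyTripleStream)
def toy_triples_frames(stream: ToyTripleStream, data: list) -> list:
    return [("triples", item) for item in data]


@toy_stream_frames.register(ToyQuadStream)
def toy_quads_frames(stream: ToyQuadStream, data: list) -> list:
    return [("quads", item) for item in data]


triple_result = toy_stream_frames(ToyTripleStream(), ["a"])
quad_result = toy_stream_frames(ToyQuadStream(), ["b"])

try:
    toy_stream_frames(ToyStream(), [])
except TypeError as error:
    fallback = str(error)
```

The caller only ever names the generic function, which is how `serialize` can call `stream_frames(stream, self.store)` without knowing the stream's physical type.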
quads_stream_frames(stream, data)

Serialize a Dataset into jelly frames.

Notes: Emits one frame per dataset if the flow is a DatasetsFrameFlow.

Args:
    stream (QuadStream): stream that specifies quads processing
    data (Dataset | Generator[Quad]): Dataset to serialize.

Yields: Generator[jelly.RdfStreamFrame]: jelly frames

Source code in pyjelly/integrations/rdflib/serialize.py
@stream_frames.register(QuadStream)
def quads_stream_frames(
    stream: QuadStream,
    data: Dataset | Generator[Quad],
) -> Generator[jelly.RdfStreamFrame]:
    """
    Serialize a Dataset into jelly frames.

    Notes:
        Emits one frame per dataset if flow is of DatasetsFrameFlow.

    Args:
        stream (QuadStream): stream that specifies quads processing
        data (Dataset | Generator[Quad]): Dataset to serialize.

    Yields:
        Generator[jelly.RdfStreamFrame]: jelly frames

    """
    stream.enroll()
    if stream.options.params.namespace_declarations:
        namespace_declarations(data, stream)  # type: ignore[arg-type]
    iterator: Generator[Quad, None, None]
    if isinstance(data, Dataset):
        iterator = cast(Generator[Quad, None, None], data.quads())
    else:
        iterator = data

    for terms in iterator:
        if frame := stream.quad(terms):
            yield frame
    if frame := stream.flow.frame_from_dataset():
        yield frame
    if stream.stream_types.flat and (frame := stream.flow.to_stream_frame()):
        yield frame
graphs_stream_frames(stream, data)

Serialize a Dataset into jelly frames as a stream of graphs.

Notes: If the flow is of DatasetsFrameFlow type, the whole dataset will be encoded into one frame.

Args:
    stream (GraphStream): stream that specifies graphs processing
    data (Dataset | Generator[Quad]): Dataset to serialize.

Yields: Generator[jelly.RdfStreamFrame]: jelly frames

Source code in pyjelly/integrations/rdflib/serialize.py
@stream_frames.register(GraphStream)
def graphs_stream_frames(
    stream: GraphStream,
    data: Dataset | Generator[Quad],
) -> Generator[jelly.RdfStreamFrame]:
    """
    Serialize a Dataset into jelly frames as a stream of graphs.

    Notes:
        If flow of DatasetsFrameFlow type, the whole dataset
        will be encoded into one frame.

    Args:
        stream (GraphStream): stream that specifies graphs processing
        data (Dataset | Generator[Quad]): Dataset to serialize.

    Yields:
        Generator[jelly.RdfStreamFrame]: jelly frames

    """
    stream.enroll()
    if stream.options.params.namespace_declarations:
        namespace_declarations(data, stream)  # type: ignore[arg-type]

    if isinstance(data, Dataset):
        graphs = data.graphs()
    else:
        ds = Dataset()
        for quad in data:
            ctx = ds.get_context(quad.g)
            ctx.add((quad.s, quad.p, quad.o))
        graphs = ds.graphs()

    for graph in graphs:
        yield from stream.graph(graph_id=graph.identifier, graph=graph)

    if frame := stream.flow.frame_from_dataset():
        yield frame
    if stream.stream_types.flat and (frame := stream.flow.to_stream_frame()):
        yield frame
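Example (illustrative sketch, not part of pyjelly): when `data` is a generator of quads, the function above first regroups the quads by graph before emitting one graph at a time. The snippet below shows the same regrouping idea with plain tuples and a dict. The `Quad` class here is a hypothetical stand-in, and a list is used per graph, whereas an RDFLib `Dataset` also deduplicates triples.

```python
from typing import Dict, Iterable, List, NamedTuple, Tuple


class Quad(NamedTuple):
    """Hypothetical stand-in for a quad exposing s, p, o and g fields."""

    s: str
    p: str
    o: str
    g: str


def group_by_graph(quads: Iterable[Quad]) -> Dict[str, List[Tuple[str, str, str]]]:
    """Collect triples per graph name, keeping first-seen graph order."""
    graphs: Dict[str, List[Tuple[str, str, str]]] = {}
    for quad in quads:
        graphs.setdefault(quad.g, []).append((quad.s, quad.p, quad.o))
    return graphs


quads = [
    Quad("s1", "p", "o", "g1"),
    Quad("s2", "p", "o", "g2"),
    Quad("s3", "p", "o", "g1"),
]
grouped = group_by_graph(iter(quads))
```

Because the whole generator is consumed before the first graph is emitted, this path buffers all quads in memory.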
guess_options(sink)

Guess the serializer options based on the store type.

>>> guess_options(Graph()).logical_type
1
>>> guess_options(Dataset()).logical_type
2

Source code in pyjelly/integrations/rdflib/serialize.py
def guess_options(sink: Graph | Dataset) -> SerializerOptions:
    """
    Guess the serializer options based on the store type.

    >>> guess_options(Graph()).logical_type
    1
    >>> guess_options(Dataset()).logical_type
    2
    """
    logical_type = (
        jelly.LOGICAL_STREAM_TYPE_FLAT_QUADS
        if isinstance(sink, Dataset)
        else jelly.LOGICAL_STREAM_TYPE_FLAT_TRIPLES
    )
    # RDFLib doesn't support RDF-star and generalized statements by default
    # as it requires specific handling for quoted triples and non-standard RDF terms
    params = StreamParameters(generalized_statements=False, rdf_star=False)
    return SerializerOptions(logical_type=logical_type, params=params)
guess_stream(options, sink)

Return an appropriate stream implementation for the given options.

Notes: if the base logical type (the logical type modulo 10) is GRAPHS, a TripleStream is initialized even when a Dataset is given.

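>>> graph_ser = RDFLibJellySerializer(Graph())
>>> ds_ser = RDFLibJellySerializer(Dataset())

>>> type(guess_stream(guess_options(graph_ser.store), graph_ser.store))
<class 'pyjelly.serialize.streams.TripleStream'>
>>> type(guess_stream(guess_options(ds_ser.store), ds_ser.store))
<class 'pyjelly.serialize.streams.QuadStream'>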

Source code in pyjelly/integrations/rdflib/serialize.py
def guess_stream(options: SerializerOptions, sink: Graph | Dataset) -> Stream:
    """
    Return an appropriate stream implementation for the given options.

    Notes: if base(!) logical type is GRAPHS and Dataset is given,
        initializes TripleStream

    >>> graph_ser = RDFLibJellySerializer(Graph())
    >>> ds_ser = RDFLibJellySerializer(Dataset())

    >>> type(guess_stream(guess_options(graph_ser.store), graph_ser.store))
    <class 'pyjelly.serialize.streams.TripleStream'>
    >>> type(guess_stream(guess_options(ds_ser.store), ds_ser.store))
    <class 'pyjelly.serialize.streams.QuadStream'>
    """
    stream_cls: type[Stream]
    if (options.logical_type % 10) != jelly.LOGICAL_STREAM_TYPE_GRAPHS and isinstance(
        sink, Dataset
    ):
        stream_cls = QuadStream
    else:
        stream_cls = TripleStream
    return stream_cls.for_rdflib(options=options)
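Example (illustrative sketch, not part of pyjelly): the `% 10` in the listing above reduces a logical type to its base type, so that subtypes of GRAPHS are treated like GRAPHS. The numeric constants below are assumptions meant to mirror the Jelly `LogicalStreamType` enum (only the values 1 and 2 are confirmed by the doctest of `guess_options`); check them against `rdf_pb2` before relying on them. Stream classes are represented by their names only.

```python
# Assumed enum values; only FLAT_TRIPLES and FLAT_QUADS are confirmed above.
FLAT_TRIPLES = 1
FLAT_QUADS = 2
GRAPHS = 3
SUBJECT_GRAPHS = 13  # assumed subtype whose base type is GRAPHS


def base_logical_type(logical_type: int) -> int:
    """Keep only the last decimal digit, which encodes the base type."""
    return logical_type % 10


def pick_stream_name(logical_type: int, sink_is_dataset: bool) -> str:
    """Same branching as guess_stream, returning a class name instead."""
    if base_logical_type(logical_type) != GRAPHS and sink_is_dataset:
        return "QuadStream"
    return "TripleStream"
```

With these assumed values, a Dataset combined with either `GRAPHS` or `SUBJECT_GRAPHS` selects the triple stream, while a Dataset with `FLAT_QUADS` selects the quad stream.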
grouped_stream_to_frames(sink_generator, options=None)

Transform Graphs/Datasets into Jelly frames, one frame per Graph/Dataset.

Note: options are guessed if not provided.

Args:
    sink_generator (Generator[Graph] | Generator[Dataset]): Generator of Graphs/Datasets to transform.
    options (SerializerOptions | None, optional): stream options to use. If None, options are guessed based on the type of the first sink. Defaults to None.

Yields: Generator[jelly.RdfStreamFrame]: produced Jelly frames

Source code in pyjelly/integrations/rdflib/serialize.py
def grouped_stream_to_frames(
    sink_generator: Generator[Graph] | Generator[Dataset],
    options: SerializerOptions | None = None,
) -> Generator[jelly.RdfStreamFrame]:
    """
    Transform Graphs/Datasets into Jelly frames, one frame per Graph/Dataset.

    Note: options are guessed if not provided.

    Args:
        sink_generator (Generator[Graph] | Generator[Dataset]): Generator of
            Graphs/Dataset to transform.
        options (SerializerOptions | None, optional): stream options to use.
            Options are guessed based on the sink store type. Defaults to None.

    Yields:
        Generator[jelly.RdfStreamFrame]: produced Jelly frames

    """
    stream = None
    for sink in sink_generator:
        if not stream:
            if options is None:
                options = guess_options(sink)
            stream = guess_stream(options, sink)
        yield from stream_frames(stream, sink)
grouped_stream_to_file(stream, output_file, **kwargs)

Write stream of Graphs/Datasets to a binary file.

Args:
    stream (Generator[Graph] | Generator[Dataset]): Generator of Graphs/Datasets to transform.
    output_file (IO[bytes]): output buffered writer.
    **kwargs (Any): options to pass to grouped_stream_to_frames.

Source code in pyjelly/integrations/rdflib/serialize.py
def grouped_stream_to_file(
    stream: Generator[Graph] | Generator[Dataset],
    output_file: IO[bytes],
    **kwargs: Any,
) -> None:
    """
    Write stream of Graphs/Datasets to a binary file.

    Args:
        stream (Generator[Graph] | Generator[Dataset]): Generator of
            Graphs/Dataset to transform.
        output_file (IO[bytes]): output buffered writer.
        **kwargs (Any): options to pass to stream.

    """
    for frame in grouped_stream_to_frames(stream, **kwargs):
        write_delimited(frame, output_file)
flat_stream_to_frames(statements, options=None)

Serialize a stream of raw triples or quads into Jelly frames.

Args:
    statements (Generator[Triple | Quad]): s/p/o triples or s/p/o/g quads to serialize.
    options (SerializerOptions | None, optional): if omitted, guessed based on the first tuple.

Yields: Generator[jelly.RdfStreamFrame]: generated frames.

Source code in pyjelly/integrations/rdflib/serialize.py
def flat_stream_to_frames(
    statements: Generator[Triple | Quad],
    options: SerializerOptions | None = None,
) -> Generator[jelly.RdfStreamFrame]:
    """
    Serialize a stream of raw triples or quads into Jelly frames.

    Args:
        statements (Generator[Triple | Quad]):
          s/p/o triples or s/p/o/g quads to serialize.
        options (SerializerOptions | None, optional):
            if omitted, guessed based on the first tuple.

    Yields:
        Generator[jelly.RdfStreamFrame]: generated frames.

    """
    first = next(statements, None)
    if first is None:
        return

    sink = Dataset() if len(first) == QUAD_ARITY else Graph()
    if options is None:
        options = guess_options(sink)
    stream = guess_stream(options, sink)

    combined: Generator[Triple | Quad] | Graph = (
        item for item in chain([first], statements)
    )

    yield from stream_frames(stream, combined)
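Example (illustrative sketch, not part of pyjelly): `flat_stream_to_frames` inspects the first statement to decide between triples and quads, then puts it back in front of the remaining statements with `itertools.chain`. The helper below isolates that peek-and-restore pattern using plain tuples; the name `peek_arity` is hypothetical.

```python
from itertools import chain
from typing import Iterator, Optional, Tuple


def peek_arity(statements: Iterator[tuple]) -> Tuple[Optional[int], Iterator[tuple]]:
    """Return the length of the first tuple and an iterator over all tuples."""
    first = next(statements, None)
    if first is None:
        # Empty input: nothing to inspect and nothing to replay.
        return None, iter(())
    # Re-attach the consumed item so downstream code sees the full stream.
    return len(first), chain([first], statements)


arity, replayed = peek_arity(iter([("s", "p", "o", "g"), ("s2", "p", "o", "g")]))
```

Note that `next()` requires an iterator, so a list would have to be wrapped in `iter()` first, as done in the usage line.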
flat_stream_to_file(statements, output_file, options=None)

Write Triple or Quad events to a binary file in Jelly flat format.

Args:
    statements (Generator[Triple | Quad]): statements to serialize.
    output_file (IO[bytes]): output buffered writer.
    options (SerializerOptions | None, optional): stream options.

Source code in pyjelly/integrations/rdflib/serialize.py
def flat_stream_to_file(
    statements: Generator[Triple | Quad],
    output_file: IO[bytes],
    options: SerializerOptions | None = None,
) -> None:
    """
    Write Triple or Quad events to a binary file in Jelly flat format.

    Args:
        statements (Generator[Triple | Quad]): statements to serialize.
        output_file (IO[bytes]): output buffered writer.
        options (SerializerOptions | None, optional): stream options.

    """
    for frame in flat_stream_to_frames(statements, options):
        write_delimited(frame, output_file)
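Example (illustrative sketch, not part of pyjelly): both `*_to_file` functions write each frame with `write_delimited`, whose implementation is not shown in this reference. The snippet below assumes the usual Protocol Buffers convention for delimited messages, where every payload is preceded by its byte length encoded as a varint. The function names are hypothetical and the payloads are arbitrary bytes rather than real Jelly frames.

```python
import io
from typing import IO, Iterator


def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a protobuf-style varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def write_framed(payload: bytes, output: IO[bytes]) -> None:
    """Write one length-prefixed payload."""
    output.write(encode_varint(len(payload)))
    output.write(payload)


def read_framed(inp: IO[bytes]) -> Iterator[bytes]:
    """Yield payloads until the stream ends."""
    while True:
        size, shift = 0, 0
        while True:
            raw = inp.read(1)
            if not raw:
                return  # end of stream
            size |= (raw[0] & 0x7F) << shift
            shift += 7
            if not raw[0] & 0x80:
                break
        yield inp.read(size)


buffer = io.BytesIO()
for chunk in (b"abc", b"", b"x" * 200):
    write_framed(chunk, buffer)
buffer.seek(0)
decoded = list(read_framed(buffer))
```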

jelly

Modules:

Name Description
rdf_pb2

Generated protocol buffer code.

rdf_pb2

Generated protocol buffer code.

options

Classes:

Name Description
StreamTypes

Functions:

Name Description
register_mimetypes

Associate files that have Jelly extension with Jelly MIME types.

Attributes:

Name Type Description
INTEGRATION_SIDE_EFFECTS bool

Whether to allow integration module imports to trigger side effects.

INTEGRATION_SIDE_EFFECTS = True

Whether to allow integration module imports to trigger side effects.

These side effects are cheap and may include populating some registries for guessing the defaults for external integrations that work with Jelly.

StreamTypes(physical_type=jelly.PHYSICAL_STREAM_TYPE_UNSPECIFIED, logical_type=jelly.LOGICAL_STREAM_TYPE_UNSPECIFIED)

Methods:

Name Description
__repr__

Return the representation of StreamTypes.

__repr__()

Return the representation of StreamTypes.

>>> repr(StreamTypes(9999, 8888))
'StreamTypes(9999, 8888)'

Source code in pyjelly/options.py
def __repr__(self) -> str:
    """
    Return the representation of StreamTypes.

    >>> repr(StreamTypes(9999, 8888))
    'StreamTypes(9999, 8888)'
    """
    with suppress(ValueError):
        physical_type_name = jelly.PhysicalStreamType.Name(self.physical_type)
        logical_type_name = jelly.LogicalStreamType.Name(self.logical_type)
        return f"StreamTypes({physical_type_name}, {logical_type_name})"
    return f"StreamTypes({self.physical_type}, {self.logical_type})"
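Example (illustrative sketch, not part of pyjelly): `__repr__` tries to resolve enum names and falls back to raw numbers when a value is unknown, using `contextlib.suppress`. The snippet below reproduces that control flow with a standard-library `IntEnum`; `PhysicalKind` and `kind_repr` are hypothetical names.

```python
from contextlib import suppress
from enum import IntEnum


class PhysicalKind(IntEnum):
    """Hypothetical enum standing in for a protobuf enum."""

    UNSPECIFIED = 0
    TRIPLES = 1


def kind_repr(value: int) -> str:
    # If the lookup raises ValueError, the with-block is abandoned
    # and execution continues at the fallback return below.
    with suppress(ValueError):
        return f"Kind({PhysicalKind(value).name})"
    return f"Kind({value})"
```

Here `kind_repr(1)` gives `'Kind(TRIPLES)'` and `kind_repr(9999)` gives `'Kind(9999)'`, matching the fallback seen in the doctest above.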
register_mimetypes(extension='.jelly')

Associate files that have Jelly extension with Jelly MIME types.

>>> register_mimetypes()
>>> mimetypes.guess_type("out.jelly")
('application/x-jelly-rdf', None)

Source code in pyjelly/options.py
def register_mimetypes(extension: str = ".jelly") -> None:
    """
    Associate files that have Jelly extension with Jelly MIME types.

    >>> register_mimetypes()
    >>> mimetypes.guess_type("out.jelly")
    ('application/x-jelly-rdf', None)
    """
    for mimetype in MIMETYPES:
        mimetypes.add_type(mimetype, extension)
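Example (illustrative sketch): the same effect can be observed with the standard library alone. The snippet registers the one MIME type that appears in the doctest above; the full contents of pyjelly's `MIMETYPES` are not listed in this reference.

```python
import mimetypes

# Map the .jelly extension to the MIME type shown in the doctest.
mimetypes.add_type("application/x-jelly-rdf", ".jelly")

guessed_type, encoding = mimetypes.guess_type("out.jelly")
```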

parse

Modules:

Name Description
decode
ioutils
lookup
decode

Modules:

Name Description
jelly

Generated protocol buffer code.

Classes:

Name Description
Adapter
Decoder
LookupDecoder

Shared base for RDF lookup encoders using Jelly compression.

ParserOptions

mypyc filler docstring

ParsingMode

mypyc filler docstring

StreamTypes

Attributes:

Name Type Description
MAX_VERSION

int([x]) -> integer

__file__

str(object='') -> str

__name__

str(object='') -> str

__package__

str(object='') -> str

MAX_VERSION = 2

__file__ = '/home/runner/work/pyjelly/pyjelly/pyjelly/parse/decode.cpython-310-x86_64-linux-gnu.so'

__name__ = 'pyjelly.parse.decode'

__package__ = 'pyjelly.parse'

Adapter(*args, **kwargs)

Methods:

Name Description
__new__

Create and return a new object. See help(type) for accurate signature.

Attributes:

Name Type Description
__module__

str(object='') -> str

__mypyc_attrs__

Built-in immutable sequence.

__module__ = 'pyjelly.parse.decode'

__mypyc_attrs__ = ('options', 'parsing_mode')

__new__(*args, **kwargs)

Create and return a new object. See help(type) for accurate signature.

Decoder(*args, **kwargs)

Methods:

Name Description
__new__

Create and return a new object. See help(type) for accurate signature.

Attributes:

Name Type Description
__doc__

str(object='') -> str

__module__

str(object='') -> str

__mypyc_attrs__

Built-in immutable sequence.

__doc__ = ''

__module__ = 'pyjelly.parse.decode'

__mypyc_attrs__ = ('adapter', 'names', 'prefixes', 'datatypes', 'repeated_terms', 'row_handlers', 'term_handlers')

__new__(*args, **kwargs)

Create and return a new object. See help(type) for accurate signature.

LookupDecoder(*, lookup_size)

Shared base for RDF lookup encoders using Jelly compression.

Tracks the last assigned and last reused index.

Parameters:

Name Type Description Default
lookup_size int

Maximum lookup size.

required
Source code in pyjelly/parse/lookup.py
def __init__(self, *, lookup_size: int) -> None:
    if lookup_size > MAX_LOOKUP_SIZE:
        msg = f"lookup size cannot be larger than {MAX_LOOKUP_SIZE}"
        raise JellyAssertionError(msg)
    self.lookup_size = lookup_size
    placeholders = (None,) * lookup_size
    self.data: deque[str | None] = deque(placeholders, maxlen=lookup_size)
    self.last_assigned_index = 0
    self.last_reused_index = 0
ParserOptions

Bases: builtins.tuple

mypyc filler docstring

Attributes:

Name Type Description
__annotations__

dict() -> new empty dictionary

__doc__

str(object='') -> str

__match_args__

Built-in immutable sequence.

__module__

str(object='') -> str

__slots__

Built-in immutable sequence.

lookup_preset

Alias for field number 1

params

Alias for field number 2

stream_types

Alias for field number 0

__annotations__ = {'stream_types': <class 'pyjelly.options.StreamTypes'>, 'lookup_preset': <class 'pyjelly.options.LookupPreset'>, 'params': <class 'pyjelly.options.StreamParameters'>}

__doc__ = 'mypyc filler docstring'

__match_args__ = ('stream_types', 'lookup_preset', 'params')

__module__ = 'pyjelly.parse.decode'

__slots__ = ()

lookup_preset = _tuplegetter(1, 'Alias for field number 1')

Alias for field number 1

params = _tuplegetter(2, 'Alias for field number 2')

Alias for field number 2

stream_types = _tuplegetter(0, 'Alias for field number 0')

Alias for field number 0

ParsingMode

Bases: enum.Enum

mypyc filler docstring

Attributes:

Name Type Description
FLAT

mypyc filler docstring

GROUPED

mypyc filler docstring

__doc__

str(object='') -> str

__module__

str(object='') -> str

FLAT = <ParsingMode.FLAT: 1>

mypyc filler docstring

GROUPED = <ParsingMode.GROUPED: 2>

mypyc filler docstring

__doc__ = 'mypyc filler docstring'

__module__ = 'pyjelly.parse.decode'

StreamTypes(physical_type=jelly.PHYSICAL_STREAM_TYPE_UNSPECIFIED, logical_type=jelly.LOGICAL_STREAM_TYPE_UNSPECIFIED)

Methods:

Name Description
__repr__

Return the representation of StreamTypes.

__repr__()

Return the representation of StreamTypes.

>>> repr(StreamTypes(9999, 8888))
'StreamTypes(9999, 8888)'

Source code in pyjelly/options.py
def __repr__(self) -> str:
    """
    Return the representation of StreamTypes.

    >>> repr(StreamTypes(9999, 8888))
    'StreamTypes(9999, 8888)'
    """
    with suppress(ValueError):
        physical_type_name = jelly.PhysicalStreamType.Name(self.physical_type)
        logical_type_name = jelly.LogicalStreamType.Name(self.logical_type)
        return f"StreamTypes({physical_type_name}, {logical_type_name})"
    return f"StreamTypes({self.physical_type}, {self.logical_type})"
jelly

Generated protocol buffer code.

ioutils

Functions:

Name Description
delimited_jelly_hint

Detect whether a Jelly file is delimited from its first 3 bytes.

get_options_and_frames

Return stream options and frames from the buffered binary stream.

delimited_jelly_hint(header)

Detect whether a Jelly file is delimited from its first 3 bytes.

Truth table (notation: 0A = 0x0A, NN = not 0x0A, ?? = don't care):

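| Byte 1 | Byte 2 | Byte 3 | Result                                   |
|--------|--------|--------|------------------------------------------|
| NN     | ??     | ??     | Delimited                                |
| 0A     | NN     | ??     | Non-delimited                            |
| 0A     | 0A     | NN     | Delimited (size = 10)                    |
| 0A     | 0A     | 0A     | Non-delimited (stream options size = 10) |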

>>> delimited_jelly_hint(bytes([0x00, 0x00, 0x00]))
True

>>> delimited_jelly_hint(bytes([0x00, 0x00, 0x0A]))
True

>>> delimited_jelly_hint(bytes([0x00, 0x0A, 0x00]))
True

>>> delimited_jelly_hint(bytes([0x00, 0x0A, 0x0A]))
True

>>> delimited_jelly_hint(bytes([0x0A, 0x00, 0x00]))
False

>>> delimited_jelly_hint(bytes([0x0A, 0x00, 0x0A]))
False

>>> delimited_jelly_hint(bytes([0x0A, 0x0A, 0x00]))
True

>>> delimited_jelly_hint(bytes([0x0A, 0x0A, 0x0A]))
False

Source code in pyjelly/parse/ioutils.py
def delimited_jelly_hint(header: bytes) -> bool:
    """
    Detect whether a Jelly file is delimited from its first 3 bytes.

    Truth table (notation: `0A` = `0x0A`, `NN` = `not 0x0A`, `??` = _don't care_):

    | Byte 1 | Byte 2 | Byte 3 | Result                                   |
    |--------|--------|--------|------------------------------------------|
    | `NN`   |  `??`  |  `??`  | Delimited                                |
    | `0A`   |  `NN`  |  `??`  | Non-delimited                            |
    | `0A`   |  `0A`  |  `NN`  | Delimited (size = 10)                    |
    | `0A`   |  `0A`  |  `0A`  | Non-delimited (stream options size = 10) |

    >>> delimited_jelly_hint(bytes([0x00, 0x00, 0x00]))
    True

    >>> delimited_jelly_hint(bytes([0x00, 0x00, 0x0A]))
    True

    >>> delimited_jelly_hint(bytes([0x00, 0x0A, 0x00]))
    True

    >>> delimited_jelly_hint(bytes([0x00, 0x0A, 0x0A]))
    True

    >>> delimited_jelly_hint(bytes([0x0A, 0x00, 0x00]))
    False

    >>> delimited_jelly_hint(bytes([0x0A, 0x00, 0x0A]))
    False

    >>> delimited_jelly_hint(bytes([0x0A, 0x0A, 0x00]))
    True

    >>> delimited_jelly_hint(bytes([0x0A, 0x0A, 0x0A]))
    False
    """
    magic = 0x0A
    return len(header) >= 3 and (  # noqa: PLR2004
        header[0] != magic or (header[1] == magic and header[2] != magic)
    )
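Example (illustrative sketch, not part of pyjelly): the boolean expression in the listing above can be checked against the truth table exhaustively, since only "equals 0x0A" versus "anything else" matters for each of the three bytes. The function names below are hypothetical; `hint` copies the expression from the source, and `table_lookup` reads the answer row by row.

```python
from itertools import product

MAGIC = 0x0A  # the marker byte the function compares against


def hint(header: bytes) -> bool:
    """Same boolean expression as delimited_jelly_hint."""
    return len(header) >= 3 and (
        header[0] != MAGIC or (header[1] == MAGIC and header[2] != MAGIC)
    )


def table_lookup(header: bytes) -> bool:
    """Read the result off the truth table, one row at a time."""
    if header[0] != MAGIC:
        return True  # row 1: NN ?? ??
    if header[1] != MAGIC:
        return False  # row 2: 0A NN ??
    return header[2] != MAGIC  # rows 3 and 4


def mismatches() -> list:
    """List every 3-byte combination where the two functions disagree."""
    combos = (bytes(c) for c in product((0x00, MAGIC), repeat=3))
    return [c for c in combos if hint(c) != table_lookup(c)]
```

`mismatches()` returns an empty list. One detail the table does not show: because of the `len(header) >= 3` guard, a header shorter than three bytes is always reported as non-delimited.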
get_options_and_frames(inp)

Return stream options and frames from the buffered binary stream.

Args:
    inp (IO[bytes]): Jelly buffered binary stream.

Raises:
    JellyConformanceError: if a delimited stream contains no non-empty frames.
    JellyConformanceError: if a non-delimited stream holds only an empty frame (no rows).

Returns:
    tuple[ParserOptions, Iterator[jelly.RdfStreamFrame]]: the parser options (stream types, lookup preset and other stream options) and an iterator over the frames.

Source code in pyjelly/parse/ioutils.py
def get_options_and_frames(
    inp: IO[bytes],
) -> tuple[ParserOptions, Iterator[jelly.RdfStreamFrame]]:
    """
    Return stream options and frames from the buffered binary stream.

    Args:
        inp (IO[bytes]): jelly buffered binary stream

    Raises:
        JellyConformanceError: if no non-empty frames detected in the delimited stream
        JellyConformanceError: if non-delimited,
            error is raised if no rows are detected (empty frame)

    Returns:
        tuple[ParserOptions, Iterator[jelly.RdfStreamFrame]]: ParserOptions holds:
            stream types, lookup presets and other stream options

    """
    if not inp.seekable():
        # Input may not be seekable (e.g. a network stream) -- then we need to buffer
        # it to determine if it's delimited.
        # See also: https://github.com/Jelly-RDF/pyjelly/issues/298
        inp = io.BufferedReader(inp)  # type: ignore[arg-type, type-var, unused-ignore]
        is_delimited = delimited_jelly_hint(inp.peek(3))
    else:
        is_delimited = delimited_jelly_hint(bytes_read := inp.read(3))
        inp.seek(-len(bytes_read), os.SEEK_CUR)

    if is_delimited:
        first_frame = None
        skipped_frames = []
        frames = frame_iterator(inp)
        for frame in frames:
            if not frame.rows:
                skipped_frames.append(frame)
            else:
                first_frame = frame
                break
        if first_frame is None:
            msg = "No non-empty frames found in the stream"
            raise JellyConformanceError(msg)

        options = options_from_frame(first_frame, delimited=True)
        return options, chain(skipped_frames, (first_frame,), frames)

    frame = parse(jelly.RdfStreamFrame, inp.read())

    if not frame.rows:
        msg = "The stream is corrupted (only contains an empty frame)"
        raise JellyConformanceError(msg)

    options = options_from_frame(frame, delimited=False)
    return options, iter((frame,))
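Example (illustrative sketch, not part of pyjelly): the first branch of the listing above looks at the leading bytes without consuming them, using `peek` on a buffered wrapper for non-seekable input and `read` followed by a relative `seek` otherwise. The snippet isolates that step. `NonSeekableBytes` and `peek_header` are hypothetical names; the former only simulates a stream such as a socket.

```python
import io
import os


class NonSeekableBytes(io.RawIOBase):
    """Hypothetical raw stream that cannot seek, like a network stream."""

    def __init__(self, payload: bytes) -> None:
        self._inner = io.BytesIO(payload)

    def readable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return False

    def readinto(self, buffer) -> int:
        return self._inner.readinto(buffer)


def peek_header(inp, size: int = 3):
    """Return the first `size` bytes and a stream still positioned at the start."""
    if not inp.seekable():
        buffered = io.BufferedReader(inp)
        # peek() may return more or fewer bytes than requested, so slice.
        return buffered.peek(size)[:size], buffered
    header = inp.read(size)
    inp.seek(-len(header), os.SEEK_CUR)  # rewind by what was actually read
    return header, inp


payload = bytes([0x0A, 0x0A, 0x00, 0x01, 0x02])
seekable_header, seekable_stream = peek_header(io.BytesIO(payload))
raw_header, wrapped_stream = peek_header(NonSeekableBytes(payload))
```

In both cases the returned stream still yields the complete payload, which is why the caller must continue with the returned object rather than the original one in the non-seekable case.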
lookup

Classes:

Name Description
LookupDecoder

Shared base for RDF lookup encoders using Jelly compression.

LookupDecoder(*, lookup_size)

Shared base for RDF lookup encoders using Jelly compression.

Tracks the last assigned and last reused index.

Parameters:

Name Type Description Default
lookup_size int

Maximum lookup size.

required
Source code in pyjelly/parse/lookup.py
def __init__(self, *, lookup_size: int) -> None:
    if lookup_size > MAX_LOOKUP_SIZE:
        msg = f"lookup size cannot be larger than {MAX_LOOKUP_SIZE}"
        raise JellyAssertionError(msg)
    self.lookup_size = lookup_size
    placeholders = (None,) * lookup_size
    self.data: deque[str | None] = deque(placeholders, maxlen=lookup_size)
    self.last_assigned_index = 0
    self.last_reused_index = 0
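Example (illustrative sketch, not part of pyjelly): the constructor above preallocates a fixed-length `deque` filled with `None`. The snippet shows how such a buffer behaves. How the decoder later writes entries is not shown in this reference, so the index assignment below is an assumption made only to demonstrate the container; `make_table` is a hypothetical name.

```python
from collections import deque
from typing import Deque, Optional


def make_table(lookup_size: int) -> Deque[Optional[str]]:
    """Build a table of `lookup_size` empty slots that can never grow."""
    placeholders = (None,) * lookup_size
    return deque(placeholders, maxlen=lookup_size)


table = make_table(3)
table[0] = "http://example.org/"  # slots can be filled by position
after_assignment = list(table)

table.append("late entry")  # maxlen forces the leftmost slot out
after_append = list(table)
```

Since `maxlen` equals the initial length, appending never grows the table; it discards the leftmost element instead.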

serialize

Modules:

Name Description
encode
flows
lookup
streams
encode

Modules:

Name Description
jelly

Generated protocol buffer code.

options

Classes:

Name Description
JellyConformanceError

Raised when Jelly conformance is violated.

LookupEncoder

Shared base for RDF lookup encoders using Jelly compression.

Slot

mypyc filler docstring

TermEncoder

Attributes:

Name Type Description
HasGraph

Represent a PEP 604 union type

Statement

Represent a PEP 604 union type

T

Type variable.

Terms

Represent a PEP 604 union type

__file__

str(object='') -> str

__name__

str(object='') -> str

__package__

str(object='') -> str

HasGraph = rdf_pb2.RdfQuad | rdf_pb2.RdfGraphStart

Statement = rdf_pb2.RdfQuad | rdf_pb2.RdfTriple

T = ~T

Terms = rdf_pb2.RdfIri | rdf_pb2.RdfLiteral | str | rdf_pb2.RdfDefaultGraph | rdf_pb2.RdfTriple

__file__ = '/home/runner/work/pyjelly/pyjelly/pyjelly/serialize/encode.cpython-310-x86_64-linux-gnu.so'

__name__ = 'pyjelly.serialize.encode'

__package__ = 'pyjelly.serialize'

JellyConformanceError

Bases: Exception

Raised when Jelly conformance is violated.

LookupEncoder(*, lookup_size)

Shared base for RDF lookup encoders using Jelly compression.

Tracks the last assigned and last reused index.

Parameters:

Name Type Description Default
lookup_size int

Maximum lookup size.

required

Methods:

Name Description
encode_entry_index

Get or assign the index to use in an entry.

Source code in pyjelly/serialize/lookup.py
def __init__(self, *, lookup_size: int) -> None:
    self.lookup = Lookup(max_size=lookup_size)
    self.last_assigned_index = 0
    self.last_reused_index = 0
encode_entry_index(key)

Get or assign the index to use in an entry.

Returns:

Type Description
int or None
  • 0 if the new index is sequential (last_assigned_index + 1)
  • actual assigned/reused index otherwise
  • None if the key already exists
If the return value is None, the entry is already in the lookup and does not
need to be emitted. Any integer value (including 0) means the entry is new
and should be emitted.
Source code in pyjelly/serialize/lookup.py
def encode_entry_index(self, key: str) -> int | None:
    """
    Get or assign the index to use in an entry.

    Returns
    -------
    int or None
        - 0 if the new index is sequential (`last_assigned_index + 1`)
        - actual assigned/reused index otherwise
        - None if the key already exists

    If the return value is None, the entry is already in the lookup and does not
    need to be emitted. Any integer value (including 0) means the entry is new
    and should be emitted.

    """
    try:
        self.lookup.make_last_to_evict(key)
        return None  # noqa: TRY300
    except KeyError:
        previous_index = self.last_assigned_index
        index = self.lookup.insert(key)
        self.last_assigned_index = index
        if index == previous_index + 1:
            return 0
        return index
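Example (illustrative sketch, not part of pyjelly): the three possible return values of `encode_entry_index` can be traced with a small stand-in lookup. `SketchLookup` is a hypothetical class written for this example, assuming indices start at 1 and the least recently used entry is evicted when the table is full (with `max_size` of at least 1); pyjelly's actual `Lookup` class is not shown in this reference and may differ.

```python
from collections import OrderedDict
from typing import Optional


class SketchLookup:
    """Hypothetical fixed-size table with least-recently-used eviction."""

    def __init__(self, max_size: int) -> None:
        self.max_size = max_size
        self.data: "OrderedDict[str, int]" = OrderedDict()

    def make_last_to_evict(self, key: str) -> None:
        self.data.move_to_end(key)  # raises KeyError if the key is absent

    def insert(self, key: str) -> int:
        if len(self.data) < self.max_size:
            index = len(self.data) + 1  # next free slot
        else:
            _, index = self.data.popitem(last=False)  # reuse the oldest slot
        self.data[key] = index
        return index


class SketchEncoder:
    """Follows the control flow of encode_entry_index shown above."""

    def __init__(self, lookup_size: int) -> None:
        self.lookup = SketchLookup(lookup_size)
        self.last_assigned_index = 0

    def encode_entry_index(self, key: str) -> Optional[int]:
        try:
            self.lookup.make_last_to_evict(key)
            return None  # already known, nothing to emit
        except KeyError:
            previous = self.last_assigned_index
            index = self.lookup.insert(key)
            self.last_assigned_index = index
            return 0 if index == previous + 1 else index


encoder = SketchEncoder(lookup_size=2)
results = [encoder.encode_entry_index(k) for k in ["a", "b", "a", "c", "b"]]
```

Under these assumptions `results` is `[0, 0, None, 2, 1]`: the first two keys take sequential slots (encoded as 0), the repeated `"a"` is a hit, and the last two keys reuse evicted slots, so their explicit indices are returned.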
Slot

Bases: enum.IntEnum

mypyc filler docstring

Attributes:

Name Type Description
__doc__

str(object='') -> str

__module__

str(object='') -> str

graph

mypyc filler docstring

object

mypyc filler docstring

predicate

mypyc filler docstring

subject

mypyc filler docstring

__doc__ = 'mypyc filler docstring'

__module__ = 'pyjelly.serialize.encode'

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

graph = <Slot.graph: 3>

mypyc filler docstring

object = <Slot.object: 2>

mypyc filler docstring

predicate = <Slot.predicate: 1>

mypyc filler docstring

subject = <Slot.subject: 0>

mypyc filler docstring
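The member values listed for Slot line up with how Stream.__init__ sizes its repeated_terms list (`[None] * len(Slot)`). The following is a minimal sketch of that indexing pattern. `SlotSketch` is a stand-in defined here for illustration, not the compiled pyjelly class, and the example IRI is made up.

```python
from enum import IntEnum


class SlotSketch(IntEnum):
    """Stand-in enum mirroring the four Slot member values."""

    subject = 0
    predicate = 1
    object = 2
    graph = 3


# One cell per slot, sized the same way as `[None] * len(Slot)`.
repeated_terms = [None] * len(SlotSketch)

# IntEnum members are ints, so they can index the list directly.
repeated_terms[SlotSketch.predicate] = "http://example.org/knows"
```

Because the members are plain integers, no conversion is needed when a slot is used as a list index.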

TermEncoder(*args, **kwargs)

Attributes:

Name Value
__module__ 'pyjelly.serialize.encode'
__mypyc_attrs__ ('lookup_preset', 'names', 'prefixes', 'datatypes')

jelly

Generated protocol buffer code.

options

Classes:

Name Description
StreamTypes

Functions:

Name Description
register_mimetypes

Associate files that have Jelly extension with Jelly MIME types.

Attributes:

Name Type Description
INTEGRATION_SIDE_EFFECTS bool

Whether to allow integration module imports to trigger side effects.

INTEGRATION_SIDE_EFFECTS = True

Whether to allow integration module imports to trigger side effects.

These side effects are cheap and may include populating some registries for guessing the defaults for external integrations that work with Jelly.

StreamTypes(physical_type=jelly.PHYSICAL_STREAM_TYPE_UNSPECIFIED, logical_type=jelly.LOGICAL_STREAM_TYPE_UNSPECIFIED)

Methods:

Name Description
__repr__

Return the representation of StreamTypes.

__repr__()

Return the representation of StreamTypes.

>>> repr(StreamTypes(9999, 8888))
'StreamTypes(9999, 8888)'

Source code in pyjelly/options.py
def __repr__(self) -> str:
    """
    Return the representation of StreamTypes.

    >>> repr(StreamTypes(9999, 8888))
    'StreamTypes(9999, 8888)'
    """
    with suppress(ValueError):
        physical_type_name = jelly.PhysicalStreamType.Name(self.physical_type)
        logical_type_name = jelly.LogicalStreamType.Name(self.logical_type)
        return f"StreamTypes({physical_type_name}, {logical_type_name})"
    return f"StreamTypes({self.physical_type}, {self.logical_type})"
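The fallback in `__repr__` can be illustrated without protobuf. The sketch below assumes two stand-in enums (`PhysicalSketch`, `LogicalSketch`) whose member names and values are illustrative only. It shows why unknown numeric values such as 9999 and 8888 are printed as numbers: the name lookup raises ValueError, `suppress` swallows it, and the second return statement runs.

```python
from contextlib import suppress
from enum import IntEnum


class PhysicalSketch(IntEnum):
    """Stand-in for jelly.PhysicalStreamType (value assumed for illustration)."""

    PHYSICAL_STREAM_TYPE_TRIPLES = 1


class LogicalSketch(IntEnum):
    """Stand-in for jelly.LogicalStreamType (value assumed for illustration)."""

    LOGICAL_STREAM_TYPE_FLAT_TRIPLES = 1


def describe(physical: int, logical: int) -> str:
    # Try the symbolic names first; an unknown value raises ValueError.
    with suppress(ValueError):
        physical_name = PhysicalSketch(physical).name
        logical_name = LogicalSketch(logical).name
        return f"StreamTypes({physical_name}, {logical_name})"
    # Reached only when one of the lookups failed.
    return f"StreamTypes({physical}, {logical})"


known = describe(1, 1)
unknown = describe(9999, 8888)
```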
register_mimetypes(extension='.jelly')

Associate files that have Jelly extension with Jelly MIME types.

>>> register_mimetypes()
>>> mimetypes.guess_type("out.jelly")
('application/x-jelly-rdf', None)

Source code in pyjelly/options.py
def register_mimetypes(extension: str = ".jelly") -> None:
    """
    Associate files that have Jelly extension with Jelly MIME types.

    >>> register_mimetypes()
    >>> mimetypes.guess_type("out.jelly")
    ('application/x-jelly-rdf', None)
    """
    for mimetype in MIMETYPES:
        mimetypes.add_type(mimetype, extension)
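The same effect can be reproduced with the standard library alone. This sketch registers only the single MIME type that appears in the doctest. The contents of pyjelly's `MIMETYPES` constant are not shown on this page, so the real function may register more than one type.

```python
import mimetypes

# MIME type string taken from the doctest; ".jelly" is the documented default extension.
mimetypes.add_type("application/x-jelly-rdf", ".jelly")

# guess_type returns a (type, encoding) pair.
guessed_type, encoding = mimetypes.guess_type("out.jelly")
```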
flows

Classes:

Name Description
FrameFlow

Abstract base class for producing Jelly frames from RDF stream rows.

ManualFrameFlow

Produces frames only when manually requested (never automatically).

BoundedFrameFlow

Produce frames automatically when a fixed number of rows is reached.

GraphsFrameFlow
DatasetsFrameFlow

Functions:

Name Description
flow_for_type

Return flow based on logical type requested.

FrameFlow(initlist=None, *, logical_type=None, **__kwargs)

Bases: UserList[RdfStreamRow]

Abstract base class for producing Jelly frames from RDF stream rows.

Collects stream rows and assembles them into RdfStreamFrame objects when ready.

Allows for passing LogicalStreamType, required for logical subtypes and non-delimited streams.

Methods:

Name Description
frame_from_graph

Treat the current rows as a graph and produce a frame.

frame_from_dataset

Treat the current rows as a dataset and produce a frame.

to_stream_frame

Create stream frame from flow content.

Source code in pyjelly/serialize/flows.py
def __init__(
    self,
    initlist: Iterable[jelly.RdfStreamRow] | None = None,
    *,
    logical_type: jelly.LogicalStreamType | None = None,
    **__kwargs: Any,
) -> None:
    super().__init__(initlist)
    self.logical_type = logical_type or self.__class__.logical_type
frame_from_graph()

Treat the current rows as a graph and produce a frame.

Default implementation returns None.

Source code in pyjelly/serialize/flows.py
def frame_from_graph(self) -> jelly.RdfStreamFrame | None:
    """
    Treat the current rows as a graph and produce a frame.

    Default implementation returns None.
    """
    return None
frame_from_dataset()

Treat the current rows as a dataset and produce a frame.

Default implementation returns None.

Source code in pyjelly/serialize/flows.py
def frame_from_dataset(self) -> jelly.RdfStreamFrame | None:
    """
    Treat the current rows as a dataset and produce a frame.

    Default implementation returns None.
    """
    return None
to_stream_frame()

Create stream frame from flow content.

Notes: Clears flow content after creating the frame.

Returns: jelly.RdfStreamFrame | None: the stream frame, or None if the flow is empty.

Source code in pyjelly/serialize/flows.py
def to_stream_frame(self) -> jelly.RdfStreamFrame | None:
    """
    Create stream frame from flow content.

    Notes:
        Clears flow content after creating the frame.

    Returns:
        jelly.RdfStreamFrame | None: stream frame

    """
    if not self:
        return None
    frame = jelly.RdfStreamFrame(rows=self)
    self.clear()
    return frame
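The drain-and-clear behaviour of to_stream_frame can be sketched with a plain UserList. `ToyFlow` is a hypothetical stand-in, and a "frame" here is just a list of rows rather than a jelly.RdfStreamFrame.

```python
from collections import UserList


class ToyFlow(UserList):
    """Hypothetical stand-in for FrameFlow that buffers rows in a list."""

    def to_stream_frame(self):
        if not self:
            return None  # nothing buffered, so there is no frame to emit
        frame = list(self)  # copy the rows before the buffer is emptied
        self.clear()
        return frame


flow = ToyFlow(["row-1", "row-2"])
first = flow.to_stream_frame()   # contains both buffered rows
second = flow.to_stream_frame()  # None, because the first call cleared the flow
```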
ManualFrameFlow(initlist=None, *, logical_type=None, **__kwargs)

Bases: FrameFlow

Produces frames only when manually requested (never automatically).

Warning

All stream rows are kept in memory until to_stream_frame() is called. This may lead to high memory usage for large streams.

Used for non-delimited serialization.

Source code in pyjelly/serialize/flows.py
def __init__(
    self,
    initlist: Iterable[jelly.RdfStreamRow] | None = None,
    *,
    logical_type: jelly.LogicalStreamType | None = None,
    **__kwargs: Any,
) -> None:
    super().__init__(initlist)
    self.logical_type = logical_type or self.__class__.logical_type
BoundedFrameFlow(initlist=None, logical_type=None, *, frame_size=None)

Bases: FrameFlow

Produce frames automatically when a fixed number of rows is reached.

Used for delimited encoding (default mode).

Methods:

Name Description
frame_from_bounds

Emit frame from flow if full.

Source code in pyjelly/serialize/flows.py
@override
def __init__(
    self,
    initlist: Iterable[jelly.RdfStreamRow] | None = None,
    logical_type: jelly.LogicalStreamType | None = None,
    *,
    frame_size: int | None = None,
) -> None:
    super().__init__(initlist, logical_type=logical_type)
    self.frame_size = frame_size or DEFAULT_FRAME_SIZE
frame_from_bounds()

Emit frame from flow if full.

Returns: jelly.RdfStreamFrame | None: the stream frame if the flow holds at least frame_size rows, otherwise None.

Source code in pyjelly/serialize/flows.py
@override
def frame_from_bounds(self) -> jelly.RdfStreamFrame | None:
    """
    Emit frame from flow if full.

    Returns:
        jelly.RdfStreamFrame | None: stream frame

    """
    if len(self) >= self.frame_size:
        return self.to_stream_frame()
    return None
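A minimal sketch of size-bounded framing follows, assuming a hypothetical `ToyBoundedFlow` and a frame size of 3 chosen for the example (the value of DEFAULT_FRAME_SIZE is not shown on this page). Rows are plain integers and frames are plain lists.

```python
from collections import UserList


class ToyBoundedFlow(UserList):
    """Hypothetical stand-in for BoundedFrameFlow."""

    def __init__(self, initlist=None, *, frame_size=3):
        super().__init__(initlist)
        self.frame_size = frame_size

    def to_stream_frame(self):
        if not self:
            return None
        frame = list(self)
        self.clear()
        return frame

    def frame_from_bounds(self):
        # Emit only once the buffer has reached the configured size.
        if len(self) >= self.frame_size:
            return self.to_stream_frame()
        return None


flow = ToyBoundedFlow(frame_size=3)
frames = []
for row in range(7):
    flow.append(row)
    frame = flow.frame_from_bounds()
    if frame is not None:
        frames.append(frame)

# Rows that never filled a whole frame must be flushed explicitly.
leftover = flow.to_stream_frame()
```

Seven rows with a frame size of 3 give two full frames plus one leftover row, which is why a final explicit flush is needed at the end of a stream.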
GraphsFrameFlow(initlist=None, *, logical_type=None, **__kwargs)

Bases: FrameFlow

Methods:

Name Description
frame_from_graph

Emit current flow content (one graph) as jelly frame.

Source code in pyjelly/serialize/flows.py
def __init__(
    self,
    initlist: Iterable[jelly.RdfStreamRow] | None = None,
    *,
    logical_type: jelly.LogicalStreamType | None = None,
    **__kwargs: Any,
) -> None:
    super().__init__(initlist)
    self.logical_type = logical_type or self.__class__.logical_type
frame_from_graph()

Emit current flow content (one graph) as jelly frame.

Returns: jelly.RdfStreamFrame | None: Jelly frame, or None if the flow is empty.

Source code in pyjelly/serialize/flows.py
def frame_from_graph(self) -> jelly.RdfStreamFrame | None:
    """
    Emit current flow content (one graph) as jelly frame.

    Returns:
        jelly.RdfStreamFrame | None: jelly frame or none if
            flow is empty.

    """
    return self.to_stream_frame()
DatasetsFrameFlow(initlist=None, *, logical_type=None, **__kwargs)

Bases: FrameFlow

Methods:

Name Description
frame_from_dataset

Emit current flow content (dataset) as jelly frame.

Source code in pyjelly/serialize/flows.py
def __init__(
    self,
    initlist: Iterable[jelly.RdfStreamRow] | None = None,
    *,
    logical_type: jelly.LogicalStreamType | None = None,
    **__kwargs: Any,
) -> None:
    super().__init__(initlist)
    self.logical_type = logical_type or self.__class__.logical_type
frame_from_dataset()

Emit current flow content (dataset) as jelly frame.

Returns: jelly.RdfStreamFrame | None: Jelly frame, or None if the flow is empty.

Source code in pyjelly/serialize/flows.py
def frame_from_dataset(self) -> jelly.RdfStreamFrame | None:
    """
    Emit current flow content (dataset) as jelly frame.

    Returns:
        jelly.RdfStreamFrame | None: jelly frame or none if
            flow is empty.

    """
    return self.to_stream_frame()
flow_for_type(logical_type)

Return flow based on logical type requested.

Note: uses the base logical type for subtypes (e.g., SUBJECT_GRAPHS uses the same flow as its base type GRAPHS).

Args: logical_type (jelly.LogicalStreamType): logical type requested.

Raises: NotImplementedError: if (base) logical stream type is not supported.

Returns: type[FrameFlow]: FrameFlow for respective logical type.

Source code in pyjelly/serialize/flows.py
def flow_for_type(logical_type: jelly.LogicalStreamType) -> type[FrameFlow]:
    """
    Return flow based on logical type requested.

    Note: uses base logical type for subtypes (i.e., SUBJECT_GRAPHS uses
        the same flow as its base type GRAPHS).

    Args:
        logical_type (jelly.LogicalStreamType): logical type requested.

    Raises:
        NotImplementedError: if (base) logical stream type is not supported.

    Returns:
        type[FrameFlow]: FrameFlow for respective logical type.

    """
    try:
        base_logical_type_value = logical_type % 10
        base_name = jelly.LogicalStreamType.Name(base_logical_type_value)
        return FLOW_DISPATCH[getattr(jelly.LogicalStreamType, base_name)]
    except KeyError:
        msg = (
            "unsupported logical stream type: "
            f"{jelly.LogicalStreamType.Name(logical_type)}"
        )
        raise NotImplementedError(msg) from None
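The `logical_type % 10` step reduces a subtype to its base type. The sketch below uses a stand-in enum. The numeric values are assumptions taken from the Jelly specification as recalled here and are not confirmed by this page, so treat them as illustrative.

```python
from enum import IntEnum


class LogicalTypeSketch(IntEnum):
    """Stand-in for jelly.LogicalStreamType; numeric values are assumed."""

    FLAT_TRIPLES = 1
    FLAT_QUADS = 2
    GRAPHS = 3
    DATASETS = 4
    SUBJECT_GRAPHS = 13
    NAMED_GRAPHS = 14
    TIMESTAMPED_NAMED_GRAPHS = 114


def base_type(logical_type: int) -> LogicalTypeSketch:
    # The last decimal digit identifies the base logical type.
    return LogicalTypeSketch(logical_type % 10)


subject_graphs_base = base_type(LogicalTypeSketch.SUBJECT_GRAPHS)
timestamped_base = base_type(LogicalTypeSketch.TIMESTAMPED_NAMED_GRAPHS)
```

Under these assumed values, SUBJECT_GRAPHS (13) reduces to GRAPHS (3) and TIMESTAMPED_NAMED_GRAPHS (114) reduces to DATASETS (4), so each subtype can share the flow class of its base type.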
lookup

Classes:

Name Description
Lookup

Fixed-size 1-based string-to-index mapping with LRU eviction.

LookupEncoder

Shared base for RDF lookup encoders using Jelly compression.

Lookup(max_size)

Fixed-size 1-based string-to-index mapping with LRU eviction.

  • Assigns incrementing indices starting from 1.
  • After reaching the maximum size, reuses the indices freed by evicting the least-recently-used entries.
  • Index 0 is reserved for delta encoding in Jelly streams.

To check if a key exists, call .make_last_to_evict(key) and catch KeyError. If KeyError is raised, the key can be inserted with .insert(key).

Parameters:

Name Type Description Default
max_size int

Maximum number of entries. Zero disables lookup.

required
Source code in pyjelly/serialize/lookup.py
def __init__(self, max_size: int) -> None:
    self.data = OrderedDict[str, int]()
    self.max_size = max_size
    self._evicting = False
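Only the constructor of Lookup is reproduced on this page, so the following is a sketch of how a 1-based LRU index map of this kind could work, not pyjelly's actual implementation. `ToyLookup` is hypothetical; the method names `make_last_to_evict` and `insert` are borrowed from the encode_entry_index source, and the `max_size == 0` case is deliberately not handled.

```python
from collections import OrderedDict


class ToyLookup:
    """Hypothetical LRU string-to-index map with 1-based indices."""

    def __init__(self, max_size):
        self.data = OrderedDict()
        self.max_size = max_size

    def make_last_to_evict(self, key):
        # Marks the key as most recently used; raises KeyError if it is absent.
        self.data.move_to_end(key)

    def insert(self, key):
        if len(self.data) < self.max_size:
            index = len(self.data) + 1  # indices start at 1; 0 stays reserved
        else:
            # Evict the least-recently-used entry and reuse its index.
            _, index = self.data.popitem(last=False)
        self.data[key] = index
        return index


lookup = ToyLookup(max_size=2)
a = lookup.insert("a")
b = lookup.insert("b")
lookup.make_last_to_evict("a")  # "b" is now the least recently used entry
c = lookup.insert("c")          # evicts "b" and takes over its index
```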
LookupEncoder(*, lookup_size)

Shared base for RDF lookup encoders using Jelly compression.

Tracks the last assigned and last reused index.

Parameters:

Name Type Description Default
lookup_size int

Maximum lookup size.

required

Methods:

Name Description
encode_entry_index

Get or assign the index to use in an entry.

Source code in pyjelly/serialize/lookup.py
def __init__(self, *, lookup_size: int) -> None:
    self.lookup = Lookup(max_size=lookup_size)
    self.last_assigned_index = 0
    self.last_reused_index = 0
encode_entry_index(key)

Get or assign the index to use in an entry.

Returns:

Type Description
int or None
  • 0 if the new index is sequential (last_assigned_index + 1)
  • actual assigned/reused index otherwise
  • None if the key already exists
If the return value is None, the entry is already in the lookup and does not
need to be emitted. Any integer value (including 0) means the entry is new
and should be emitted.
Source code in pyjelly/serialize/lookup.py
def encode_entry_index(self, key: str) -> int | None:
    """
    Get or assign the index to use in an entry.

    Returns
    -------
    int or None
        - 0 if the new index is sequential (`last_assigned_index + 1`)
        - actual assigned/reused index otherwise
        - None if the key already exists

    If the return value is None, the entry is already in the lookup and does not
    need to be emitted. Any integer value (including 0) means the entry is new
    and should be emitted.

    """
    try:
        self.lookup.make_last_to_evict(key)
        return None  # noqa: TRY300
    except KeyError:
        previous_index = self.last_assigned_index
        index = self.lookup.insert(key)
        self.last_assigned_index = index
        if index == previous_index + 1:
            return 0
        return index
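The three return cases can be traced with a small self-contained sketch. `ToyEncoder` is hypothetical and folds a simplified LRU map into the encoder, so its eviction details are assumptions; only the return convention (None, 0, or an explicit index) follows the source reproduced on this page.

```python
from collections import OrderedDict


class ToyEncoder:
    """Hypothetical encoder illustrating the None / 0 / index convention."""

    def __init__(self, lookup_size):
        self.data = OrderedDict()
        self.lookup_size = lookup_size
        self.last_assigned_index = 0

    def _insert(self, key):
        if len(self.data) < self.lookup_size:
            index = len(self.data) + 1
        else:
            _, index = self.data.popitem(last=False)  # reuse the evicted index
        self.data[key] = index
        return index

    def encode_entry_index(self, key):
        if key in self.data:
            self.data.move_to_end(key)
            return None  # already known, no entry needs to be emitted
        previous = self.last_assigned_index
        index = self._insert(key)
        self.last_assigned_index = index
        # 0 is shorthand for "previous index + 1".
        return 0 if index == previous + 1 else index


encoder = ToyEncoder(lookup_size=2)
results = [encoder.encode_entry_index(key) for key in ["a", "b", "a", "c", "d"]]
```

In this trace the first two keys receive sequential indices (encoded as 0), the repeated "a" yields None, and the last two keys reuse evicted indices, which are therefore written out explicitly.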
streams

Classes:

Name Description
Stream
TripleStream
QuadStream
GraphStream

Functions:

Name Description
stream_for_type

Give a Stream based on physical type specified.

Stream(*, encoder, options=None)

Methods:

Name Description
infer_flow

Return flow based on the stream options provided.

enroll

Initialize start of the stream.

stream_options

Encode and append stream options row to the current flow.

namespace_declaration

Add namespace declaration to jelly stream.

for_rdflib

Initialize stream with RDFLib encoder.

Source code in pyjelly/serialize/streams.py
def __init__(
    self,
    *,
    encoder: TermEncoder,
    options: SerializerOptions | None = None,
) -> None:
    self.encoder = encoder
    if options is None:
        options = SerializerOptions()
    self.options = options
    flow = options.flow
    if flow is None:
        flow = self.infer_flow()
    self.flow = flow
    self.repeated_terms = [None] * len(Slot)
    self.enrolled = False
    self.stream_types = StreamTypes(
        physical_type=self.physical_type,
        logical_type=self.flow.logical_type,
    )
infer_flow()

Return flow based on the stream options provided.

Returns: FrameFlow: initialised FrameFlow object.

Source code in pyjelly/serialize/streams.py
def infer_flow(self) -> FrameFlow:
    """
    Return flow based on the stream options provided.

    Returns:
        FrameFlow: initialised FrameFlow object.

    """
    flow: FrameFlow
    if self.options.params.delimited:
        if self.options.logical_type != jelly.LOGICAL_STREAM_TYPE_UNSPECIFIED:
            flow_class = flow_for_type(self.options.logical_type)
        else:
            flow_class = self.default_delimited_flow_class

        if self.options.logical_type in (
            jelly.LOGICAL_STREAM_TYPE_FLAT_TRIPLES,
            jelly.LOGICAL_STREAM_TYPE_FLAT_QUADS,
        ):
            flow = flow_class(
                logical_type=self.options.logical_type,
                frame_size=self.options.frame_size,
            )
        else:
            flow = flow_class(logical_type=self.options.logical_type)
    else:
        flow = ManualFrameFlow(logical_type=self.options.logical_type)
    return flow
enroll()

Initialize start of the stream.

Source code in pyjelly/serialize/streams.py
def enroll(self) -> None:
    """Initialize start of the stream."""
    if not self.enrolled:
        self.stream_options()
        self.enrolled = True
stream_options()

Encode and append stream options row to the current flow.

Source code in pyjelly/serialize/streams.py
def stream_options(self) -> None:
    """Encode and append stream options row to the current flow."""
    self.flow.append(
        encode_options(
            stream_types=self.stream_types,
            params=self.options.params,
            lookup_preset=self.options.lookup_preset,
        )
    )
namespace_declaration(name, iri)

Add namespace declaration to jelly stream.

Args:
  • name (str): namespace prefix label
  • iri (str): namespace IRI

Source code in pyjelly/serialize/streams.py
def namespace_declaration(self, name: str, iri: str) -> None:
    """
    Add namespace declaration to jelly stream.

    Args:
        name (str): namespace prefix label
        iri (str): namespace iri

    """
    rows = encode_namespace_declaration(
        name=name,
        value=iri,
        term_encoder=self.encoder,
    )
    self.flow.extend(rows)
for_rdflib(options=None)

Initialize stream with RDFLib encoder.

Args: options (SerializerOptions | None, optional): Stream options. Defaults to None.

Raises: TypeError: if called on the abstract Stream base class rather than on a subclass for a specific physical type.

Returns: Stream: initialized stream with RDFLib encoder.

Source code in pyjelly/serialize/streams.py
@classmethod
def for_rdflib(cls, options: SerializerOptions | None = None) -> Stream:
    """
    Initialize stream with RDFLib encoder.

    Args:
        options (SerializerOptions | None, optional): Stream options.
            Defaults to None.

    Raises:
        TypeError: if Stream is passed, and not a Stream for specific physical type.

    Returns:
        Stream: initialized stream with RDFLib encoder.

    """
    if cls is Stream:
        msg = "Stream is an abstract base class, use a subclass instead"
        raise TypeError(msg)
    from pyjelly.integrations.rdflib.serialize import (  # noqa: PLC0415
        RDFLibTermEncoder,
    )

    lookup_preset: LookupPreset | None = None
    if options is not None:
        lookup_preset = options.lookup_preset
    return cls(
        encoder=RDFLibTermEncoder(lookup_preset=lookup_preset),
        options=options,
    )
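The `cls is Stream` check is a common way to keep a factory classmethod from being used on an abstract base. The sketch below shows the pattern in isolation; `BaseStreamSketch`, `TripleStreamSketch` and `create` are hypothetical names, and no RDFLib encoder is involved.

```python
class BaseStreamSketch:
    """Hypothetical abstract base with a guarded factory method."""

    @classmethod
    def create(cls):
        if cls is BaseStreamSketch:
            msg = "BaseStreamSketch is an abstract base class, use a subclass instead"
            raise TypeError(msg)
        return cls()


class TripleStreamSketch(BaseStreamSketch):
    """Hypothetical concrete subclass."""


stream = TripleStreamSketch.create()

try:
    BaseStreamSketch.create()
except TypeError as exc:
    error_message = str(exc)
```

The factory works on any subclass because `cls` is bound to the class it was called on, while a call on the base itself is rejected.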
TripleStream(*, encoder, options=None)

Bases: Stream

Methods:

Name Description
triple

Process one triple to Protobuf messages.

Source code in pyjelly/serialize/streams.py
def __init__(
    self,
    *,
    encoder: TermEncoder,
    options: SerializerOptions | None = None,
) -> None:
    self.encoder = encoder
    if options is None:
        options = SerializerOptions()
    self.options = options
    flow = options.flow
    if flow is None:
        flow = self.infer_flow()
    self.flow = flow
    self.repeated_terms = [None] * len(Slot)
    self.enrolled = False
    self.stream_types = StreamTypes(
        physical_type=self.physical_type,
        logical_type=self.flow.logical_type,
    )
triple(terms)

Process one triple to Protobuf messages.

Note: Adds new rows to the current flow and returns StreamFrame if frame size conditions are met.

Args: terms (Iterable[object]): RDF terms to encode.

Returns: jelly.RdfStreamFrame | None: stream frame if the flow supports frame slicing and the current flow is full

Source code in pyjelly/serialize/streams.py
def triple(self, terms: Iterable[object]) -> jelly.RdfStreamFrame | None:
    """
    Process one triple to Protobuf messages.

    Note:
        Adds new rows to the current flow and returns StreamFrame if
        frame size conditions are met.

    Args:
        terms (Iterable[object]): RDF terms to encode.

    Returns:
        jelly.RdfStreamFrame | None: stream frame if
            flow supports frames slicing and current flow is full

    """
    new_rows = encode_triple(
        terms,
        term_encoder=self.encoder,
        repeated_terms=self.repeated_terms,
    )
    self.flow.extend(new_rows)
    return self.flow.frame_from_bounds()
QuadStream(*, encoder, options=None)

Bases: Stream

Methods:

Name Description
quad

Process one quad to Protobuf messages.

Source code in pyjelly/serialize/streams.py
def __init__(
    self,
    *,
    encoder: TermEncoder,
    options: SerializerOptions | None = None,
) -> None:
    self.encoder = encoder
    if options is None:
        options = SerializerOptions()
    self.options = options
    flow = options.flow
    if flow is None:
        flow = self.infer_flow()
    self.flow = flow
    self.repeated_terms = [None] * len(Slot)
    self.enrolled = False
    self.stream_types = StreamTypes(
        physical_type=self.physical_type,
        logical_type=self.flow.logical_type,
    )
quad(terms)

Process one quad to Protobuf messages.

Args: terms (Iterable[object]): terms to encode.

Returns: jelly.RdfStreamFrame | None: stream frame if the flow supports frame slicing and the current flow is full

Source code in pyjelly/serialize/streams.py
def quad(self, terms: Iterable[object]) -> jelly.RdfStreamFrame | None:
    """
    Process one quad to Protobuf messages.

    Args:
        terms (Iterable[object]): terms to encode.

    Returns:
        jelly.RdfStreamFrame | None: stream frame if
            flow supports frames slicing and current flow is full

    """
    new_rows = encode_quad(
        terms,
        term_encoder=self.encoder,
        repeated_terms=self.repeated_terms,
    )
    self.flow.extend(new_rows)
    return self.flow.frame_from_bounds()
GraphStream(*, encoder, options=None)

Bases: TripleStream

Methods:

Name Description
graph

Process one graph into a sequence of jelly frames.

Source code in pyjelly/serialize/streams.py
def __init__(
    self,
    *,
    encoder: TermEncoder,
    options: SerializerOptions | None = None,
) -> None:
    self.encoder = encoder
    if options is None:
        options = SerializerOptions()
    self.options = options
    flow = options.flow
    if flow is None:
        flow = self.infer_flow()
    self.flow = flow
    self.repeated_terms = [None] * len(Slot)
    self.enrolled = False
    self.stream_types = StreamTypes(
        physical_type=self.physical_type,
        logical_type=self.flow.logical_type,
    )
graph(graph_id, graph)

Process one graph into a sequence of jelly frames.

Args:
  • graph_id (object): graph id (blank node, literal, IRI, or default graph)
  • graph (Iterable[Iterable[object]]): iterable of triples (the graph's content)

Yields: Generator[jelly.RdfStreamFrame]: jelly frames.

Source code in pyjelly/serialize/streams.py
def graph(
    self,
    graph_id: object,
    graph: Iterable[Iterable[object]],
) -> Generator[jelly.RdfStreamFrame]:
    """
    Process one graph into a sequence of jelly frames.

    Args:
        graph_id (object): graph id (BN, Literal, iri, default)
        graph (Iterable[Iterable[object]]): iterable of triples (graph's content)

    Yields:
        Generator[jelly.RdfStreamFrame]: jelly frames.

    """
    graph_start = jelly.RdfGraphStart()
    [*graph_rows] = self.encoder.encode_graph(graph_id, graph_start)
    start_row = jelly.RdfStreamRow(graph_start=graph_start)
    graph_rows.append(start_row)
    self.flow.extend(graph_rows)
    for triple in graph:
        if frame := self.triple(triple):  # has frame slicing inside
            yield frame
    end_row = jelly.RdfStreamRow(graph_end=jelly.RdfGraphEnd())
    self.flow.append(end_row)
    if frame := self.flow.frame_from_bounds():
        yield frame
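The control flow of graph() can be sketched as a generator that wraps the triples in start and end markers and yields a frame whenever the buffer is full. `ToyGraphStream` is hypothetical: rows are plain tuples, frames are plain lists, and the frame size of 3 is chosen for the example.

```python
class ToyGraphStream:
    """Hypothetical stand-in showing the start / triples / end framing."""

    def __init__(self, frame_size):
        self.rows = []
        self.frame_size = frame_size

    def _frame_if_full(self):
        if len(self.rows) >= self.frame_size:
            frame, self.rows = self.rows, []
            return frame
        return None

    def graph(self, graph_id, triples):
        self.rows.append(("graph_start", graph_id))
        for triple in triples:
            self.rows.append(("triple", triple))
            if (frame := self._frame_if_full()) is not None:
                yield frame
        self.rows.append(("graph_end",))
        if (frame := self._frame_if_full()) is not None:
            yield frame


stream = ToyGraphStream(frame_size=3)
triples = [("s", "p", "o1"), ("s", "p", "o2"), ("s", "p", "o3")]
frames = list(stream.graph("g1", triples))
```

In this run one frame is yielded and two rows (the last triple and the end marker) stay buffered, which illustrates that rows left below the frame size are not emitted by the generator itself.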
stream_for_type(physical_type)

Give a Stream based on physical type specified.

Args: physical_type (jelly.PhysicalStreamType): jelly stream physical type.

Raises: NotImplementedError: if no stream for requested physical type is available.

Returns: type[Stream]: jelly stream

Source code in pyjelly/serialize/streams.py
def stream_for_type(physical_type: jelly.PhysicalStreamType) -> type[Stream]:
    """
    Give a Stream based on physical type specified.

    Args:
        physical_type (jelly.PhysicalStreamType): jelly stream physical type.

    Raises:
        NotImplementedError: if no stream for requested physical type is available.

    Returns:
        type[Stream]: jelly stream

    """
    try:
        stream_cls = STREAM_DISPATCH[physical_type]
    except KeyError:
        msg = (
            "no stream class for physical type "
            f"{jelly.PhysicalStreamType.Name(physical_type)}"
        )
        raise NotImplementedError(msg) from None
    return stream_cls
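The lookup-then-translate pattern used here (a dictionary miss turned into NotImplementedError, with `from None` hiding the KeyError) can be sketched on its own. The table contents and class names below are hypothetical; the entries of pyjelly's STREAM_DISPATCH are not shown on this page.

```python
class TripleStreamSketch:
    """Hypothetical stream class."""


class QuadStreamSketch:
    """Hypothetical stream class."""


# Hypothetical dispatch table keyed by a physical type label.
DISPATCH_SKETCH = {
    "triples": TripleStreamSketch,
    "quads": QuadStreamSketch,
}


def stream_for_type_sketch(physical_type):
    try:
        stream_cls = DISPATCH_SKETCH[physical_type]
    except KeyError:
        msg = f"no stream class for physical type {physical_type}"
        # `from None` suppresses the KeyError in the traceback.
        raise NotImplementedError(msg) from None
    return stream_cls


found = stream_for_type_sketch("quads")

try:
    stream_for_type_sketch("unknown")
except NotImplementedError as exc:
    missing_message = str(exc)
    missing_cause = exc.__cause__
```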