Skip to content

Elasticsearch

The Elasticsearch adapter provides integration with Elasticsearch for full-text search, document indexing, and analytics queries.

Ports

Abstract port interface defining the Elasticsearch adapter contract.

archipy.adapters.elasticsearch.ports.ElasticsearchResponseType module-attribute

ElasticsearchResponseType = Awaitable[Any] | Any

archipy.adapters.elasticsearch.ports.ElasticsearchDocumentType module-attribute

ElasticsearchDocumentType = dict[str, Any]

archipy.adapters.elasticsearch.ports.ElasticsearchQueryType module-attribute

ElasticsearchQueryType = dict[str, Any]

archipy.adapters.elasticsearch.ports.ElasticsearchIndexType module-attribute

ElasticsearchIndexType = str

archipy.adapters.elasticsearch.ports.ElasticsearchIdType module-attribute

ElasticsearchIdType = str

archipy.adapters.elasticsearch.ports.ElasticsearchPort

Interface for Elasticsearch operations providing a standardized access pattern.

This interface defines the contract for Elasticsearch adapters, ensuring consistent implementation of Elasticsearch operations across different adapters. It covers all essential Elasticsearch functionality including document operations, search, and index management.

Source code in archipy/adapters/elasticsearch/ports.py
class ElasticsearchPort:
    """Contract that every Elasticsearch adapter must fulfil.

    Adapters implement the methods declared here so that callers get one uniform
    access pattern regardless of the adapter in use. The contract spans document
    operations, search, and index management.
    """

    # NOTE(review): the class statement lists no ABC base, so ``@abstractmethod`` is not
    # enforced when the class is instantiated; the ``raise NotImplementedError`` bodies
    # below are what signal a missing override at call time.

    @abstractmethod
    def ping(self) -> ElasticsearchResponseType:
        """Verify connectivity with the Elasticsearch server.

        Returns:
            ElasticsearchResponseType: Whatever the server sends back.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    def index(
        self,
        index: ElasticsearchIndexType,
        document: ElasticsearchDocumentType,
        doc_id: ElasticsearchIdType | None = None,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Store a document in an Elasticsearch index.

        Args:
            index (ElasticsearchIndexType): Name of the target index.
            document (ElasticsearchDocumentType): Document body to be indexed.
            doc_id (ElasticsearchIdType | None): ID to assign to the document. When omitted,
                Elasticsearch generates one.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: Elasticsearch's response to the request.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    def get(
        self,
        index: ElasticsearchIndexType,
        doc_id: ElasticsearchIdType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Fetch a single document from Elasticsearch by its ID.

        Args:
            index (ElasticsearchIndexType): Name of the index to read from.
            doc_id (ElasticsearchIdType): ID of the document to fetch.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: The document, when it is found.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    def search(
        self,
        index: ElasticsearchIndexType,
        query: ElasticsearchQueryType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Run a search query against an Elasticsearch index.

        Args:
            index (ElasticsearchIndexType): Name of the index to search.
            query (ElasticsearchQueryType): Query describing what to look for.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: Results of the search.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    def update(
        self,
        index: ElasticsearchIndexType,
        doc_id: ElasticsearchIdType,
        doc: ElasticsearchDocumentType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Apply an update to a document stored in Elasticsearch.

        Args:
            index (ElasticsearchIndexType): Name of the index holding the document.
            doc_id (ElasticsearchIdType): ID of the document to update.
            doc (ElasticsearchDocumentType): Content of the update to apply.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: Elasticsearch's response to the request.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    def delete(
        self,
        index: ElasticsearchIndexType,
        doc_id: ElasticsearchIdType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Remove a document from Elasticsearch.

        Args:
            index (ElasticsearchIndexType): Name of the index holding the document.
            doc_id (ElasticsearchIdType): ID of the document to remove.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: Elasticsearch's response to the request.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    def bulk(
        self,
        actions: list[dict[str, Any]],
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Execute a batch of bulk operations against Elasticsearch.

        Args:
            actions (list[dict[str, Any]]): Bulk actions to carry out.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: Elasticsearch's response to the request.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    def create_index(
        self,
        index: ElasticsearchIndexType,
        body: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Create a new index in Elasticsearch.

        Args:
            index (ElasticsearchIndexType): Name of the index to create.
            body (dict[str, Any] | None): Index settings and mappings, if any.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: Elasticsearch's response to the request.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    def delete_index(
        self,
        index: ElasticsearchIndexType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Remove an index from Elasticsearch.

        Args:
            index (ElasticsearchIndexType): Name of the index to remove.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: Elasticsearch's response to the request.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    def exists(
        self,
        index: ElasticsearchIndexType,
        doc_id: ElasticsearchIdType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Report whether a document is present in Elasticsearch.

        Args:
            index (ElasticsearchIndexType): Name of the index to look in.
            doc_id (ElasticsearchIdType): ID of the document to look for.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: True when the document exists, otherwise False.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    def index_exists(
        self,
        index: ElasticsearchIndexType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Report whether an index is present in Elasticsearch.

        Args:
            index (ElasticsearchIndexType): Name of the index to look for.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: True when the index exists, otherwise False.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

archipy.adapters.elasticsearch.ports.ElasticsearchPort.ping abstractmethod

ping() -> ElasticsearchResponseType

Tests the connection to the Elasticsearch server.

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from the server.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
def ping(self) -> ElasticsearchResponseType:
    """Verify connectivity with the Elasticsearch server.

    Returns:
        ElasticsearchResponseType: Whatever the server sends back.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.ElasticsearchPort.index abstractmethod

index(
    index: ElasticsearchIndexType,
    document: ElasticsearchDocumentType,
    doc_id: ElasticsearchIdType | None = None,
    **kwargs: Any,
) -> ElasticsearchResponseType

Index a document in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
document ElasticsearchDocumentType

The document to index.

required
doc_id ElasticsearchIdType | None

Optional document ID. If not provided, Elasticsearch will generate one.

None
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
def index(
    self,
    index: ElasticsearchIndexType,
    document: ElasticsearchDocumentType,
    doc_id: ElasticsearchIdType | None = None,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Store a document in an Elasticsearch index.

    Args:
        index (ElasticsearchIndexType): Name of the target index.
        document (ElasticsearchDocumentType): Document body to be indexed.
        doc_id (ElasticsearchIdType | None): ID to assign to the document. When omitted,
            Elasticsearch generates one.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: Elasticsearch's response to the request.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.ElasticsearchPort.get abstractmethod

get(
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Get a document from Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
doc_id ElasticsearchIdType

The document ID.

required
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The document if found.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
def get(
    self,
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Fetch a single document from Elasticsearch by its ID.

    Args:
        index (ElasticsearchIndexType): Name of the index to read from.
        doc_id (ElasticsearchIdType): ID of the document to fetch.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The document, when it is found.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.ElasticsearchPort.search abstractmethod

search(
    index: ElasticsearchIndexType,
    query: ElasticsearchQueryType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Search for documents in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
query ElasticsearchQueryType

The search query.

required
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The search results.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
def search(
    self,
    index: ElasticsearchIndexType,
    query: ElasticsearchQueryType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Run a search query against an Elasticsearch index.

    Args:
        index (ElasticsearchIndexType): Name of the index to search.
        query (ElasticsearchQueryType): Query describing what to look for.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: Results of the search.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.ElasticsearchPort.update abstractmethod

update(
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    doc: ElasticsearchDocumentType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Update a document in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
doc_id ElasticsearchIdType

The document ID.

required
doc ElasticsearchDocumentType

The document update.

required
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
def update(
    self,
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    doc: ElasticsearchDocumentType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Apply an update to a document stored in Elasticsearch.

    Args:
        index (ElasticsearchIndexType): Name of the index holding the document.
        doc_id (ElasticsearchIdType): ID of the document to update.
        doc (ElasticsearchDocumentType): Content of the update to apply.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: Elasticsearch's response to the request.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.ElasticsearchPort.delete abstractmethod

delete(
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Delete a document from Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
doc_id ElasticsearchIdType

The document ID.

required
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
def delete(
    self,
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Remove a document from Elasticsearch.

    Args:
        index (ElasticsearchIndexType): Name of the index holding the document.
        doc_id (ElasticsearchIdType): ID of the document to remove.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: Elasticsearch's response to the request.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.ElasticsearchPort.bulk abstractmethod

bulk(
    actions: list[dict[str, Any]], **kwargs: Any
) -> ElasticsearchResponseType

Perform bulk operations in Elasticsearch.

Parameters:

Name Type Description Default
actions list[dict[str, Any]]

List of bulk actions to perform.

required
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
def bulk(
    self,
    actions: list[dict[str, Any]],
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Execute a batch of bulk operations against Elasticsearch.

    Args:
        actions (list[dict[str, Any]]): Bulk actions to carry out.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: Elasticsearch's response to the request.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.ElasticsearchPort.create_index abstractmethod

create_index(
    index: ElasticsearchIndexType,
    body: dict[str, Any] | None = None,
    **kwargs: Any,
) -> ElasticsearchResponseType

Create an index in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
body dict[str, Any] | None

Optional index settings and mappings.

None
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
def create_index(
    self,
    index: ElasticsearchIndexType,
    body: dict[str, Any] | None = None,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Create a new index in Elasticsearch.

    Args:
        index (ElasticsearchIndexType): Name of the index to create.
        body (dict[str, Any] | None): Index settings and mappings, if any.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: Elasticsearch's response to the request.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.ElasticsearchPort.delete_index abstractmethod

delete_index(
    index: ElasticsearchIndexType, **kwargs: Any
) -> ElasticsearchResponseType

Delete an index from Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
def delete_index(
    self,
    index: ElasticsearchIndexType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Remove an index from Elasticsearch.

    Args:
        index (ElasticsearchIndexType): Name of the index to remove.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: Elasticsearch's response to the request.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.ElasticsearchPort.exists abstractmethod

exists(
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Check if a document exists in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
doc_id ElasticsearchIdType

The document ID.

required
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

True if the document exists, False otherwise.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
def exists(
    self,
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Report whether a document is present in Elasticsearch.

    Args:
        index (ElasticsearchIndexType): Name of the index to look in.
        doc_id (ElasticsearchIdType): ID of the document to look for.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: True when the document exists, otherwise False.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.ElasticsearchPort.index_exists abstractmethod

index_exists(
    index: ElasticsearchIndexType, **kwargs: Any
) -> ElasticsearchResponseType

Check if an index exists in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

True if the index exists, False otherwise.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
def index_exists(
    self,
    index: ElasticsearchIndexType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Report whether an index is present in Elasticsearch.

    Args:
        index (ElasticsearchIndexType): Name of the index to look for.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: True when the index exists, otherwise False.

    Raises:
        NotImplementedError: When a subclass has not overridden this method.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.AsyncElasticsearchPort

Async interface for Elasticsearch operations providing a standardized access pattern.

This interface defines the contract for async Elasticsearch adapters, ensuring consistent implementation of Elasticsearch operations across different adapters. It covers all essential Elasticsearch functionality including document operations, search, and index management.

Source code in archipy/adapters/elasticsearch/ports.py
class AsyncElasticsearchPort:
    """Contract that every asynchronous Elasticsearch adapter must fulfil.

    Async adapters implement the coroutine methods declared here so that callers get
    one uniform access pattern regardless of the adapter in use. The contract spans
    document operations, search, and index management.
    """

    # NOTE(review): the class statement lists no ABC base, so ``@abstractmethod`` is not
    # enforced when the class is instantiated; the ``raise NotImplementedError`` bodies
    # below are what signal a missing override once the coroutine is awaited.

    @abstractmethod
    async def ping(self) -> ElasticsearchResponseType:
        """Verify connectivity with the Elasticsearch server.

        Returns:
            ElasticsearchResponseType: Whatever the server sends back.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def index(
        self,
        index: ElasticsearchIndexType,
        document: ElasticsearchDocumentType,
        doc_id: ElasticsearchIdType | None = None,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Store a document in an Elasticsearch index.

        Args:
            index (ElasticsearchIndexType): Name of the target index.
            document (ElasticsearchDocumentType): Document body to be indexed.
            doc_id (ElasticsearchIdType | None): ID to assign to the document. When omitted,
                Elasticsearch generates one.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: Elasticsearch's response to the request.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def get(
        self,
        index: ElasticsearchIndexType,
        doc_id: ElasticsearchIdType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Fetch a single document from Elasticsearch by its ID.

        Args:
            index (ElasticsearchIndexType): Name of the index to read from.
            doc_id (ElasticsearchIdType): ID of the document to fetch.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: The document, when it is found.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def search(
        self,
        index: ElasticsearchIndexType,
        query: ElasticsearchQueryType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Run a search query against an Elasticsearch index.

        Args:
            index (ElasticsearchIndexType): Name of the index to search.
            query (ElasticsearchQueryType): Query describing what to look for.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: Results of the search.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def update(
        self,
        index: ElasticsearchIndexType,
        doc_id: ElasticsearchIdType,
        doc: ElasticsearchDocumentType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Apply an update to a document stored in Elasticsearch.

        Args:
            index (ElasticsearchIndexType): Name of the index holding the document.
            doc_id (ElasticsearchIdType): ID of the document to update.
            doc (ElasticsearchDocumentType): Content of the update to apply.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: Elasticsearch's response to the request.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def delete(
        self,
        index: ElasticsearchIndexType,
        doc_id: ElasticsearchIdType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Remove a document from Elasticsearch.

        Args:
            index (ElasticsearchIndexType): Name of the index holding the document.
            doc_id (ElasticsearchIdType): ID of the document to remove.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: Elasticsearch's response to the request.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def bulk(
        self,
        actions: list[dict[str, Any]],
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Execute a batch of bulk operations against Elasticsearch.

        Args:
            actions (list[dict[str, Any]]): Bulk actions to carry out.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: Elasticsearch's response to the request.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def create_index(
        self,
        index: ElasticsearchIndexType,
        body: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Create a new index in Elasticsearch.

        Args:
            index (ElasticsearchIndexType): Name of the index to create.
            body (dict[str, Any] | None): Index settings and mappings, if any.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: Elasticsearch's response to the request.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def delete_index(
        self,
        index: ElasticsearchIndexType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Remove an index from Elasticsearch.

        Args:
            index (ElasticsearchIndexType): Name of the index to remove.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: Elasticsearch's response to the request.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def exists(
        self,
        index: ElasticsearchIndexType,
        doc_id: ElasticsearchIdType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Report whether a document is present in Elasticsearch.

        Args:
            index (ElasticsearchIndexType): Name of the index to look in.
            doc_id (ElasticsearchIdType): ID of the document to look for.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: True when the document exists, otherwise False.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

    @abstractmethod
    async def index_exists(
        self,
        index: ElasticsearchIndexType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Report whether an index is present in Elasticsearch.

        Args:
            index (ElasticsearchIndexType): Name of the index to look for.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: True when the index exists, otherwise False.

        Raises:
            NotImplementedError: When a subclass has not overridden this method.
        """
        raise NotImplementedError

archipy.adapters.elasticsearch.ports.AsyncElasticsearchPort.ping abstractmethod async

ping() -> ElasticsearchResponseType

Tests the connection to the Elasticsearch server.

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from the server.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
async def ping(self) -> ElasticsearchResponseType:
    """Check that the Elasticsearch server can be reached.

    Abstract coroutine: this base version only raises, so concrete async
    adapters must override it.

    Returns:
        ElasticsearchResponseType: The server's response to the ping.

    Raises:
        NotImplementedError: Always, until a subclass provides an implementation.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.AsyncElasticsearchPort.index abstractmethod async

index(
    index: ElasticsearchIndexType,
    document: ElasticsearchDocumentType,
    doc_id: ElasticsearchIdType | None = None,
    **kwargs: Any,
) -> ElasticsearchResponseType

Index a document in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
document ElasticsearchDocumentType

The document to index.

required
doc_id ElasticsearchIdType | None

Optional document ID. If not provided, Elasticsearch will generate one.

None
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
async def index(
    self,
    index: ElasticsearchIndexType,
    document: ElasticsearchDocumentType,
    doc_id: ElasticsearchIdType | None = None,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Store a document in an Elasticsearch index.

    Abstract coroutine: this base version only raises, so concrete async
    adapters must override it.

    Args:
        index (ElasticsearchIndexType): Name of the target index.
        document (ElasticsearchDocumentType): The document body to store.
        doc_id (ElasticsearchIdType | None): ID to store the document under. When omitted,
            Elasticsearch will generate one.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The response from Elasticsearch.

    Raises:
        NotImplementedError: Always, until a subclass provides an implementation.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.AsyncElasticsearchPort.get abstractmethod async

get(
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Get a document from Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
doc_id ElasticsearchIdType

The document ID.

required
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The document if found.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
async def get(
    self,
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Fetch a single document from Elasticsearch by ID.

    Abstract coroutine: this base version only raises, so concrete async
    adapters must override it.

    Args:
        index (ElasticsearchIndexType): Name of the index holding the document.
        doc_id (ElasticsearchIdType): ID of the document to fetch.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The document if found.

    Raises:
        NotImplementedError: Always, until a subclass provides an implementation.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.AsyncElasticsearchPort.search abstractmethod async

search(
    index: ElasticsearchIndexType,
    query: ElasticsearchQueryType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Search for documents in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
query ElasticsearchQueryType

The search query.

required
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The search results.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
async def search(
    self,
    index: ElasticsearchIndexType,
    query: ElasticsearchQueryType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Run a search query against an Elasticsearch index.

    Abstract coroutine: this base version only raises, so concrete async
    adapters must override it.

    Args:
        index (ElasticsearchIndexType): Name of the index to search.
        query (ElasticsearchQueryType): The search query to execute.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The search results.

    Raises:
        NotImplementedError: Always, until a subclass provides an implementation.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.AsyncElasticsearchPort.update abstractmethod async

update(
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    doc: ElasticsearchDocumentType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Update a document in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
doc_id ElasticsearchIdType

The document ID.

required
doc ElasticsearchDocumentType

The document update.

required
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
async def update(
    self,
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    doc: ElasticsearchDocumentType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Apply an update to an existing Elasticsearch document.

    Abstract coroutine: this base version only raises, so concrete async
    adapters must override it.

    Args:
        index (ElasticsearchIndexType): Name of the index holding the document.
        doc_id (ElasticsearchIdType): ID of the document to update.
        doc (ElasticsearchDocumentType): The document update to apply.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The response from Elasticsearch.

    Raises:
        NotImplementedError: Always, until a subclass provides an implementation.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.AsyncElasticsearchPort.delete abstractmethod async

delete(
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Delete a document from Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
doc_id ElasticsearchIdType

The document ID.

required
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
async def delete(
    self,
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Remove a single document from Elasticsearch by ID.

    Abstract coroutine: this base version only raises, so concrete async
    adapters must override it.

    Args:
        index (ElasticsearchIndexType): Name of the index holding the document.
        doc_id (ElasticsearchIdType): ID of the document to remove.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The response from Elasticsearch.

    Raises:
        NotImplementedError: Always, until a subclass provides an implementation.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.AsyncElasticsearchPort.bulk abstractmethod async

bulk(
    actions: list[dict[str, Any]], **kwargs: Any
) -> ElasticsearchResponseType

Perform bulk operations in Elasticsearch.

Parameters:

Name Type Description Default
actions list[dict[str, Any]]

List of bulk actions to perform.

required
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
async def bulk(self, actions: list[dict[str, Any]], **kwargs: Any) -> ElasticsearchResponseType:
    """Submit a batch of bulk operations to Elasticsearch.

    Abstract coroutine: this base version only raises, so concrete async
    adapters must override it.

    Args:
        actions (list[dict[str, Any]]): The bulk actions to perform.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The response from Elasticsearch.

    Raises:
        NotImplementedError: Always, until a subclass provides an implementation.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.AsyncElasticsearchPort.create_index abstractmethod async

create_index(
    index: ElasticsearchIndexType,
    body: dict[str, Any] | None = None,
    **kwargs: Any,
) -> ElasticsearchResponseType

Create an index in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
body dict[str, Any] | None

Optional index settings and mappings.

None
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
async def create_index(
    self,
    index: ElasticsearchIndexType,
    body: dict[str, Any] | None = None,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Create a new index in Elasticsearch.

    Abstract coroutine: this base version only raises, so concrete async
    adapters must override it.

    Args:
        index (ElasticsearchIndexType): Name of the index to create.
        body (dict[str, Any] | None): Optional index settings and mappings.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The response from Elasticsearch.

    Raises:
        NotImplementedError: Always, until a subclass provides an implementation.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.AsyncElasticsearchPort.delete_index abstractmethod async

delete_index(
    index: ElasticsearchIndexType, **kwargs: Any
) -> ElasticsearchResponseType

Delete an index from Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
async def delete_index(self, index: ElasticsearchIndexType, **kwargs: Any) -> ElasticsearchResponseType:
    """Remove an entire index from Elasticsearch.

    Abstract coroutine: this base version only raises, so concrete async
    adapters must override it.

    Args:
        index (ElasticsearchIndexType): Name of the index to remove.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The response from Elasticsearch.

    Raises:
        NotImplementedError: Always, until a subclass provides an implementation.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.AsyncElasticsearchPort.exists abstractmethod async

exists(
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Check if a document exists in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
doc_id ElasticsearchIdType

The document ID.

required
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

True if the document exists, False otherwise.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
async def exists(
    self,
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Report whether a document is present in the given index.

    Abstract coroutine: this base version only raises, so concrete async
    adapters must override it.

    Args:
        index (ElasticsearchIndexType): Name of the index to look in.
        doc_id (ElasticsearchIdType): ID of the document to look for.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: True when the document exists, False otherwise.

    Raises:
        NotImplementedError: Always, until a subclass provides an implementation.
    """
    raise NotImplementedError

archipy.adapters.elasticsearch.ports.AsyncElasticsearchPort.index_exists abstractmethod async

index_exists(
    index: ElasticsearchIndexType, **kwargs: Any
) -> ElasticsearchResponseType

Check if an index exists in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
**kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

True if the index exists, False otherwise.

Raises:

Type Description
NotImplementedError

If not implemented by the subclass.

Source code in archipy/adapters/elasticsearch/ports.py
@abstractmethod
async def index_exists(self, index: ElasticsearchIndexType, **kwargs: Any) -> ElasticsearchResponseType:
    """Report whether an index is present in Elasticsearch.

    Abstract coroutine: this base version only raises, so concrete async
    adapters must override it.

    Args:
        index (ElasticsearchIndexType): Name of the index to look for.
        **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: True when the index exists, False otherwise.

    Raises:
        NotImplementedError: Always, until a subclass provides an implementation.
    """
    raise NotImplementedError

options: show_root_toc_entry: false heading_level: 3

Adapters

Concrete Elasticsearch adapter implementing index management, document CRUD, and search operations.

archipy.adapters.elasticsearch.adapters.logger module-attribute

logger = getLogger(__name__)

archipy.adapters.elasticsearch.adapters.ElasticsearchAdapter

Bases: ElasticsearchPort

Concrete implementation of the ElasticsearchPort interface using elasticsearch-py library.

This implementation provides a standardized way to interact with Elasticsearch, abstracting the underlying client implementation details.

Source code in archipy/adapters/elasticsearch/adapters.py
class ElasticsearchAdapter(ElasticsearchPort):
    """Synchronous ElasticsearchPort implementation built on the elasticsearch-py client.

    Every public operation is a thin pass-through: it forwards its arguments to the
    matching call on ``self.client`` and hands back that call's result unchanged.
    """

    def __init__(self, elasticsearch_config: ElasticsearchConfig | None = None) -> None:
        """Create the adapter and its underlying Elasticsearch client.

        Args:
            elasticsearch_config (ElasticsearchConfig | None): Settings used to build the client.
                When None, ``BaseConfig.global_config().ELASTIC`` is used. Defaults to None.
        """
        if elasticsearch_config is None:
            configs: ElasticsearchConfig = BaseConfig.global_config().ELASTIC
        else:
            configs = elasticsearch_config
        self.client = self._get_client(configs)

    @staticmethod
    def _get_client(configs: ElasticsearchConfig) -> Elasticsearch:
        """Build an Elasticsearch client from the given configuration.

        Credentials are chosen in this order: the API key/secret pair when both are set,
        otherwise the HTTP user name/password pair when both are set, otherwise none.

        Args:
            configs (ElasticsearchConfig): Settings to translate into client options.

        Returns:
            Elasticsearch: A client constructed from those options.
        """
        api_key: tuple[str, str] | None = None
        basic_auth: tuple[str, str] | None = None
        if configs.API_KEY and configs.API_SECRET:
            # API key authentication takes precedence over basic auth.
            api_key = (configs.API_KEY, configs.API_SECRET.get_secret_value())
        elif configs.HTTP_USER_NAME and configs.HTTP_PASSWORD:
            basic_auth = (configs.HTTP_USER_NAME, configs.HTTP_PASSWORD.get_secret_value())

        # Options that are always handed to the client, whatever their value.
        client_options: dict[str, Any] = {
            "hosts": configs.HOSTS,
            "api_key": api_key,
            "basic_auth": basic_auth,
            "request_timeout": configs.REQUEST_TIMEOUT,
            "retry_on_status": configs.RETRY_ON_STATUS,
            "retry_on_timeout": configs.RETRY_ON_TIMEOUT,
            "max_retries": configs.MAX_RETRIES,
            "http_compress": configs.HTTP_COMPRESS,
            "connections_per_node": configs.CONNECTIONS_PER_NODE,
            "verify_certs": configs.VERIFY_CERTS,
            "sniff_on_start": configs.SNIFF_ON_START,
            "sniff_before_requests": configs.SNIFF_BEFORE_REQUESTS,
            "sniff_on_node_failure": configs.SNIFF_ON_NODE_FAILURE,
            "max_dead_node_backoff": configs.MAX_DEAD_NODE_BACKOFF,
        }

        # SSL options are added only when set, so the client never receives None for them.
        optional_ssl_options = {
            "ca_certs": configs.CA_CERTS,
            "client_key": configs.CLIENT_KEY,
            "client_cert": configs.CLIENT_CERT,
            "ssl_assert_fingerprint": configs.SSL_ASSERT_FINGERPRINT,
        }
        client_options.update({name: value for name, value in optional_ssl_options.items() if value})

        return Elasticsearch(**client_options)

    @override
    def ping(self) -> ElasticsearchResponseType:
        """Ping the Elasticsearch server through the client.

        Returns:
            ElasticsearchResponseType: The result of the client's ``ping`` call, returned unchanged.
        """
        return self.client.ping()

    @override
    def index(
        self,
        index: ElasticsearchIndexType,
        document: ElasticsearchDocumentType,
        doc_id: ElasticsearchIdType | None = None,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Store a document in an Elasticsearch index.

        Args:
            index (ElasticsearchIndexType): Name of the target index.
            document (ElasticsearchDocumentType): The document body to store.
            doc_id (ElasticsearchIdType | None): Optional document ID, handed to the client as ``id``.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: The response from Elasticsearch.
        """
        return self.client.index(
            index=index,
            document=document,
            id=doc_id,
            **kwargs,
        )

    @override
    def get(
        self,
        index: ElasticsearchIndexType,
        doc_id: ElasticsearchIdType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Fetch a single document from Elasticsearch by ID.

        Args:
            index (ElasticsearchIndexType): Name of the index holding the document.
            doc_id (ElasticsearchIdType): ID of the document, handed to the client as ``id``.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: The document if found.
        """
        return self.client.get(
            index=index,
            id=doc_id,
            **kwargs,
        )

    @override
    def search(
        self,
        index: ElasticsearchIndexType,
        query: ElasticsearchQueryType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Run a search query against an Elasticsearch index.

        Args:
            index (ElasticsearchIndexType): Name of the index to search.
            query (ElasticsearchQueryType): The search query, sent to the client as the request ``body``.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: The search results.
        """
        return self.client.search(
            index=index,
            body=query,
            **kwargs,
        )

    @override
    def update(
        self,
        index: ElasticsearchIndexType,
        doc_id: ElasticsearchIdType,
        doc: ElasticsearchDocumentType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Apply an update to an existing Elasticsearch document.

        Args:
            index (ElasticsearchIndexType): Name of the index holding the document.
            doc_id (ElasticsearchIdType): ID of the document, handed to the client as ``id``.
            doc (ElasticsearchDocumentType): The document update to apply.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: The response from Elasticsearch.
        """
        return self.client.update(
            index=index,
            id=doc_id,
            doc=doc,
            **kwargs,
        )

    @override
    def delete(
        self,
        index: ElasticsearchIndexType,
        doc_id: ElasticsearchIdType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Remove a single document from Elasticsearch by ID.

        Args:
            index (ElasticsearchIndexType): Name of the index holding the document.
            doc_id (ElasticsearchIdType): ID of the document, handed to the client as ``id``.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: The response from Elasticsearch.
        """
        return self.client.delete(
            index=index,
            id=doc_id,
            **kwargs,
        )

    @override
    def bulk(self, actions: list[dict[str, Any]], **kwargs: Any) -> ElasticsearchResponseType:
        """Submit a batch of bulk operations to Elasticsearch.

        Args:
            actions (list[dict[str, Any]]): The bulk actions, handed to the client as ``operations``.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: The response from Elasticsearch.

        Note:
            NOTE(review): this previously documented ``BulkIndexError`` as raised on failed
            operations. That exception appears to belong to ``elasticsearch.helpers``; the
            client ``bulk`` call used here is expected to report per-item failures in the
            response instead. Confirm against the client documentation.
        """
        return self.client.bulk(operations=actions, **kwargs)

    @override
    def create_index(
        self,
        index: ElasticsearchIndexType,
        body: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Create a new index in Elasticsearch.

        Args:
            index (ElasticsearchIndexType): Name of the index to create.
            body (dict[str, Any] | None): Optional index settings and mappings.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: The response from Elasticsearch.
        """
        return self.client.indices.create(
            index=index,
            body=body,
            **kwargs,
        )

    @override
    def delete_index(self, index: ElasticsearchIndexType, **kwargs: Any) -> ElasticsearchResponseType:
        """Remove an entire index from Elasticsearch.

        Args:
            index (ElasticsearchIndexType): Name of the index to remove.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: The response from Elasticsearch.
        """
        return self.client.indices.delete(index=index, **kwargs)

    @override
    def exists(
        self,
        index: ElasticsearchIndexType,
        doc_id: ElasticsearchIdType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Report whether a document is present in the given index.

        Args:
            index (ElasticsearchIndexType): Name of the index to look in.
            doc_id (ElasticsearchIdType): ID of the document, handed to the client as ``id``.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: True if the document exists, False otherwise.
        """
        return self.client.exists(
            index=index,
            id=doc_id,
            **kwargs,
        )

    @override
    def index_exists(self, index: ElasticsearchIndexType, **kwargs: Any) -> ElasticsearchResponseType:
        """Report whether an index is present in Elasticsearch.

        Args:
            index (ElasticsearchIndexType): Name of the index to look for.
            **kwargs (Any): Extra keyword arguments passed to the Elasticsearch client.

        Returns:
            ElasticsearchResponseType: True if the index exists, False otherwise.
        """
        return self.client.indices.exists(index=index, **kwargs)

archipy.adapters.elasticsearch.adapters.ElasticsearchAdapter.client instance-attribute

client = _get_client(configs)

archipy.adapters.elasticsearch.adapters.ElasticsearchAdapter.ping

ping() -> ElasticsearchResponseType

Test the connection to the Elasticsearch server.

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

True if the connection is successful.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
def ping(self) -> ElasticsearchResponseType:
    """Test the connection to the Elasticsearch server.

    Returns:
        ElasticsearchResponseType: True if the connection is successful.
    """
    return self.client.ping()

archipy.adapters.elasticsearch.adapters.ElasticsearchAdapter.index

index(
    index: ElasticsearchIndexType,
    document: ElasticsearchDocumentType,
    doc_id: ElasticsearchIdType | None = None,
    **kwargs: Any,
) -> ElasticsearchResponseType

Index a document in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
document ElasticsearchDocumentType

The document to index.

required
doc_id ElasticsearchIdType | None

Optional document ID. If not provided, Elasticsearch will generate one.

None
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
def index(
    self,
    index: ElasticsearchIndexType,
    document: ElasticsearchDocumentType,
    doc_id: ElasticsearchIdType | None = None,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Index a document in Elasticsearch.

    Args:
        index (ElasticsearchIndexType): The index name.
        document (ElasticsearchDocumentType): The document to index.
        doc_id (ElasticsearchIdType | None): Optional document ID.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The response from Elasticsearch.
    """
    return self.client.index(index=index, document=document, id=doc_id, **kwargs)

archipy.adapters.elasticsearch.adapters.ElasticsearchAdapter.get

get(
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Get a document from Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
doc_id ElasticsearchIdType

The document ID.

required
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The document if found.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
def get(
    self,
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Get a document from Elasticsearch.

    Args:
        index (ElasticsearchIndexType): The index name.
        doc_id (ElasticsearchIdType): The document ID.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The document if found.
    """
    return self.client.get(index=index, id=doc_id, **kwargs)

archipy.adapters.elasticsearch.adapters.ElasticsearchAdapter.search

search(
    index: ElasticsearchIndexType,
    query: ElasticsearchQueryType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Search for documents in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
query ElasticsearchQueryType

The search query.

required
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The search results.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
def search(
    self,
    index: ElasticsearchIndexType,
    query: ElasticsearchQueryType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Search for documents in Elasticsearch.

    Args:
        index (ElasticsearchIndexType): The index name.
        query (ElasticsearchQueryType): The search query.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The search results.
    """
    return self.client.search(index=index, body=query, **kwargs)

archipy.adapters.elasticsearch.adapters.ElasticsearchAdapter.update

update(
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    doc: ElasticsearchDocumentType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Update a document in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
doc_id ElasticsearchIdType

The document ID.

required
doc ElasticsearchDocumentType

The document update.

required
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
def update(
    self,
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    doc: ElasticsearchDocumentType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Update a document in Elasticsearch.

    Args:
        index (ElasticsearchIndexType): The index name.
        doc_id (ElasticsearchIdType): The document ID.
        doc (ElasticsearchDocumentType): The document update.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The response from Elasticsearch.
    """
    return self.client.update(index=index, id=doc_id, doc=doc, **kwargs)

archipy.adapters.elasticsearch.adapters.ElasticsearchAdapter.delete

delete(
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Delete a document from Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
doc_id ElasticsearchIdType

The document ID.

required
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
def delete(
    self,
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Delete a document from Elasticsearch.

    Args:
        index (ElasticsearchIndexType): The index name.
        doc_id (ElasticsearchIdType): The document ID.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The response from Elasticsearch.
    """
    return self.client.delete(index=index, id=doc_id, **kwargs)

archipy.adapters.elasticsearch.adapters.ElasticsearchAdapter.bulk

bulk(
    actions: list[dict[str, Any]], **kwargs: Any
) -> ElasticsearchResponseType

Perform bulk operations in Elasticsearch.

Parameters:

Name Type Description Default
actions list[dict[str, Any]]

List of bulk actions to perform.

required
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Raises:

Type Description
BulkIndexError

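If any of the bulk operations fail. (Review note: BulkIndexError is raised by the elasticsearch.helpers bulk utilities; the client bulk call used by this method is expected to report per-item failures in the response instead. Confirm before relying on this exception.)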

Source code in archipy/adapters/elasticsearch/adapters.py
@override
def bulk(
    self,
    actions: list[dict[str, Any]],
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Perform bulk operations in Elasticsearch.

    Args:
        actions (list[dict[str, Any]]): List of bulk actions to perform.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The response from Elasticsearch.

    Raises:
        BulkIndexError: If any of the bulk operations fail.
    """
    return self.client.bulk(operations=actions, **kwargs)

archipy.adapters.elasticsearch.adapters.ElasticsearchAdapter.create_index

create_index(
    index: ElasticsearchIndexType,
    body: dict[str, Any] | None = None,
    **kwargs: Any,
) -> ElasticsearchResponseType

Create an index in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
body dict[str, Any] | None

Optional index settings and mappings.

None
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
def create_index(
    self,
    index: ElasticsearchIndexType,
    body: dict[str, Any] | None = None,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Create an index in Elasticsearch.

    Args:
        index (ElasticsearchIndexType): The index name.
        body (dict[str, Any] | None): Optional index settings and mappings.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The response from Elasticsearch.
    """
    return self.client.indices.create(index=index, body=body, **kwargs)

archipy.adapters.elasticsearch.adapters.ElasticsearchAdapter.delete_index

delete_index(
    index: ElasticsearchIndexType, **kwargs: Any
) -> ElasticsearchResponseType

Delete an index from Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
def delete_index(
    self,
    index: ElasticsearchIndexType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Delete an index from Elasticsearch.

    Args:
        index (ElasticsearchIndexType): The index name.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The response from Elasticsearch.
    """
    return self.client.indices.delete(index=index, **kwargs)

archipy.adapters.elasticsearch.adapters.ElasticsearchAdapter.exists

exists(
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Check if a document exists in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
doc_id ElasticsearchIdType

The document ID.

required
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

True if the document exists, False otherwise.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
def exists(
    self,
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Check if a document exists in Elasticsearch.

    Args:
        index (ElasticsearchIndexType): The index name.
        doc_id (ElasticsearchIdType): The document ID.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: True if the document exists, False otherwise.
    """
    return self.client.exists(index=index, id=doc_id, **kwargs)

archipy.adapters.elasticsearch.adapters.ElasticsearchAdapter.index_exists

index_exists(
    index: ElasticsearchIndexType, **kwargs: Any
) -> ElasticsearchResponseType

Check if an index exists in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

True if the index exists, False otherwise.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
def index_exists(
    self,
    index: ElasticsearchIndexType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Check if an index exists in Elasticsearch.

    Args:
        index (ElasticsearchIndexType): The index name.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: True if the index exists, False otherwise.
    """
    return self.client.indices.exists(index=index, **kwargs)

archipy.adapters.elasticsearch.adapters.AsyncElasticsearchAdapter

Bases: AsyncElasticsearchPort

Concrete implementation of the AsyncElasticsearchPort interface using elasticsearch-py library.

This implementation provides a standardized way to interact with Elasticsearch asynchronously, abstracting the underlying client implementation details.

Source code in archipy/adapters/elasticsearch/adapters.py
class AsyncElasticsearchAdapter(AsyncElasticsearchPort):
    """Asynchronous Elasticsearch adapter backed by elasticsearch-py's ``AsyncElasticsearch``.

    Each public coroutine awaits the matching call on ``self.client`` and returns its result,
    so callers program against the ``AsyncElasticsearchPort`` contract instead of the client.
    """

    def __init__(self, elasticsearch_config: ElasticsearchConfig | None = None) -> None:
        """Create the adapter and its underlying async client.

        Args:
            elasticsearch_config (ElasticsearchConfig, optional): Settings used to build the client.
                When None, ``BaseConfig.global_config().ELASTIC`` is used. Defaults to None.
        """
        configs = elasticsearch_config
        if configs is None:
            # No explicit settings supplied: fall back to the globally registered configuration.
            configs = BaseConfig.global_config().ELASTIC
        self.client = self._get_client(configs)

    @staticmethod
    def _get_client(configs: ElasticsearchConfig) -> AsyncElasticsearch:
        """Build an ``AsyncElasticsearch`` client from configuration.

        Args:
            configs (ElasticsearchConfig): Settings to translate into client arguments.

        Returns:
            AsyncElasticsearch: A client constructed from the derived keyword arguments.
        """
        # A complete API key/secret pair wins; HTTP basic auth is only considered otherwise.
        api_key: tuple[str, str] | None = None
        basic_auth: tuple[str, str] | None = None
        if configs.API_KEY and configs.API_SECRET:
            api_key = (configs.API_KEY, configs.API_SECRET.get_secret_value())
        elif configs.HTTP_USER_NAME and configs.HTTP_PASSWORD:
            basic_auth = (configs.HTTP_USER_NAME, configs.HTTP_PASSWORD.get_secret_value())

        # Arguments that are always sent to the client, even when their value is None.
        client_options: dict[str, Any] = dict(
            hosts=configs.HOSTS,
            api_key=api_key,
            basic_auth=basic_auth,
            request_timeout=configs.REQUEST_TIMEOUT,
            retry_on_status=configs.RETRY_ON_STATUS,
            retry_on_timeout=configs.RETRY_ON_TIMEOUT,
            max_retries=configs.MAX_RETRIES,
            http_compress=configs.HTTP_COMPRESS,
            connections_per_node=configs.CONNECTIONS_PER_NODE,
            verify_certs=configs.VERIFY_CERTS,
            sniff_on_start=configs.SNIFF_ON_START,
            sniff_before_requests=configs.SNIFF_BEFORE_REQUESTS,
            sniff_on_node_failure=configs.SNIFF_ON_NODE_FAILURE,
            max_dead_node_backoff=configs.MAX_DEAD_NODE_BACKOFF,
        )

        # SSL-related settings are added only when set, so the client never receives None for them.
        optional_ssl_options = (
            ("ca_certs", configs.CA_CERTS),
            ("client_key", configs.CLIENT_KEY),
            ("client_cert", configs.CLIENT_CERT),
            ("ssl_assert_fingerprint", configs.SSL_ASSERT_FINGERPRINT),
        )
        client_options.update({name: value for name, value in optional_ssl_options if value})

        return AsyncElasticsearch(**client_options)

    @override
    async def ping(self) -> ElasticsearchResponseType:
        """Check connectivity to the Elasticsearch server.

        Returns:
            ElasticsearchResponseType: The awaited result of the client's ``ping`` call;
                expected to be truthy when the server is reachable.
        """
        reachable = await self.client.ping()
        return reachable

    @override
    async def index(
        self,
        index: ElasticsearchIndexType,
        document: ElasticsearchDocumentType,
        doc_id: ElasticsearchIdType | None = None,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Store a document in an index.

        Args:
            index (ElasticsearchIndexType): Name of the target index.
            document (ElasticsearchDocumentType): Document body to store.
            doc_id (ElasticsearchIdType | None): Optional document ID, sent as the client's ``id``.
            kwargs: Extra keyword arguments forwarded unchanged to the client call.

        Returns:
            ElasticsearchResponseType: The awaited result of the client's ``index`` call.
        """
        response = await self.client.index(index=index, document=document, id=doc_id, **kwargs)
        return response

    @override
    async def get(
        self,
        index: ElasticsearchIndexType,
        doc_id: ElasticsearchIdType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Fetch a document by ID.

        Args:
            index (ElasticsearchIndexType): Name of the index holding the document.
            doc_id (ElasticsearchIdType): ID of the document, sent as the client's ``id``.
            kwargs: Extra keyword arguments forwarded unchanged to the client call.

        Returns:
            ElasticsearchResponseType: The awaited result of the client's ``get`` call.
        """
        response = await self.client.get(index=index, id=doc_id, **kwargs)
        return response

    @override
    async def search(
        self,
        index: ElasticsearchIndexType,
        query: ElasticsearchQueryType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Run a search against an index.

        Args:
            index (ElasticsearchIndexType): Name of the index to search.
            query (ElasticsearchQueryType): Search request, sent as the client's ``body``.
            kwargs: Extra keyword arguments forwarded unchanged to the client call.

        Returns:
            ElasticsearchResponseType: The awaited result of the client's ``search`` call.
        """
        response = await self.client.search(index=index, body=query, **kwargs)
        return response

    @override
    async def update(
        self,
        index: ElasticsearchIndexType,
        doc_id: ElasticsearchIdType,
        doc: ElasticsearchDocumentType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Apply an update to an existing document.

        Args:
            index (ElasticsearchIndexType): Name of the index holding the document.
            doc_id (ElasticsearchIdType): ID of the document, sent as the client's ``id``.
            doc (ElasticsearchDocumentType): Update payload, sent as the client's ``doc``.
            kwargs: Extra keyword arguments forwarded unchanged to the client call.

        Returns:
            ElasticsearchResponseType: The awaited result of the client's ``update`` call.
        """
        response = await self.client.update(index=index, id=doc_id, doc=doc, **kwargs)
        return response

    @override
    async def delete(
        self,
        index: ElasticsearchIndexType,
        doc_id: ElasticsearchIdType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Remove a single document from an index.

        Args:
            index (ElasticsearchIndexType): Name of the index holding the document.
            doc_id (ElasticsearchIdType): ID of the document, sent as the client's ``id``.
            kwargs: Extra keyword arguments forwarded unchanged to the client call.

        Returns:
            ElasticsearchResponseType: The awaited result of the client's ``delete`` call.
        """
        response = await self.client.delete(index=index, id=doc_id, **kwargs)
        return response

    @override
    async def bulk(
        self,
        actions: list[dict[str, Any]],
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Submit a batch of bulk actions in one client call.

        Args:
            actions (list[dict[str, Any]]): Bulk actions, sent as the client's ``operations``.
            kwargs: Extra keyword arguments forwarded unchanged to the client call.

        Returns:
            ElasticsearchResponseType: The awaited result of the client's ``bulk`` call.
        """
        response = await self.client.bulk(operations=actions, **kwargs)
        return response

    @override
    async def create_index(
        self,
        index: ElasticsearchIndexType,
        body: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Create a new index.

        Args:
            index (ElasticsearchIndexType): Name of the index to create.
            body (dict[str, Any] | None): Optional settings and mappings; passed through as given.
            kwargs: Extra keyword arguments forwarded unchanged to the client call.

        Returns:
            ElasticsearchResponseType: The awaited result of the client's ``indices.create`` call.
        """
        indices_api = self.client.indices
        return await indices_api.create(index=index, body=body, **kwargs)

    @override
    async def delete_index(
        self,
        index: ElasticsearchIndexType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Drop an entire index.

        Args:
            index (ElasticsearchIndexType): Name of the index to drop.
            kwargs: Extra keyword arguments forwarded unchanged to the client call.

        Returns:
            ElasticsearchResponseType: The awaited result of the client's ``indices.delete`` call.
        """
        indices_api = self.client.indices
        return await indices_api.delete(index=index, **kwargs)

    @override
    async def exists(
        self,
        index: ElasticsearchIndexType,
        doc_id: ElasticsearchIdType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Report whether a document is present in an index.

        Args:
            index (ElasticsearchIndexType): Name of the index to look in.
            doc_id (ElasticsearchIdType): ID of the document, sent as the client's ``id``.
            kwargs: Extra keyword arguments forwarded unchanged to the client call.

        Returns:
            ElasticsearchResponseType: The awaited ``exists`` result; expected to be truthy
                when the document exists and falsy otherwise.
        """
        found = await self.client.exists(index=index, id=doc_id, **kwargs)
        return found

    @override
    async def index_exists(
        self,
        index: ElasticsearchIndexType,
        **kwargs: Any,
    ) -> ElasticsearchResponseType:
        """Report whether an index is present.

        Args:
            index (ElasticsearchIndexType): Name of the index to look for.
            kwargs: Extra keyword arguments forwarded unchanged to the client call.

        Returns:
            ElasticsearchResponseType: The awaited ``indices.exists`` result; expected to be
                truthy when the index exists and falsy otherwise.
        """
        indices_api = self.client.indices
        return await indices_api.exists(index=index, **kwargs)

archipy.adapters.elasticsearch.adapters.AsyncElasticsearchAdapter.client instance-attribute

client = _get_client(configs)

archipy.adapters.elasticsearch.adapters.AsyncElasticsearchAdapter.ping async

ping() -> ElasticsearchResponseType

Test the connection to the Elasticsearch server.

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

True if the connection is successful.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
async def ping(self) -> ElasticsearchResponseType:
    """Test the connection to the Elasticsearch server.

    Returns:
        ElasticsearchResponseType: True if the connection is successful.
    """
    return await self.client.ping()

archipy.adapters.elasticsearch.adapters.AsyncElasticsearchAdapter.index async

index(
    index: ElasticsearchIndexType,
    document: ElasticsearchDocumentType,
    doc_id: ElasticsearchIdType | None = None,
    **kwargs: Any,
) -> ElasticsearchResponseType

Index a document in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
document ElasticsearchDocumentType

The document to index.

required
doc_id ElasticsearchIdType | None

Optional document ID.

None
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
async def index(
    self,
    index: ElasticsearchIndexType,
    document: ElasticsearchDocumentType,
    doc_id: ElasticsearchIdType | None = None,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Index a document in Elasticsearch.

    Args:
        index (ElasticsearchIndexType): The index name.
        document (ElasticsearchDocumentType): The document to index.
        doc_id (ElasticsearchIdType | None): Optional document ID.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The response from Elasticsearch.
    """
    return await self.client.index(index=index, document=document, id=doc_id, **kwargs)

archipy.adapters.elasticsearch.adapters.AsyncElasticsearchAdapter.get async

get(
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Get a document from Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
doc_id ElasticsearchIdType

The document ID.

required
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The document if found.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
async def get(
    self,
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Get a document from Elasticsearch.

    Args:
        index (ElasticsearchIndexType): The index name.
        doc_id (ElasticsearchIdType): The document ID.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The document if found.
    """
    return await self.client.get(index=index, id=doc_id, **kwargs)

archipy.adapters.elasticsearch.adapters.AsyncElasticsearchAdapter.search async

search(
    index: ElasticsearchIndexType,
    query: ElasticsearchQueryType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Search for documents in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
query ElasticsearchQueryType

The search query.

required
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The search results.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
async def search(
    self,
    index: ElasticsearchIndexType,
    query: ElasticsearchQueryType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Search for documents in Elasticsearch.

    Args:
        index (ElasticsearchIndexType): The index name.
        query (ElasticsearchQueryType): The search query.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The search results.
    """
    return await self.client.search(index=index, body=query, **kwargs)

archipy.adapters.elasticsearch.adapters.AsyncElasticsearchAdapter.update async

update(
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    doc: ElasticsearchDocumentType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Update a document in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
doc_id ElasticsearchIdType

The document ID.

required
doc ElasticsearchDocumentType

The document update.

required
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
async def update(
    self,
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    doc: ElasticsearchDocumentType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Update a document in Elasticsearch.

    Args:
        index (ElasticsearchIndexType): The index name.
        doc_id (ElasticsearchIdType): The document ID.
        doc (ElasticsearchDocumentType): The document update.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The response from Elasticsearch.
    """
    return await self.client.update(index=index, id=doc_id, doc=doc, **kwargs)

archipy.adapters.elasticsearch.adapters.AsyncElasticsearchAdapter.delete async

delete(
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Delete a document from Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
doc_id ElasticsearchIdType

The document ID.

required
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
async def delete(
    self,
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Delete a document from Elasticsearch.

    Args:
        index (ElasticsearchIndexType): The index name.
        doc_id (ElasticsearchIdType): The document ID.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The response from Elasticsearch.
    """
    return await self.client.delete(index=index, id=doc_id, **kwargs)

archipy.adapters.elasticsearch.adapters.AsyncElasticsearchAdapter.bulk async

bulk(
    actions: list[dict[str, Any]], **kwargs: Any
) -> ElasticsearchResponseType

Perform bulk operations in Elasticsearch.

Parameters:

Name Type Description Default
actions list[dict[str, Any]]

List of bulk actions to perform.

required
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
async def bulk(
    self,
    actions: list[dict[str, Any]],
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Perform bulk operations in Elasticsearch.

    Args:
        actions (list[dict[str, Any]]): List of bulk actions to perform.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The response from Elasticsearch.
    """
    return await self.client.bulk(operations=actions, **kwargs)

archipy.adapters.elasticsearch.adapters.AsyncElasticsearchAdapter.create_index async

create_index(
    index: ElasticsearchIndexType,
    body: dict[str, Any] | None = None,
    **kwargs: Any,
) -> ElasticsearchResponseType

Create an index in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
body dict[str, Any] | None

Optional index settings and mappings.

None
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
async def create_index(
    self,
    index: ElasticsearchIndexType,
    body: dict[str, Any] | None = None,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Create an index in Elasticsearch.

    Args:
        index (ElasticsearchIndexType): The index name.
        body (dict[str, Any] | None): Optional index settings and mappings.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The response from Elasticsearch.
    """
    return await self.client.indices.create(index=index, body=body, **kwargs)

archipy.adapters.elasticsearch.adapters.AsyncElasticsearchAdapter.delete_index async

delete_index(
    index: ElasticsearchIndexType, **kwargs: Any
) -> ElasticsearchResponseType

Delete an index from Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

The response from Elasticsearch.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
async def delete_index(
    self,
    index: ElasticsearchIndexType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Delete an index from Elasticsearch.

    Args:
        index (ElasticsearchIndexType): The index name.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: The response from Elasticsearch.
    """
    return await self.client.indices.delete(index=index, **kwargs)

archipy.adapters.elasticsearch.adapters.AsyncElasticsearchAdapter.exists async

exists(
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType

Check if a document exists in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
doc_id ElasticsearchIdType

The document ID.

required
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

True if the document exists, False otherwise.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
async def exists(
    self,
    index: ElasticsearchIndexType,
    doc_id: ElasticsearchIdType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Check if a document exists in Elasticsearch.

    Args:
        index (ElasticsearchIndexType): The index name.
        doc_id (ElasticsearchIdType): The document ID.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: True if the document exists, False otherwise.
    """
    return await self.client.exists(index=index, id=doc_id, **kwargs)

archipy.adapters.elasticsearch.adapters.AsyncElasticsearchAdapter.index_exists async

index_exists(
    index: ElasticsearchIndexType, **kwargs: Any
) -> ElasticsearchResponseType

Check if an index exists in Elasticsearch.

Parameters:

Name Type Description Default
index ElasticsearchIndexType

The index name.

required
kwargs Any

Additional keyword arguments passed to the Elasticsearch client.

{}

Returns:

Name Type Description
ElasticsearchResponseType ElasticsearchResponseType

True if the index exists, False otherwise.

Source code in archipy/adapters/elasticsearch/adapters.py
@override
async def index_exists(
    self,
    index: ElasticsearchIndexType,
    **kwargs: Any,
) -> ElasticsearchResponseType:
    """Check if an index exists in Elasticsearch.

    Args:
        index (ElasticsearchIndexType): The index name.
        kwargs: Additional keyword arguments passed to the Elasticsearch client.

    Returns:
        ElasticsearchResponseType: True if the index exists, False otherwise.
    """
    return await self.client.indices.exists(index=index, **kwargs)

options: show_root_toc_entry: false heading_level: 3