Skip to content

Adapters

The adapters module provides standardized interfaces to external systems and services. It follows the ports and adapters pattern (also known as hexagonal architecture) to decouple application logic from external dependencies.

Key Features

  • Consistent interfaces for all external services
  • Built-in mock implementations for testing
  • Port definitions for dependency inversion
  • Ready-to-use implementations for common services

Available Adapters

Email

Email sending functionality with a standardized interface.

from archipy.adapters.email import EmailAdapter, EmailPort

# Configure email adapter
# EmailAdapter takes a single optional config object; with no argument it
# falls back to the EMAIL section of the global configuration.
email_adapter = EmailAdapter()

# Send an email
# The recipient parameter is named `to_email` and accepts one address or a list.
email_adapter.send_email(
    to_email=["recipient@example.com"],
    subject="Test Email",
    body="This is a test email",
)

archipy.adapters.email.adapters

archipy.adapters.email.adapters.AttachmentHandler

Enhanced attachment handler with better type safety and validation

Source code in archipy/adapters/email/adapters.py
class AttachmentHandler:
    """Helpers for building email attachments and attaching them to MIME messages.

    Every method is static; the class is used purely as a namespace.
    """

    # Upper bound, in seconds, for downloading a URL attachment. Without a
    # timeout ``requests.get`` may block indefinitely on an unresponsive server.
    _URL_FETCH_TIMEOUT_SECONDS = 30

    # Content type used when an attachment carries none and nothing can be
    # guessed from its filename.
    _FALLBACK_CONTENT_TYPE = "application/octet-stream"

    @staticmethod
    def create_attachment(
        source: str | bytes | BinaryIO | HttpUrl,
        filename: str,
        attachment_type: EmailAttachmentType,
        content_type: str | None = None,
        content_disposition: EmailAttachmentDispositionType = EmailAttachmentDispositionType.ATTACHMENT,
        content_id: str | None = None,
        max_size: int | None = None,
    ) -> EmailAttachmentDTO:
        """Create an ``EmailAttachmentDTO`` from a raw attachment source.

        Args:
            source: Where the content comes from; interpreted according to
                ``attachment_type`` (see ``_process_source``).
            filename: Filename stored on the attachment.
            attachment_type: Selects how ``source`` is turned into bytes.
            content_type: MIME type stored on the attachment, if known.
            content_disposition: Disposition stored on the attachment.
            content_id: Optional Content-ID stored on the attachment.
            max_size: Size limit passed to the DTO. When ``None``, the global
                ``EMAIL.ATTACHMENT_MAX_SIZE`` setting is used.

        Returns:
            The constructed ``EmailAttachmentDTO``.

        Raises:
            InvalidArgumentError: If reading the source or constructing the DTO
                raises any exception; the original exception is chained.
        """
        if max_size is None:
            max_size = BaseConfig.global_config().EMAIL.ATTACHMENT_MAX_SIZE
        try:
            processed_content = AttachmentHandler._process_source(source, attachment_type)

            return EmailAttachmentDTO(
                content=processed_content,
                filename=filename,
                content_type=content_type,
                content_disposition=content_disposition,
                content_id=content_id,
                attachment_type=attachment_type,
                max_size=max_size,
            )
        except Exception as exception:
            raise InvalidArgumentError(f"Failed to create attachment: {exception!s}") from exception

    @staticmethod
    def _process_source(source: str | bytes | BinaryIO | HttpUrl, attachment_type: EmailAttachmentType) -> bytes:
        """Read an attachment source into bytes.

        Args:
            source: File path (``FILE``), base64 data (``BASE64``), URL (``URL``),
                or bytes / readable object (``BINARY``).
            attachment_type: Selects which of the above interpretations applies.

        Returns:
            The attachment content.

        Raises:
            ValueError: If ``attachment_type`` is not handled, or a ``BINARY``
                source is neither ``bytes`` nor an object with ``read``.
        """
        if attachment_type == EmailAttachmentType.FILE:
            with open(source, "rb") as f:
                return f.read()
        elif attachment_type == EmailAttachmentType.BASE64:
            return base64.b64decode(source)
        elif attachment_type == EmailAttachmentType.URL:
            # Bound the download so an unresponsive server cannot hang the caller.
            response = requests.get(source, timeout=AttachmentHandler._URL_FETCH_TIMEOUT_SECONDS)
            response.raise_for_status()
            return response.content
        elif attachment_type == EmailAttachmentType.BINARY:
            if isinstance(source, bytes):
                return source
            if hasattr(source, "read"):
                return source.read()
            raise ValueError(f"Invalid binary source type: {type(source)}")
        raise ValueError(f"Unsupported attachment type: {attachment_type}")

    @staticmethod
    def process_attachment(msg: MIMEMultipart, attachment: EmailAttachmentDTO) -> None:
        """Build a MIME part for ``attachment`` and attach it to ``msg``.

        Args:
            msg: Message that receives the new part.
            attachment: Attachment whose content, disposition, filename and
                optional Content-ID are written to the part.
        """
        content = AttachmentHandler._get_content(attachment)
        part = AttachmentHandler._create_mime_part(content, attachment)

        # Add headers
        part.add_header("Content-Disposition", attachment.content_disposition.value, filename=attachment.filename)

        if attachment.content_id:
            part.add_header("Content-ID", attachment.content_id)

        msg.attach(part)

    @staticmethod
    def _get_content(attachment: EmailAttachmentDTO) -> bytes:
        """Return the attachment content as bytes.

        ``bytes`` content is returned as is, ``str`` content is encoded with the
        default encoding, and anything else is read via its ``read`` method.
        """
        if isinstance(attachment.content, (str, bytes)):
            return attachment.content if isinstance(attachment.content, bytes) else attachment.content.encode()
        return attachment.content.read()

    @staticmethod
    def _create_mime_part(
        content: bytes,
        attachment: EmailAttachmentDTO,
    ) -> MIMEText | MIMEImage | MIMEAudio | MIMEBase:
        """Create the MIME part matching the attachment's content type.

        ``text/*``, ``image/*`` and ``audio/*`` map to their dedicated MIME
        classes; every other type becomes a base64-encoded ``MIMEBase`` part.

        Args:
            content: Raw attachment bytes.
            attachment: Attachment providing ``content_type`` and ``filename``.

        Returns:
            The MIME part holding ``content``.
        """
        content_type = attachment.content_type
        if not content_type:
            # ``create_attachment`` allows ``content_type=None``; instead of
            # failing on ``None.split`` guess from the filename, then fall back
            # to a generic binary type.
            import mimetypes

            guessed_type, _ = mimetypes.guess_type(attachment.filename)
            content_type = guessed_type or AttachmentHandler._FALLBACK_CONTENT_TYPE

        main_type, sub_type = content_type.split("/", 1)

        if main_type == "text":
            return MIMEText(content.decode(), sub_type)
        if main_type == "image":
            return MIMEImage(content, _subtype=sub_type)
        if main_type == "audio":
            return MIMEAudio(content, _subtype=sub_type)
        part = MIMEBase(main_type, sub_type)
        part.set_payload(content)
        encoders.encode_base64(part)
        return part

archipy.adapters.email.adapters.AttachmentHandler.create_attachment(source, filename, attachment_type, content_type=None, content_disposition=EmailAttachmentDispositionType.ATTACHMENT, content_id=None, max_size=None) staticmethod

Create an attachment with validation

Source code in archipy/adapters/email/adapters.py
@staticmethod
def create_attachment(
    source: str | bytes | BinaryIO | HttpUrl,
    filename: str,
    attachment_type: EmailAttachmentType,
    content_type: str | None = None,
    content_disposition: EmailAttachmentDispositionType = EmailAttachmentDispositionType.ATTACHMENT,
    content_id: str | None = None,
    max_size: int | None = None,
) -> EmailAttachmentDTO:
    """Build an ``EmailAttachmentDTO`` from a raw attachment source.

    The source is converted to bytes according to ``attachment_type`` and
    handed to the DTO together with the remaining arguments. When ``max_size``
    is ``None`` the global ``EMAIL.ATTACHMENT_MAX_SIZE`` setting is used.

    Raises:
        InvalidArgumentError: If reading the source or constructing the DTO
            raises any exception; the original exception is chained.
    """
    # The limit is resolved outside the guarded section, so a configuration
    # failure is not reported as an invalid argument.
    size_limit = BaseConfig.global_config().EMAIL.ATTACHMENT_MAX_SIZE if max_size is None else max_size

    try:
        return EmailAttachmentDTO(
            content=AttachmentHandler._process_source(source, attachment_type),
            filename=filename,
            content_type=content_type,
            content_disposition=content_disposition,
            content_id=content_id,
            attachment_type=attachment_type,
            max_size=size_limit,
        )
    except Exception as error:
        raise InvalidArgumentError(f"Failed to create attachment: {error!s}") from error

archipy.adapters.email.adapters.AttachmentHandler.process_attachment(msg, attachment) staticmethod

Process and attach the attachment to the email message

Source code in archipy/adapters/email/adapters.py
@staticmethod
def process_attachment(msg: MIMEMultipart, attachment: EmailAttachmentDTO) -> None:
    """Turn ``attachment`` into a MIME part and attach it to ``msg``.

    The part always receives a Content-Disposition header carrying the
    attachment's filename; a Content-ID header is added only when the
    attachment has a content id.
    """
    mime_part = AttachmentHandler._create_mime_part(
        AttachmentHandler._get_content(attachment),
        attachment,
    )

    mime_part.add_header(
        "Content-Disposition",
        attachment.content_disposition.value,
        filename=attachment.filename,
    )
    if attachment.content_id:
        mime_part.add_header("Content-ID", attachment.content_id)

    msg.attach(mime_part)

archipy.adapters.email.adapters.EmailAdapter

Bases: EmailPort

Source code in archipy/adapters/email/adapters.py
class EmailAdapter(EmailPort):
    """``EmailPort`` implementation that sends mail over pooled SMTP connections."""

    def __init__(self, config: EmailConfig | None = None):
        """Store the configuration and create the connection pool.

        Args:
            config: Email settings. When omitted, the ``EMAIL`` section of the
                global configuration is used.
        """
        self.config = config or BaseConfig.global_config().EMAIL
        self.connection_pool = EmailConnectionPool(self.config)

    @override
    def send_email(
        self,
        to_email: EmailStr | list[EmailStr],
        subject: str,
        body: str,
        cc: EmailStr | list[EmailStr] | None = None,
        bcc: EmailStr | list[EmailStr] | None = None,
        attachments: list[str | EmailAttachmentDTO] | None = None,
        html: bool = False,
        template: str | None = None,
        template_vars: dict | None = None,
    ) -> None:
        """Send an email over a pooled connection, retrying on failure.

        Up to ``config.MAX_RETRIES`` attempts are made. Errors are not raised
        to the caller: they are handed to ``BaseUtils.capture_exception``, and
        an error is logged when all attempts are used up without a send.

        Args:
            to_email: Primary recipient(s).
            subject: Subject line.
            body: Message body; replaced by the rendered ``template`` if given.
            cc: Carbon copy recipient(s).
            bcc: Blind carbon copy recipient(s).
            attachments: File paths or ``EmailAttachmentDTO`` objects.
            html: Send the body as HTML instead of plain text.
            template: Template string rendered with ``template_vars``.
            template_vars: Variables for rendering ``template``.
        """
        connection = None
        try:
            connection = self.connection_pool.get_connection()
            msg = self._create_message(
                to_email=to_email,
                subject=subject,
                body=body,
                cc=cc,
                bcc=bcc,
                attachments=attachments,
                html=html,
                template=template,
                template_vars=template_vars,
            )

            recipients = self._get_all_recipients(to_email, cc, bcc)

            max_retries = self.config.MAX_RETRIES
            for attempt in range(max_retries):
                try:
                    # Connect and send within the same attempt. Previously a
                    # missing SMTP session consumed the whole attempt, so with
                    # few retries the message could be dropped without a trace.
                    if not connection.smtp_connection:
                        connection.connect()
                    if connection.smtp_connection:
                        connection.smtp_connection.send_message(msg, to_addrs=recipients)
                        logging.debug(f"Email sent successfully to {to_email}")
                        return
                except Exception as e:
                    if attempt == max_retries - 1:
                        BaseUtils.capture_exception(e)
                    connection.connect()  # Retry with fresh connection

            # Only reached when no attempt managed to send the message.
            logging.error(f"Failed to send email to {to_email} after {max_retries} attempts")

        except Exception as e:
            BaseUtils.capture_exception(e)
        finally:
            # Always hand the connection back, whether or not sending worked.
            if connection:
                self.connection_pool.return_connection(connection)

    def _create_message(
        self,
        to_email: EmailStr | list[EmailStr],
        subject: str,
        body: str,
        cc: EmailStr | list[EmailStr] | None = None,
        bcc: EmailStr | list[EmailStr] | None = None,
        attachments: list[str | EmailAttachmentDTO] | None = None,
        html: bool = False,
        template: str | None = None,
        template_vars: dict | None = None,
    ) -> MIMEMultipart:
        """Assemble the MIME message: headers, body and attachments.

        The sender is ``config.EMAIL_USERNAME``. When ``template`` is given,
        its rendering replaces ``body``. String attachments are treated as
        file paths and loaded through ``AttachmentHandler.create_attachment``.

        Returns:
            The assembled multipart message.
        """
        msg = MIMEMultipart()
        msg["From"] = self.config.EMAIL_USERNAME
        msg["To"] = to_email if isinstance(to_email, str) else ", ".join(to_email)
        msg["Subject"] = subject

        if cc:
            msg["Cc"] = cc if isinstance(cc, str) else ", ".join(cc)
        if bcc:
            msg["Bcc"] = bcc if isinstance(bcc, str) else ", ".join(bcc)

        if template:
            body = Template(template).render(**(template_vars or {}))

        msg.attach(MIMEText(body, "html" if html else "plain"))

        if attachments:
            for attachment in attachments:
                if isinstance(attachment, str):
                    # Treat as file path
                    attachment_obj = AttachmentHandler.create_attachment(
                        source=attachment,
                        filename=os.path.basename(attachment),
                        attachment_type=EmailAttachmentType.FILE,
                    )
                else:
                    attachment_obj = attachment
                AttachmentHandler.process_attachment(msg, attachment_obj)

        return msg

    @staticmethod
    def _get_all_recipients(
        to_email: EmailStr | list[EmailStr],
        cc: EmailStr | list[EmailStr] | None,
        bcc: EmailStr | list[EmailStr] | None,
    ) -> list[str]:
        """Return To, Cc and Bcc addresses as one flat list, in that order.

        Each argument may be a single address or a list of addresses; empty
        or ``None`` Cc/Bcc values contribute nothing.
        """
        recipients = []

        # Add primary recipients
        if isinstance(to_email, str):
            recipients.append(to_email)
        else:
            recipients.extend(to_email)

        # Add CC recipients
        if cc:
            if isinstance(cc, str):
                recipients.append(cc)
            else:
                recipients.extend(cc)

        # Add BCC recipients
        if bcc:
            if isinstance(bcc, str):
                recipients.append(bcc)
            else:
                recipients.extend(bcc)

        return recipients

archipy.adapters.email.adapters.EmailAdapter.send_email(to_email, subject, body, cc=None, bcc=None, attachments=None, html=False, template=None, template_vars=None)

Send email with advanced features and connection pooling

Source code in archipy/adapters/email/adapters.py
@override
def send_email(
    self,
    to_email: EmailStr | list[EmailStr],
    subject: str,
    body: str,
    cc: EmailStr | list[EmailStr] | None = None,
    bcc: EmailStr | list[EmailStr] | None = None,
    attachments: list[str | EmailAttachmentDTO] | None = None,
    html: bool = False,
    template: str | None = None,
    template_vars: dict | None = None,
) -> None:
    """Send an email over a pooled connection, retrying on failure.

    Up to ``config.MAX_RETRIES`` attempts are made. Errors are not raised
    to the caller: they are handed to ``BaseUtils.capture_exception``, and
    an error is logged when all attempts are used up without a send.

    Args:
        to_email: Primary recipient(s).
        subject: Subject line.
        body: Message body, passed on to message creation.
        cc: Carbon copy recipient(s).
        bcc: Blind carbon copy recipient(s).
        attachments: File paths or ``EmailAttachmentDTO`` objects.
        html: Send the body as HTML instead of plain text.
        template: Template string rendered with ``template_vars``.
        template_vars: Variables for rendering ``template``.
    """
    connection = None
    try:
        connection = self.connection_pool.get_connection()
        msg = self._create_message(
            to_email=to_email,
            subject=subject,
            body=body,
            cc=cc,
            bcc=bcc,
            attachments=attachments,
            html=html,
            template=template,
            template_vars=template_vars,
        )

        recipients = self._get_all_recipients(to_email, cc, bcc)

        max_retries = self.config.MAX_RETRIES
        for attempt in range(max_retries):
            try:
                # Connect and send within the same attempt. Previously a
                # missing SMTP session consumed the whole attempt, so with
                # few retries the message could be dropped without a trace.
                if not connection.smtp_connection:
                    connection.connect()
                if connection.smtp_connection:
                    connection.smtp_connection.send_message(msg, to_addrs=recipients)
                    logging.debug(f"Email sent successfully to {to_email}")
                    return
            except Exception as e:
                if attempt == max_retries - 1:
                    BaseUtils.capture_exception(e)
                connection.connect()  # Retry with fresh connection

        # Only reached when no attempt managed to send the message.
        logging.error(f"Failed to send email to {to_email} after {max_retries} attempts")

    except Exception as e:
        BaseUtils.capture_exception(e)
    finally:
        # Always hand the connection back, whether or not sending worked.
        if connection:
            self.connection_pool.return_connection(connection)

archipy.adapters.email.ports

archipy.adapters.email.ports.EmailPort

Interface for email sending operations.

This interface defines the contract for email adapters, ensuring a consistent approach to sending emails across different implementations. It provides a comprehensive set of features including support for:

  • Multiple recipients (To, CC, BCC)
  • HTML and plain text content
  • File and in-memory attachments
  • Template-based email rendering

Implementing classes should handle the details of connecting to an email service, managing connections, and ensuring reliable delivery.

Examples:

>>> from archipy.adapters.email.ports import EmailPort
>>>
>>> class CustomEmailAdapter(EmailPort):
...     def __init__(self, config):
...         self.config = config
...
...     def send_email(
...         self,
...         to_email,
...         subject,
...         body,
...         cc=None,
...         bcc=None,
...         attachments=None,
...         html=False,
...         template=None,
...         template_vars=None
...     ):
...         # Implementation details...
...         pass
Source code in archipy/adapters/email/ports.py
class EmailPort:
    """Interface for email sending operations.

    This interface defines the contract for email adapters, ensuring
    a consistent approach to sending emails across different implementations.
    It provides a comprehensive set of features including support for:

    - Multiple recipients (To, CC, BCC)
    - HTML and plain text content
    - File and in-memory attachments
    - Template-based email rendering

    Implementing classes should handle the details of connecting to an
    email service, managing connections, and ensuring reliable delivery.

    Examples:
        >>> from archipy.adapters.email.ports import EmailPort
        >>>
        >>> class CustomEmailAdapter(EmailPort):
        ...     def __init__(self, config):
        ...         self.config = config
        ...
        ...     def send_email(
        ...         self,
        ...         to_email,
        ...         subject,
        ...         body,
        ...         cc=None,
        ...         bcc=None,
        ...         attachments=None,
        ...         html=False,
        ...         template=None,
        ...         template_vars=None
        ...     ):
        ...         # Implementation details...
        ...         pass
    """

    # NOTE(review): as listed here the class declares no ABC base, so
    # @abstractmethod alone does not block instantiation; the method body
    # raising NotImplementedError is what signals a missing override. Confirm
    # whether deriving from abc.ABC is intended.
    @abstractmethod
    def send_email(
        self,
        to_email: EmailStr | list[EmailStr],
        subject: str,
        body: str,
        cc: EmailStr | list[EmailStr] | None = None,
        bcc: EmailStr | list[EmailStr] | None = None,
        attachments: list[str | EmailAttachmentDTO] | None = None,
        html: bool = False,
        template: str | None = None,
        template_vars: dict | None = None,
    ) -> None:
        """Send an email with various options and features.

        This method handles the composition and delivery of an email with
        support for multiple recipients, HTML content, templates, and attachments.

        Args:
            to_email: Primary recipient(s) of the email
            subject: Email subject line
            body: Email body content (either plain text or HTML)
            cc: Carbon copy recipient(s)
            bcc: Blind carbon copy recipient(s)
            attachments: List of file paths or EmailAttachmentDTO objects
            html: If True, treats body as HTML content, otherwise plain text
            template: A template string to render using template_vars
            template_vars: Variables to use when rendering the template

        Returns:
            None

        Raises:
            NotImplementedError: Always, in this base implementation;
                implementing classes must override the method.

        Examples:
            >>> # Simple text email
            >>> adapter.send_email(
            ...     to_email="user@example.com",
            ...     subject="Hello",
            ...     body="This is a test email"
            ... )
            >>>
            >>> # HTML email with attachment
            >>> adapter.send_email(
            ...     to_email=["user1@example.com", "user2@example.com"],
            ...     subject="Report",
            ...     body="<h1>Monthly Report</h1><p>Please see attached</p>",
            ...     html=True,
            ...     attachments=["path/to/report.pdf"]
            ... )
            >>>
            >>> # Template-based email
            >>> template = "Hello {{ name }}, your account expires on {{ date }}"
            >>> adapter.send_email(
            ...     to_email="user@example.com",
            ...     subject="Account Expiration",
            ...     body="",  # Body will be rendered from template
            ...     template=template,
            ...     template_vars={"name": "John", "date": "2023-12-31"}
            ... )
        """
        raise NotImplementedError

archipy.adapters.email.ports.EmailPort.send_email(to_email, subject, body, cc=None, bcc=None, attachments=None, html=False, template=None, template_vars=None) abstractmethod

Send an email with various options and features.

This method handles the composition and delivery of an email with support for multiple recipients, HTML content, templates, and attachments.

Parameters:

Name Type Description Default
to_email EmailStr | list[EmailStr]

Primary recipient(s) of the email

required
subject str

Email subject line

required
body str

Email body content (either plain text or HTML)

required
cc EmailStr | list[EmailStr] | None

Carbon copy recipient(s)

None
bcc EmailStr | list[EmailStr] | None

Blind carbon copy recipient(s)

None
attachments list[str | EmailAttachmentDTO] | None

List of file paths or EmailAttachmentDTO objects

None
html bool

If True, treats body as HTML content, otherwise plain text

False
template str | None

A template string to render using template_vars

None
template_vars dict | None

Variables to use when rendering the template

None

Returns:

Type Description
None

None

Examples:

>>> # Simple text email
>>> adapter.send_email(
...     to_email="user@example.com",
...     subject="Hello",
...     body="This is a test email"
... )
>>>
>>> # HTML email with attachment
>>> adapter.send_email(
...     to_email=["user1@example.com", "user2@example.com"],
...     subject="Report",
...     body="<h1>Monthly Report</h1><p>Please see attached</p>",
...     html=True,
...     attachments=["path/to/report.pdf"]
... )
>>>
>>> # Template-based email
>>> template = "Hello {{ name }}, your account expires on {{ date }}"
>>> adapter.send_email(
...     to_email="user@example.com",
...     subject="Account Expiration",
...     body="",  # Body will be rendered from template
...     template=template,
...     template_vars={"name": "John", "date": "2023-12-31"}
... )
Source code in archipy/adapters/email/ports.py
@abstractmethod
def send_email(
    self,
    to_email: EmailStr | list[EmailStr],
    subject: str,
    body: str,
    cc: EmailStr | list[EmailStr] | None = None,
    bcc: EmailStr | list[EmailStr] | None = None,
    attachments: list[str | EmailAttachmentDTO] | None = None,
    html: bool = False,
    template: str | None = None,
    template_vars: dict | None = None,
) -> None:
    """Send an email with various options and features.

    This method handles the composition and delivery of an email with
    support for multiple recipients, HTML content, templates, and attachments.

    Args:
        to_email: Primary recipient(s) of the email
        subject: Email subject line
        body: Email body content (either plain text or HTML)
        cc: Carbon copy recipient(s)
        bcc: Blind carbon copy recipient(s)
        attachments: List of file paths or EmailAttachmentDTO objects
        html: If True, treats body as HTML content, otherwise plain text
        template: A template string to render using template_vars
        template_vars: Variables to use when rendering the template

    Returns:
        None

    Raises:
        NotImplementedError: Always, in this base implementation;
            implementing classes must override the method.

    Examples:
        >>> # Simple text email
        >>> adapter.send_email(
        ...     to_email="user@example.com",
        ...     subject="Hello",
        ...     body="This is a test email"
        ... )
        >>>
        >>> # HTML email with attachment
        >>> adapter.send_email(
        ...     to_email=["user1@example.com", "user2@example.com"],
        ...     subject="Report",
        ...     body="<h1>Monthly Report</h1><p>Please see attached</p>",
        ...     html=True,
        ...     attachments=["path/to/report.pdf"]
        ... )
        >>>
        >>> # Template-based email
        >>> template = "Hello {{ name }}, your account expires on {{ date }}"
        >>> adapter.send_email(
        ...     to_email="user@example.com",
        ...     subject="Account Expiration",
        ...     body="",  # Body will be rendered from template
        ...     template=template,
        ...     template_vars={"name": "John", "date": "2023-12-31"}
        ... )
    """
    raise NotImplementedError

Keycloak

Keycloak integration for authentication and authorization services.

from archipy.adapters.keycloak import KeycloakAdapter, AsyncKeycloakAdapter

# Create a Keycloak adapter (synchronous)
keycloak = KeycloakAdapter()  # No config passed, so the global KEYCLOAK config is used

# Authenticate a user; the result is indexed with "access_token" below
token = keycloak.get_token("username", "password")

# Validate the access token obtained above
is_valid = keycloak.validate_token(token["access_token"])

# Check whether the token's user has the "admin" role
has_admin = keycloak.has_role(token["access_token"], "admin")

# Async usage example
import asyncio

async def auth_example():
    # Create async Keycloak adapter
    async_keycloak = AsyncKeycloakAdapter()

    # Get token asynchronously
    token = await async_keycloak.get_token("username", "password")

    # Get user info for the access token obtained above
    user_info = await async_keycloak.get_userinfo(token["access_token"])
    return user_info

# Run the async example to completion from synchronous code
user_info = asyncio.run(auth_example())

For detailed examples and usage guidelines, see the Keycloak Adapter Examples.

archipy.adapters.keycloak.adapters

archipy.adapters.keycloak.adapters.KeycloakAdapter

Bases: KeycloakPort

Concrete implementation of the KeycloakPort interface using python-keycloak library.

This implementation includes TTL caching for appropriate operations to improve performance while ensuring cache entries expire after a configured time to prevent stale data.

Source code in archipy/adapters/keycloak/adapters.py
class KeycloakAdapter(KeycloakPort):
    """Concrete implementation of the KeycloakPort interface using python-keycloak library.

    This implementation includes TTL caching for appropriate operations to improve performance
    while ensuring cache entries expire after a configured time to prevent stale data. Write
    operations additionally invalidate the caches they can make stale.
    """

    def __init__(self, keycloak_configs: KeycloakConfig | None = None) -> None:
        """Initialize KeycloakAdapter with configuration.

        Args:
            keycloak_configs: Optional Keycloak configuration. If None, global config is used.
        """
        self.configs: KeycloakConfig = (
            BaseConfig.global_config().KEYCLOAK if keycloak_configs is None else keycloak_configs
        )

        # Initialize the OpenID client for authentication
        self.openid_adapter = self._get_openid_client(self.configs)

        # Cache for admin client to avoid unnecessary re-authentication
        self._admin_adapter: KeycloakAdmin | None = None
        self._admin_token_expiry: float = 0

        # Initialize admin client with service account if client_secret is provided
        if self.configs.CLIENT_SECRET_KEY:
            self._initialize_admin_client()

    def clear_all_caches(self) -> None:
        """Clear all cached values.

        Every attribute exposing a callable ``clear_cache`` is cleared. Properties are skipped:
        reading ``admin_adapter`` would raise ``ValueError`` when no client secret is configured,
        or trigger an admin token refresh, neither of which belongs in a cache flush.
        """
        cls = type(self)
        for attr_name in dir(self):
            if isinstance(getattr(cls, attr_name, None), property):
                continue
            self._clear_caches(getattr(self, attr_name, None))

    @staticmethod
    def _clear_caches(*cached_methods: Any) -> None:
        """Call ``clear_cache()`` on each given object that provides it.

        Args:
            *cached_methods: Objects (typically cached methods) to invalidate. Objects without a
                callable ``clear_cache`` attribute are ignored.
        """
        for cached_method in cached_methods:
            clear_cache = getattr(cached_method, "clear_cache", None)
            if callable(clear_cache):
                clear_cache()

    @staticmethod
    def _extract_roles(user_info: KeycloakUserType) -> set[str]:
        """Collect realm and client role names from a userinfo mapping.

        Args:
            user_info: Mapping read for the ``realm_access`` and ``resource_access`` keys.

        Returns:
            Union of ``realm_access.roles`` and every ``resource_access.<client>.roles``.
        """
        roles = set(user_info.get("realm_access", {}).get("roles", []))
        for client in user_info.get("resource_access", {}).values():
            roles.update(client.get("roles", []))
        return roles

    @staticmethod
    def _get_openid_client(configs: KeycloakConfig) -> KeycloakOpenID:
        """Create and configure a KeycloakOpenID instance.

        Args:
            configs: Keycloak configuration

        Returns:
            Configured KeycloakOpenID client
        """
        return KeycloakOpenID(
            server_url=configs.SERVER_URL,
            client_id=configs.CLIENT_ID,
            realm_name=configs.REALM_NAME,
            client_secret_key=configs.CLIENT_SECRET_KEY,
            verify=configs.VERIFY_SSL,
            timeout=configs.TIMEOUT,
        )

    def _initialize_admin_client(self) -> None:
        """Initialize or refresh the admin client.

        On a KeycloakError the failure is logged and the admin client is reset to None, so the
        next access through ``admin_adapter`` retries.
        """
        try:
            # Get token using client credentials
            token = self.openid_adapter.token(grant_type="client_credentials")

            # Set token expiry time (current time + expires_in - buffer)
            # Using a 30-second buffer to ensure we refresh before expiration
            self._admin_token_expiry = time.time() + token.get("expires_in", 60) - 30

            # Create admin client with the token
            self._admin_adapter = KeycloakAdmin(
                server_url=self.configs.SERVER_URL,
                realm_name=self.configs.REALM_NAME,
                token=token,
                verify=self.configs.VERIFY_SSL,
                timeout=self.configs.TIMEOUT,
            )
            logger.debug("Admin client initialized successfully")
        except KeycloakError as e:
            logger.error(f"Failed to initialize admin client: {e!s}")
            self._admin_adapter = None
            self._admin_token_expiry = 0

    @property
    def admin_adapter(self) -> KeycloakAdmin:
        """Get the admin adapter, refreshing it if necessary.

        Returns:
            KeycloakAdmin instance

        Raises:
            ValueError: If admin client is not available
        """
        if not self.configs.CLIENT_SECRET_KEY:
            raise ValueError("Admin client not available, provide client_secret_key to use admin features")

        # Check if token is about to expire and refresh if needed
        if self._admin_adapter is None or time.time() >= self._admin_token_expiry:
            self._initialize_admin_client()

        if self._admin_adapter is None:
            raise ValueError("Failed to initialize admin client")

        return self._admin_adapter

    @override
    @ttl_cache_decorator(ttl_seconds=3600, maxsize=1)  # Cache for 1 hour, public key rarely changes
    def get_public_key(self) -> Any:
        """Get the public key used to verify tokens.

        Returns:
            JWK key object used to verify signatures

        Raises:
            ValueError: If the key cannot be fetched or parsed
        """
        try:
            from jwcrypto import jwk

            keys_info = self.openid_adapter.public_key()
            # Wrap the value returned by public_key() in PEM armor so JWK.from_pem accepts it
            key = f"-----BEGIN PUBLIC KEY-----\n{keys_info}\n-----END PUBLIC KEY-----"
            return jwk.JWK.from_pem(key.encode("utf-8"))
        except Exception as e:
            logger.error(f"Failed to get public key: {e!s}")
            raise ValueError(f"Failed to get public key: {e!s}") from e

    @override
    def get_token(self, username: str, password: str) -> KeycloakTokenType:
        """Get a user token by username and password using the Resource Owner Password Credentials Grant.

        Warning:
            This method uses the direct password grant flow, which is less secure and not recommended
            for user login in production environments. Instead, prefer the web-based OAuth 2.0
            Authorization Code Flow (use `get_token_from_code`) for secure authentication.
            Use this method only for testing, administrative tasks, or specific service accounts
            where direct credential use is acceptable and properly secured.

        Args:
            username: User's username
            password: User's password

        Returns:
            Token response containing access_token, refresh_token, etc.

        Raises:
            ValueError: If token acquisition fails
        """
        try:
            return self.openid_adapter.token(grant_type="password", username=username, password=password)
        except KeycloakError as e:
            logger.error(f"Failed to get token: {e!s}")
            raise ValueError(f"Failed to get token: {e!s}") from e

    @override
    def refresh_token(self, refresh_token: str) -> KeycloakTokenType:
        """Refresh an existing token using a refresh token.

        Args:
            refresh_token: Refresh token string

        Returns:
            New token response containing access_token, refresh_token, etc.

        Raises:
            ValueError: If token refresh fails
        """
        try:
            return self.openid_adapter.refresh_token(refresh_token)
        except KeycloakError as e:
            logger.error(f"Failed to refresh token: {e!s}")
            raise ValueError(f"Failed to refresh token: {e!s}") from e

    @override
    def validate_token(self, token: str) -> bool:
        """Validate if a token is still valid.

        Args:
            token: Access token to validate

        Returns:
            True if token is valid, False otherwise
        """
        # Not caching validation results as tokens are time-sensitive
        try:
            self.openid_adapter.decode_token(
                token,
                key=self.get_public_key(),
            )
            return True
        except Exception as e:
            logger.debug(f"Token validation failed: {e!s}")
            return False

    @override
    def get_userinfo(self, token: str) -> KeycloakUserType:
        """Get user information from a token.

        Args:
            token: Access token

        Returns:
            User information

        Raises:
            ValueError: If the token is invalid or getting user info fails
        """
        if not self.validate_token(token):
            logger.error("Invalid token provided for userinfo request")
            raise ValueError("Invalid token provided")
        try:
            return self._get_userinfo_cached(token)
        except KeycloakError as e:
            logger.error(f"Failed to get user info: {e!s}")
            raise ValueError(f"Failed to get user info: {e!s}") from e

    @ttl_cache_decorator(ttl_seconds=30, maxsize=100)  # Cache for 30 seconds
    def _get_userinfo_cached(self, token: str) -> KeycloakUserType:
        """Fetch userinfo for a token from the OpenID client, with a short-lived cache.

        Args:
            token: Access token (callers are expected to have validated it already)

        Returns:
            User information as returned by the OpenID client
        """
        return self.openid_adapter.userinfo(token)

    @override
    @ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
    def get_user_by_id(self, user_id: str) -> KeycloakUserType | None:
        """Get user details by user ID.

        Args:
            user_id: User's ID

        Returns:
            User details or None if not found

        Raises:
            ValueError: If getting user fails
        """
        try:
            return self.admin_adapter.get_user(user_id)
        except KeycloakGetError as e:
            # A 404 means "no such user", which is a normal outcome rather than a failure
            if e.response_code == 404:
                return None
            logger.error(f"Failed to get user by ID: {e!s}")
            raise ValueError(f"Failed to get user by ID: {e!s}") from e
        except KeycloakError as e:
            logger.error(f"Failed to get user by ID: {e!s}")
            raise ValueError(f"Failed to get user by ID: {e!s}") from e

    @override
    @ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
    def get_user_by_username(self, username: str) -> KeycloakUserType | None:
        """Get user details by username.

        Args:
            username: User's username

        Returns:
            First matching user or None if not found

        Raises:
            ValueError: If query fails
        """
        try:
            users = self.admin_adapter.get_users({"username": username})
            return users[0] if users else None
        except KeycloakError as e:
            logger.error(f"Failed to get user by username: {e!s}")
            raise ValueError(f"Failed to get user by username: {e!s}") from e

    @override
    @ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
    def get_user_by_email(self, email: str) -> KeycloakUserType | None:
        """Get user details by email.

        Args:
            email: User's email

        Returns:
            First matching user or None if not found

        Raises:
            ValueError: If query fails
        """
        try:
            users = self.admin_adapter.get_users({"email": email})
            return users[0] if users else None
        except KeycloakError as e:
            logger.error(f"Failed to get user by email: {e!s}")
            raise ValueError(f"Failed to get user by email: {e!s}") from e

    @override
    @ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
    def get_user_roles(self, user_id: str) -> list[KeycloakRoleType]:
        """Get roles assigned to a user.

        Args:
            user_id: User's ID

        Returns:
            List of roles

        Raises:
            ValueError: If getting roles fails
        """
        try:
            return self.admin_adapter.get_realm_roles_of_user(user_id)
        except KeycloakError as e:
            logger.error(f"Failed to get user roles: {e!s}")
            raise ValueError(f"Failed to get user roles: {e!s}") from e

    @override
    @ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
    def get_client_roles_for_user(self, user_id: str, client_id: str) -> list[KeycloakRoleType]:
        """Get client-specific roles assigned to a user.

        Args:
            user_id: User's ID
            client_id: Client ID

        Returns:
            List of client-specific roles

        Raises:
            ValueError: If getting roles fails
        """
        try:
            return self.admin_adapter.get_client_roles_of_user(user_id, client_id)
        except KeycloakError as e:
            logger.error(f"Failed to get client roles: {e!s}")
            raise ValueError(f"Failed to get client roles: {e!s}") from e

    @override
    def has_role(self, token: str, role_name: str) -> bool:
        """Check if a user has a specific role.

        Args:
            token: Access token
            role_name: Role name to check

        Returns:
            True if user has the role (realm or client level), False otherwise or on any error
        """
        # Not caching this result as token validation is time-sensitive
        try:
            return role_name in self._extract_roles(self.get_userinfo(token))
        except Exception as e:
            logger.debug(f"Role check failed: {e!s}")
            return False

    @override
    def has_any_of_roles(self, token: str, role_names: set[str]) -> bool:
        """Check if a user has any of the specified roles.

        Args:
            token: Access token
            role_names: Set of role names to check

        Returns:
            True if user has any of the roles, False otherwise or on any error
        """
        try:
            return bool(role_names.intersection(self._extract_roles(self.get_userinfo(token))))
        except Exception as e:
            logger.debug(f"Role check failed: {e!s}")
            return False

    @override
    def has_all_roles(self, token: str, role_names: set[str]) -> bool:
        """Check if a user has all of the specified roles.

        Args:
            token: Access token
            role_names: Set of role names to check

        Returns:
            True if user has all of the roles, False otherwise or on any error
        """
        try:
            return role_names.issubset(self._extract_roles(self.get_userinfo(token)))
        except Exception as e:
            logger.debug(f"All roles check failed: {e!s}")
            return False

    @override
    def create_user(self, user_data: dict[str, Any]) -> str:
        """Create a new user in Keycloak.

        Args:
            user_data: User data including username, email, etc.

        Returns:
            ID of the created user

        Raises:
            ValueError: If creating user fails
        """
        # This is a write operation, no caching needed
        try:
            user_id = self.admin_adapter.create_user(user_data)

            # Clear related caches
            self.clear_all_caches()

            return user_id
        except KeycloakError as e:
            logger.error(f"Failed to create user: {e!s}")
            raise ValueError(f"Failed to create user: {e!s}") from e

    @override
    def update_user(self, user_id: str, user_data: dict[str, Any]) -> None:
        """Update user details.

        Args:
            user_id: User's ID
            user_data: User data to update

        Raises:
            ValueError: If updating user fails
        """
        # This is a write operation, no caching needed
        try:
            self.admin_adapter.update_user(user_id, user_data)

            # Clear user-related caches
            self.clear_all_caches()

        except KeycloakError as e:
            logger.error(f"Failed to update user: {e!s}")
            raise ValueError(f"Failed to update user: {e!s}") from e

    @override
    def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None:
        """Reset a user's password.

        Args:
            user_id: User's ID
            password: New password
            temporary: Whether the password is temporary and should be changed on next login

        Raises:
            ValueError: If password reset fails
        """
        # This is a write operation, no caching needed
        try:
            self.admin_adapter.set_user_password(user_id, password, temporary)
        except KeycloakError as e:
            logger.error(f"Failed to reset password: {e!s}")
            raise ValueError(f"Failed to reset password: {e!s}") from e

    @override
    def assign_realm_role(self, user_id: str, role_name: str) -> None:
        """Assign a realm role to a user.

        Args:
            user_id: User's ID
            role_name: Role name to assign

        Raises:
            ValueError: If role assignment fails
        """
        # This is a write operation, no caching needed
        try:
            # Get role representation
            role = self.admin_adapter.get_realm_role(role_name)
            # Assign role to user
            self.admin_adapter.assign_realm_roles(user_id, [role])

            # Clear role-related caches
            self._clear_caches(self.get_user_roles)

        except KeycloakError as e:
            logger.error(f"Failed to assign realm role: {e!s}")
            raise ValueError(f"Failed to assign realm role: {e!s}") from e

    @override
    def remove_realm_role(self, user_id: str, role_name: str) -> None:
        """Remove a realm role from a user.

        Args:
            user_id: User's ID
            role_name: Role name to remove

        Raises:
            ValueError: If role removal fails
        """
        # This is a write operation, no caching needed
        try:
            # Get role representation
            role = self.admin_adapter.get_realm_role(role_name)
            # Remove role from user
            self.admin_adapter.delete_realm_roles_of_user(user_id, [role])

            # Clear role-related caches
            self._clear_caches(self.get_user_roles)

        except KeycloakError as e:
            logger.error(f"Failed to remove realm role: {e!s}")
            raise ValueError(f"Failed to remove realm role: {e!s}") from e

    @override
    def assign_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
        """Assign a client-specific role to a user.

        Args:
            user_id: User's ID
            client_id: Client ID
            role_name: Role name to assign

        Raises:
            ValueError: If role assignment fails
        """
        # This is a write operation, no caching needed
        try:
            # Get client
            client = self.admin_adapter.get_client_id(client_id)
            # Get role representation
            role = self.admin_adapter.get_client_role(client, role_name)
            # Assign role to user
            self.admin_adapter.assign_client_role(user_id, client, [role])

            # Clear role-related caches
            self._clear_caches(self.get_client_roles_for_user)

        except KeycloakError as e:
            logger.error(f"Failed to assign client role: {e!s}")
            raise ValueError(f"Failed to assign client role: {e!s}") from e

    @override
    def create_realm_role(self, role_name: str, description: str | None = None) -> dict[str, Any]:
        """Create a new realm role.

        Args:
            role_name: Role name
            description: Optional role description

        Returns:
            Created role details

        Raises:
            ValueError: If role creation fails
        """
        # This is a write operation, no caching needed
        try:
            role_data = {"name": role_name}
            if description:
                role_data["description"] = description

            self.admin_adapter.create_realm_role(role_data)

            # Clear realm role caches (the list and the per-name lookups)
            self._clear_caches(self.get_realm_roles, self.get_realm_role)

            return self.admin_adapter.get_realm_role(role_name)
        except KeycloakError as e:
            logger.error(f"Failed to create realm role: {e!s}")
            raise ValueError(f"Failed to create realm role: {e!s}") from e

    @override
    def delete_realm_role(self, role_name: str) -> None:
        """Delete a realm role.

        Args:
            role_name: Role name to delete

        Raises:
            ValueError: If role deletion fails
        """
        # This is a write operation, no caching needed
        try:
            self.admin_adapter.delete_realm_role(role_name)

            # Clear realm role caches, plus user role caches since they might contain this role
            self._clear_caches(self.get_realm_roles, self.get_realm_role, self.get_user_roles)

        except KeycloakError as e:
            logger.error(f"Failed to delete realm role: {e!s}")
            raise ValueError(f"Failed to delete realm role: {e!s}") from e

    @override
    @ttl_cache_decorator(ttl_seconds=3600, maxsize=1)  # Cache for 1 hour
    def get_service_account_id(self) -> str:
        """Get service account user ID for the current client.

        Returns:
            Service account user ID

        Raises:
            ValueError: If getting service account fails
        """
        try:
            client_id = self.get_client_id(self.configs.CLIENT_ID)
            return self.admin_adapter.get_client_service_account_user(client_id).get("id")
        except KeycloakError as e:
            logger.error(f"Failed to get service account ID: {e!s}")
            raise ValueError(f"Failed to get service account ID: {e!s}") from e

    @override
    @ttl_cache_decorator(ttl_seconds=3600, maxsize=1)  # Cache for 1 hour
    def get_well_known_config(self) -> dict[str, Any]:
        """Get the well-known OpenID configuration.

        Returns:
            OIDC configuration

        Raises:
            ValueError: If getting configuration fails
        """
        try:
            return self.openid_adapter.well_known()
        except KeycloakError as e:
            logger.error(f"Failed to get well-known config: {e!s}")
            raise ValueError(f"Failed to get well-known config: {e!s}") from e

    @override
    @ttl_cache_decorator(ttl_seconds=3600, maxsize=1)  # Cache for 1 hour
    def get_certs(self) -> dict[str, Any]:
        """Get the JWT verification certificates.

        Returns:
            Certificate information

        Raises:
            ValueError: If getting certificates fails
        """
        try:
            return self.openid_adapter.certs()
        except KeycloakError as e:
            logger.error(f"Failed to get certificates: {e!s}")
            raise ValueError(f"Failed to get certificates: {e!s}") from e

    @override
    def get_token_from_code(self, code: str, redirect_uri: str) -> KeycloakTokenType:
        """Exchange authorization code for token.

        Args:
            code: Authorization code
            redirect_uri: Redirect URI used in authorization request

        Returns:
            Token response

        Raises:
            ValueError: If token exchange fails
        """
        # Authorization codes can only be used once, don't cache
        try:
            return self.openid_adapter.token(grant_type="authorization_code", code=code, redirect_uri=redirect_uri)
        except KeycloakError as e:
            logger.error(f"Failed to exchange code for token: {e!s}")
            raise ValueError(f"Failed to exchange code for token: {e!s}") from e

    @override
    def get_client_credentials_token(self) -> KeycloakTokenType:
        """Get token using client credentials.

        Returns:
            Token response

        Raises:
            ValueError: If token acquisition fails
        """
        # Tokens are time-sensitive, don't cache
        try:
            return self.openid_adapter.token(grant_type="client_credentials")
        except KeycloakError as e:
            logger.error(f"Failed to get client credentials token: {e!s}")
            raise ValueError(f"Failed to get client credentials token: {e!s}") from e

    @override
    @ttl_cache_decorator(ttl_seconds=30, maxsize=50)  # Cache for 30 seconds with limited entries
    def search_users(self, query: str, max_results: int = 100) -> list[KeycloakUserType]:
        """Search for users by username, email, or name.

        The username search always runs; email, firstName and lastName are then queried in that
        order, each only while fewer than ``max_results`` users have been collected. Users are
        de-duplicated by their ``id``.

        Args:
            query: Search query
            max_results: Maximum number of results to return

        Returns:
            List of matching users

        Raises:
            ValueError: If search fails
        """
        try:
            users: list[KeycloakUserType] = list(
                self.admin_adapter.get_users({"username": query, "max": max_results}),
            )
            seen_ids = {user["id"] for user in users}

            for field in ("email", "firstName", "lastName"):
                remaining = max_results - len(users)
                if remaining <= 0:
                    break
                for user in self.admin_adapter.get_users({field: query, "max": remaining}):
                    # Filter out duplicates already found through an earlier field
                    if user["id"] not in seen_ids:
                        seen_ids.add(user["id"])
                        users.append(user)

            return users[:max_results]
        except KeycloakError as e:
            logger.error(f"Failed to search users: {e!s}")
            raise ValueError(f"Failed to search users: {e!s}") from e

    @override
    def check_permissions(self, token: str, resource: str, scope: str) -> bool:
        """Check if a user has permission to access a resource with the specified scope.

        Args:
            token: Access token
            resource: Resource name
            scope: Permission scope

        Returns:
            True if permission granted, False otherwise (including on any error)
        """
        try:
            # Use UMA permissions endpoint to check specific resource and scope
            permissions = self.openid_adapter.uma_permissions(token, permissions=[f"{resource}#{scope}"])

            # Check if the response indicates permission is granted
            if not permissions or not isinstance(permissions, list):
                logger.debug("No permissions returned or invalid response format")
                return False

            # Look for the specific permission in the response
            return any(
                perm.get("rsname") == resource and scope in perm.get("scopes", []) for perm in permissions
            )
        except KeycloakError as e:
            logger.debug(f"Permission check failed with Keycloak error: {e!s}")
            return False
        except Exception as e:
            logger.error(f"Unexpected error in permission check: {e!s}")
            return False

    @override
    @ttl_cache_decorator(ttl_seconds=3600, maxsize=50)  # Cache for 1 hour
    def get_client_secret(self, client_id: str) -> str:
        """Get client secret.

        Args:
            client_id: Client ID

        Returns:
            Client secret, or an empty string if the client data has no ``secret`` entry

        Raises:
            ValueError: If getting secret fails
        """
        try:
            client = self.admin_adapter.get_client(client_id)
            return client.get("secret", "")
        except KeycloakError as e:
            logger.error(f"Failed to get client secret: {e!s}")
            raise ValueError(f"Failed to get client secret: {e!s}") from e

    @override
    @ttl_cache_decorator(ttl_seconds=3600, maxsize=50)  # Cache for 1 hour
    def get_client_id(self, client_name: str) -> str:
        """Get client ID by client name.

        Args:
            client_name: Name of the client

        Returns:
            Client ID

        Raises:
            ValueError: If client not found
        """
        try:
            return self.admin_adapter.get_client_id(client_name)
        except KeycloakError as e:
            logger.error(f"Failed to get client ID: {e!s}")
            raise ValueError(f"Failed to get client ID: {e!s}") from e

    @override
    @ttl_cache_decorator(ttl_seconds=300, maxsize=1)  # Cache for 5 minutes
    def get_realm_roles(self) -> list[dict[str, Any]]:
        """Get all realm roles.

        Returns:
            List of realm roles

        Raises:
            ValueError: If getting roles fails
        """
        try:
            return self.admin_adapter.get_realm_roles()
        except KeycloakError as e:
            logger.error(f"Failed to get realm roles: {e!s}")
            raise ValueError(f"Failed to get realm roles: {e!s}") from e

    @override
    # Keyed by role_name, so allow more than one entry (maxsize=1 evicted on every new name)
    @ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
    def get_realm_role(self, role_name: str) -> dict:
        """Get realm role.

        Args:
            role_name: Role name

        Returns:
            A realm role

        Raises:
            ValueError: If getting role fails
        """
        try:
            return self.admin_adapter.get_realm_role(role_name)
        except KeycloakError as e:
            logger.error(f"Failed to get realm role: {e!s}")
            raise ValueError(f"Failed to get realm role: {e!s}") from e

    @override
    def remove_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
        """Remove a client-specific role from a user.

        Args:
            user_id: User's ID
            client_id: Client ID
            role_name: Role name to remove

        Raises:
            ValueError: If role removal fails
        """
        try:
            client = self.admin_adapter.get_client_id(client_id)
            role = self.admin_adapter.get_client_role(client, role_name)
            self.admin_adapter.delete_client_roles_of_user(user_id, client, [role])

            # Clear role-related caches
            self._clear_caches(self.get_client_roles_for_user)
        except KeycloakError as e:
            logger.error(f"Failed to remove client role: {e!s}")
            raise ValueError(f"Failed to remove client role: {e!s}") from e

    @override
    def clear_user_sessions(self, user_id: str) -> None:
        """Clear all sessions for a user.

        Args:
            user_id: User's ID

        Raises:
            ValueError: If clearing sessions fails
        """
        try:
            self.admin_adapter.user_logout(user_id)
        except KeycloakError as e:
            logger.error(f"Failed to clear user sessions: {e!s}")
            raise ValueError(f"Failed to clear user sessions: {e!s}") from e

    @override
    def logout(self, refresh_token: str) -> None:
        """Logout user by invalidating their refresh token.

        Args:
            refresh_token: Refresh token to invalidate

        Raises:
            ValueError: If logout fails
        """
        try:
            self.openid_adapter.logout(refresh_token)
        except KeycloakError as e:
            logger.error(f"Failed to logout: {e!s}")
            raise ValueError(f"Failed to logout: {e!s}") from e

    @override
    def introspect_token(self, token: str) -> dict[str, Any]:
        """Introspect token to get detailed information about it.

        Args:
            token: Access token

        Returns:
            Token introspection details

        Raises:
            ValueError: If token introspection fails
        """
        try:
            return self.openid_adapter.introspect(token)
        except KeycloakError as e:
            logger.error(f"Failed to introspect token: {e!s}")
            raise ValueError(f"Failed to introspect token: {e!s}") from e

    @override
    def get_token_info(self, token: str) -> dict[str, Any]:
        """Decode token to get its claims.

        Args:
            token: Access token

        Returns:
            Dictionary of token claims

        Raises:
            ValueError: If token decoding fails
        """
        try:
            return self.openid_adapter.decode_token(
                token,
                key=self.get_public_key(),
            )
        except Exception as e:
            # Catch broadly, as validate_token does, so every decoding failure (not only
            # KeycloakError) surfaces as the documented ValueError.
            logger.error(f"Failed to get token info: {e!s}")
            raise ValueError(f"Failed to get token info: {e!s}") from e

    @override
    def delete_user(self, user_id: str) -> None:
        """Delete a user from Keycloak by their ID.

        Args:
            user_id: The ID of the user to delete

        Raises:
            ValueError: If the deletion fails
        """
        try:
            self.admin_adapter.delete_user(user_id=user_id)

            # Drop cached lookups so the deleted user is not served from cache
            self.clear_all_caches()

            logger.info(f"Successfully deleted user with ID {user_id}")
        except Exception as e:
            logger.error(f"Failed to delete user with ID {user_id}: {e!s}")
            raise ValueError(f"Failed to delete user: {e!s}") from e

archipy.adapters.keycloak.adapters.KeycloakAdapter.admin_adapter property

Get the admin adapter, refreshing it if necessary.

Returns:

Type Description
KeycloakAdmin

KeycloakAdmin instance

Raises:

Type Description
ValueError

If admin client is not available

archipy.adapters.keycloak.adapters.KeycloakAdapter.__init__(keycloak_configs=None)

Initialize KeycloakAdapter with configuration.

Parameters:

Name Type Description Default
keycloak_configs KeycloakConfig | None

Optional Keycloak configuration. If None, global config is used.

None
Source code in archipy/adapters/keycloak/adapters.py
def __init__(self, keycloak_configs: KeycloakConfig | None = None) -> None:
    """Set up the adapter from an explicit or the global Keycloak configuration.

    Args:
        keycloak_configs: Configuration to use. When None, the ``KEYCLOAK`` section
            of ``BaseConfig.global_config()`` is used instead.
    """
    if keycloak_configs is None:
        keycloak_configs = BaseConfig.global_config().KEYCLOAK
    self.configs: KeycloakConfig = keycloak_configs

    # OpenID client used for authentication
    self.openid_adapter = self._get_openid_client(self.configs)

    # Admin client cache, so the adapter does not have to re-authenticate needlessly
    self._admin_adapter = None
    self._admin_token_expiry = 0

    # The admin client is only initialized when a client secret is configured
    if self.configs.CLIENT_SECRET_KEY:
        self._initialize_admin_client()

archipy.adapters.keycloak.adapters.KeycloakAdapter.clear_all_caches()

Clear all cached values.

Source code in archipy/adapters/keycloak/adapters.py
def clear_all_caches(self) -> None:
    """Clear all cached values.

    Every attribute of the instance that exposes a callable ``clear_cache`` is
    cleared. Properties are skipped: reading one runs its getter, which may have
    side effects or raise, and would abort the whole sweep.
    """
    owner = type(self)
    for attr_name in dir(self):
        # Look the name up on the class first so property getters are never executed.
        if isinstance(getattr(owner, attr_name, None), property):
            continue
        attr = getattr(self, attr_name)
        clear_cache = getattr(attr, "clear_cache", None)
        if callable(clear_cache):
            clear_cache()

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_public_key()

Get the public key used to verify tokens.

Returns:

Type Description
Any

JWK key object used to verify signatures

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=3600, maxsize=1)  # Cache for 1 hour, public key rarely changes
def get_public_key(self) -> Any:
    """Fetch the realm public key and wrap it as a JWK object.

    The value returned by the OpenID client is placed between PEM
    ``BEGIN/END PUBLIC KEY`` markers and parsed with ``jwcrypto``.

    Returns:
        JWK key object used to verify signatures

    Raises:
        ValueError: If the key cannot be fetched or parsed
    """
    try:
        from jwcrypto import jwk

        raw_key = self.openid_adapter.public_key()
        pem_text = f"-----BEGIN PUBLIC KEY-----\n{raw_key}\n-----END PUBLIC KEY-----"
        pem_bytes = pem_text.encode("utf-8")
        return jwk.JWK.from_pem(pem_bytes)
    except Exception as exc:
        failure = f"Failed to get public key: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_token(username, password)

Get a user token by username and password using the Resource Owner Password Credentials Grant.

Warning

This method uses the direct password grant flow, which is less secure and not recommended for user login in production environments. Instead, prefer the web-based OAuth 2.0 Authorization Code Flow (use get_token_from_code) for secure authentication. Use this method only for testing, administrative tasks, or specific service accounts where direct credential use is acceptable and properly secured.

Parameters:

Name Type Description Default
username str

User's username

required
password str

User's password

required

Returns:

Type Description
KeycloakTokenType

Token response containing access_token, refresh_token, etc.

Raises:

Type Description
ValueError

If token acquisition fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def get_token(self, username: str, password: str) -> KeycloakTokenType:
    """Obtain a user token via the Resource Owner Password Credentials Grant.

    Warning:
        The direct password grant is less secure and not recommended for user login
        in production environments. Prefer the web-based OAuth 2.0 Authorization Code
        Flow (see `get_token_from_code`). Reserve this method for testing,
        administrative tasks, or specific service accounts where direct credential
        use is acceptable and properly secured.

    Args:
        username: User's username
        password: User's password

    Returns:
        Token response containing access_token, refresh_token, etc.

    Raises:
        ValueError: If token acquisition fails
    """
    grant_params = {"grant_type": "password", "username": username, "password": password}
    try:
        token_response = self.openid_adapter.token(**grant_params)
    except KeycloakError as exc:
        failure = f"Failed to get token: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)
    return token_response

archipy.adapters.keycloak.adapters.KeycloakAdapter.refresh_token(refresh_token)

Refresh an existing token using a refresh token.

Parameters:

Name Type Description Default
refresh_token str

Refresh token string

required

Returns:

Type Description
KeycloakTokenType

New token response containing access_token, refresh_token, etc.

Raises:

Type Description
ValueError

If token refresh fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def refresh_token(self, refresh_token: str) -> KeycloakTokenType:
    """Exchange a refresh token for a new token response.

    Args:
        refresh_token: Refresh token string

    Returns:
        New token response containing access_token, refresh_token, etc.

    Raises:
        ValueError: If token refresh fails
    """
    try:
        renewed = self.openid_adapter.refresh_token(refresh_token)
    except KeycloakError as exc:
        failure = f"Failed to refresh token: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)
    return renewed

archipy.adapters.keycloak.adapters.KeycloakAdapter.validate_token(token)

Validate if a token is still valid.

Parameters:

Name Type Description Default
token str

Access token to validate

required

Returns:

Type Description
bool

True if token is valid, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
def validate_token(self, token: str) -> bool:
    """Report whether a token can be decoded with the current public key.

    Any error raised while decoding is logged at debug level and reported as an
    invalid token rather than propagated.

    Args:
        token: Access token to validate

    Returns:
        True if token is valid, False otherwise
    """
    # Results are deliberately not cached: token validity is time-sensitive
    try:
        self.openid_adapter.decode_token(token, key=self.get_public_key())
    except Exception as exc:
        logger.debug(f"Token validation failed: {exc!s}")
        return False
    else:
        return True

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_userinfo(token)

Get user information from a token.

Parameters:

Name Type Description Default
token str

Access token

required

Returns:

Type Description
KeycloakUserType

User information

Raises:

Type Description
ValueError

If getting user info fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def get_userinfo(self, token: str) -> KeycloakUserType:
    """Return the user information associated with a token.

    The token is first checked with ``validate_token``; only then is the user
    information looked up through ``_get_userinfo_cached``.

    Args:
        token: Access token

    Returns:
        User information

    Raises:
        ValueError: If the token is invalid or getting user info fails
    """
    token_is_valid = self.validate_token(token)
    if not token_is_valid:
        logger.error("Invalid token provided for userinfo request")
        raise ValueError("Invalid token provided")

    try:
        user_info = self._get_userinfo_cached(token)
    except KeycloakError as exc:
        failure = f"Failed to get user info: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)
    return user_info

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_user_by_id(user_id)

Get user details by user ID.

Parameters:

Name Type Description Default
user_id str

User's ID

required

Returns:

Type Description
KeycloakUserType | None

User details or None if not found

Raises:

Type Description
ValueError

If getting user fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
def get_user_by_id(self, user_id: str) -> KeycloakUserType | None:
    """Look up a user by ID through the admin client.

    A ``KeycloakGetError`` with response code 404 is treated as "user does not
    exist" and yields None; every other Keycloak error becomes a ValueError.

    Args:
        user_id: User's ID

    Returns:
        User details or None if not found

    Raises:
        ValueError: If getting user fails
    """
    try:
        return self.admin_adapter.get_user(user_id)
    except (KeycloakGetError, KeycloakError) as exc:
        user_missing = isinstance(exc, KeycloakGetError) and exc.response_code == 404
        if user_missing:
            return None
        failure = f"Failed to get user by ID: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_user_by_username(username)

Get user details by username.

Parameters:

Name Type Description Default
username str

User's username

required

Returns:

Type Description
KeycloakUserType | None

User details or None if not found

Raises:

Type Description
ValueError

If query fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
def get_user_by_username(self, username: str) -> KeycloakUserType | None:
    """Get user details by username.

    Only a user whose username equals ``username`` (compared case-insensitively)
    is returned; partial matches reported by the server are ignored.

    Args:
        username: User's username

    Returns:
        User details or None if not found

    Raises:
        ValueError: If query fails
    """
    try:
        users = self.admin_adapter.get_users({"username": username})
    except KeycloakError as e:
        logger.error(f"Failed to get user by username: {e!s}")
        raise ValueError(f"Failed to get user by username: {e!s}") from e

    # The admin users query matches `username` as a substring unless exact matching
    # is requested, so the first hit may belong to a different user (e.g. "bob"
    # matching "bobby"). Keep only a true match.
    wanted = username.casefold()
    return next(
        (user for user in users or [] if str(user.get("username", "")).casefold() == wanted),
        None,
    )

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_user_by_email(email)

Get user details by email.

Parameters:

Name Type Description Default
email str

User's email

required

Returns:

Type Description
KeycloakUserType | None

User details or None if not found

Raises:

Type Description
ValueError

If query fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
def get_user_by_email(self, email: str) -> KeycloakUserType | None:
    """Get user details by email.

    Only a user whose email equals ``email`` (compared case-insensitively) is
    returned; partial matches reported by the server are ignored.

    Args:
        email: User's email

    Returns:
        User details or None if not found

    Raises:
        ValueError: If query fails
    """
    try:
        users = self.admin_adapter.get_users({"email": email})
    except KeycloakError as e:
        logger.error(f"Failed to get user by email: {e!s}")
        raise ValueError(f"Failed to get user by email: {e!s}") from e

    # The admin users query matches `email` as a substring unless exact matching is
    # requested, so the first hit may belong to a different user. Keep only a true
    # match.
    wanted = email.casefold()
    return next(
        (user for user in users or [] if str(user.get("email", "")).casefold() == wanted),
        None,
    )

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_user_roles(user_id)

Get roles assigned to a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required

Returns:

Type Description
list[KeycloakRoleType]

List of roles

Raises:

Type Description
ValueError

If getting roles fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
def get_user_roles(self, user_id: str) -> list[KeycloakRoleType]:
    """Return the realm roles of a user as reported by the admin client.

    Args:
        user_id: User's ID

    Returns:
        List of roles

    Raises:
        ValueError: If getting roles fails
    """
    try:
        realm_roles = self.admin_adapter.get_realm_roles_of_user(user_id)
    except KeycloakError as exc:
        failure = f"Failed to get user roles: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)
    return realm_roles

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_client_roles_for_user(user_id, client_id)

Get client-specific roles assigned to a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
client_id str

Client ID

required

Returns:

Type Description
list[KeycloakRoleType]

List of client-specific roles

Raises:

Type Description
ValueError

If getting roles fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=300, maxsize=100)  # Cache for 5 minutes
def get_client_roles_for_user(self, user_id: str, client_id: str) -> list[KeycloakRoleType]:
    """Return the roles a user holds on one specific client.

    Args:
        user_id: User's ID
        client_id: Client ID

    Returns:
        List of client-specific roles

    Raises:
        ValueError: If getting roles fails
    """
    try:
        client_roles = self.admin_adapter.get_client_roles_of_user(user_id, client_id)
    except KeycloakError as exc:
        failure = f"Failed to get client roles: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)
    return client_roles

archipy.adapters.keycloak.adapters.KeycloakAdapter.has_role(token, role_name)

Check if a user has a specific role.

Parameters:

Name Type Description Default
token str

Access token

required
role_name str

Role name to check

required

Returns:

Type Description
bool

True if user has the role, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
def has_role(self, token: str, role_name: str) -> bool:
    """Check if a user has a specific role.

    Args:
        token: Access token
        role_name: Role name to check

    Returns:
        True if user has the role, False otherwise
    """
    # Not caching this result as token validation is time-sensitive
    try:
        user_info = self.get_userinfo(token)

        # Check realm roles
        realm_access = user_info.get("realm_access", {})
        roles = realm_access.get("roles", [])
        if role_name in roles:
            return True

        # Check client roles
        resource_access = user_info.get("resource_access", {})
        for client in resource_access.values():
            client_roles = client.get("roles", [])
            if role_name in client_roles:
                return True

        return False
    except Exception as e:
        logger.debug(f"Role check failed: {e!s}")
        return False

archipy.adapters.keycloak.adapters.KeycloakAdapter.has_any_of_roles(token, role_names)

Check if a user has any of the specified roles.

Parameters:

Name Type Description Default
token str

Access token

required
role_names set[str]

Set of role names to check

required

Returns:

Type Description
bool

True if user has any of the roles, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
def has_any_of_roles(self, token: str, role_names: set[str]) -> bool:
    """Check if a user has any of the specified roles.

    Args:
        token: Access token
        role_names: Set of role names to check

    Returns:
        True if user has any of the roles, False otherwise
    """
    try:
        user_info = self.get_userinfo(token)

        # Check realm roles
        realm_access = user_info.get("realm_access", {})
        roles = set(realm_access.get("roles", []))
        if role_names.intersection(roles):
            return True

        # Check client roles
        resource_access = user_info.get("resource_access", {})
        for client in resource_access.values():
            client_roles = set(client.get("roles", []))
            if role_names.intersection(client_roles):
                return True

        return False
    except Exception as e:
        logger.debug(f"Role check failed: {e!s}")
        return False

archipy.adapters.keycloak.adapters.KeycloakAdapter.has_all_roles(token, role_names)

Check if a user has all of the specified roles.

Parameters:

Name Type Description Default
token str

Access token

required
role_names set[str]

Set of role names to check

required

Returns:

Type Description
bool

True if user has all of the roles, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
def has_all_roles(self, token: str, role_names: set[str]) -> bool:
    """Check if a user has all of the specified roles.

    Args:
        token: Access token
        role_names: Set of role names to check

    Returns:
        True if user has all of the roles, False otherwise
    """
    try:
        user_info = self.get_userinfo(token)

        # Get all user roles
        all_roles = set()

        # Add realm roles
        realm_access = user_info.get("realm_access", {})
        all_roles.update(realm_access.get("roles", []))

        # Add client roles
        resource_access = user_info.get("resource_access", {})
        for client in resource_access.values():
            all_roles.update(client.get("roles", []))

        # Check if all required roles are present
        return role_names.issubset(all_roles)
    except Exception as e:
        logger.debug(f"All roles check failed: {e!s}")
        return False

archipy.adapters.keycloak.adapters.KeycloakAdapter.create_user(user_data)

Create a new user in Keycloak.

Parameters:

Name Type Description Default
user_data dict[str, Any]

User data including username, email, etc.

required

Returns:

Type Description
str

ID of the created user

Raises:

Type Description
ValueError

If creating user fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def create_user(self, user_data: dict[str, Any]) -> str:
    """Create a user through the admin client and return its ID.

    All cached values of the adapter are cleared once the user has been created.

    Args:
        user_data: User data including username, email, etc.

    Returns:
        ID of the created user

    Raises:
        ValueError: If creating user fails
    """
    # Write operation: the call itself is never cached
    try:
        new_user_id = self.admin_adapter.create_user(user_data)
        # Drop cached lookups that may now be outdated
        self.clear_all_caches()
    except KeycloakError as exc:
        failure = f"Failed to create user: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)
    return new_user_id

archipy.adapters.keycloak.adapters.KeycloakAdapter.update_user(user_id, user_data)

Update user details.

Parameters:

Name Type Description Default
user_id str

User's ID

required
user_data dict[str, Any]

User data to update

required

Raises:

Type Description
ValueError

If updating user fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def update_user(self, user_id: str, user_data: dict[str, Any]) -> None:
    """Apply new details to an existing user through the admin client.

    All cached values of the adapter are cleared once the update has gone through.

    Args:
        user_id: User's ID
        user_data: User data to update

    Raises:
        ValueError: If updating user fails
    """
    # Write operation: the call itself is never cached
    try:
        self.admin_adapter.update_user(user_id, user_data)
        # Drop cached lookups that may now be outdated
        self.clear_all_caches()
    except KeycloakError as exc:
        failure = f"Failed to update user: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)

archipy.adapters.keycloak.adapters.KeycloakAdapter.reset_password(user_id, password, temporary=False)

Reset a user's password.

Parameters:

Name Type Description Default
user_id str

User's ID

required
password str

New password

required
temporary bool

Whether the password is temporary and should be changed on next login

False

Raises:

Type Description
ValueError

If password reset fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None:
    """Set a new password for a user through the admin client.

    Args:
        user_id: User's ID
        password: New password
        temporary: Whether the password is temporary and should be changed on next login

    Raises:
        ValueError: If password reset fails
    """
    # Write operation: nothing to cache
    try:
        self.admin_adapter.set_user_password(user_id, password, temporary)
    except KeycloakError as exc:
        failure = f"Failed to reset password: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)

archipy.adapters.keycloak.adapters.KeycloakAdapter.assign_realm_role(user_id, role_name)

Assign a realm role to a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
role_name str

Role name to assign

required

Raises:

Type Description
ValueError

If role assignment fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def assign_realm_role(self, user_id: str, role_name: str) -> None:
    """Grant a realm role to a user.

    The role representation is fetched by name, assigned to the user, and the
    ``get_user_roles`` cache is cleared when it exposes ``clear_cache``.

    Args:
        user_id: User's ID
        role_name: Role name to assign

    Raises:
        ValueError: If role assignment fails
    """
    # Write operation: the call itself is never cached
    try:
        role_representation = self.admin_adapter.get_realm_role(role_name)
        self.admin_adapter.assign_realm_roles(user_id, [role_representation])

        # Cached role lists may now be outdated
        cached_lookup = self.get_user_roles
        if hasattr(cached_lookup, "clear_cache"):
            cached_lookup.clear_cache()
    except KeycloakError as exc:
        failure = f"Failed to assign realm role: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)

archipy.adapters.keycloak.adapters.KeycloakAdapter.remove_realm_role(user_id, role_name)

Remove a realm role from a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
role_name str

Role name to remove

required

Raises:

Type Description
ValueError

If role removal fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def remove_realm_role(self, user_id: str, role_name: str) -> None:
    """Revoke a realm role from a user.

    The role representation is fetched by name, removed from the user, and the
    ``get_user_roles`` cache is cleared when it exposes ``clear_cache``.

    Args:
        user_id: User's ID
        role_name: Role name to remove

    Raises:
        ValueError: If role removal fails
    """
    # Write operation: the call itself is never cached
    try:
        role_representation = self.admin_adapter.get_realm_role(role_name)
        self.admin_adapter.delete_realm_roles_of_user(user_id, [role_representation])

        # Cached role lists may now be outdated
        cached_lookup = self.get_user_roles
        if hasattr(cached_lookup, "clear_cache"):
            cached_lookup.clear_cache()
    except KeycloakError as exc:
        failure = f"Failed to remove realm role: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)

archipy.adapters.keycloak.adapters.KeycloakAdapter.assign_client_role(user_id, client_id, role_name)

Assign a client-specific role to a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
client_id str

Client ID

required
role_name str

Role name to assign

required

Raises:

Type Description
ValueError

If role assignment fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def assign_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
    """Grant a client-specific role to a user.

    The client is resolved through the admin client's ``get_client_id``, the role
    representation is fetched for that client and assigned to the user, and the
    ``get_client_roles_for_user`` cache is cleared when it exposes ``clear_cache``.

    Args:
        user_id: User's ID
        client_id: Client ID
        role_name: Role name to assign

    Raises:
        ValueError: If role assignment fails
    """
    # Write operation: the call itself is never cached
    try:
        resolved_client = self.admin_adapter.get_client_id(client_id)
        role_representation = self.admin_adapter.get_client_role(resolved_client, role_name)
        self.admin_adapter.assign_client_role(user_id, resolved_client, [role_representation])

        # Cached client-role lists may now be outdated
        cached_lookup = self.get_client_roles_for_user
        if hasattr(cached_lookup, "clear_cache"):
            cached_lookup.clear_cache()
    except KeycloakError as exc:
        failure = f"Failed to assign client role: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)

archipy.adapters.keycloak.adapters.KeycloakAdapter.create_realm_role(role_name, description=None)

Create a new realm role.

Parameters:

Name Type Description Default
role_name str

Role name

required
description str | None

Optional role description

None

Returns:

Type Description
dict[str, Any]

Created role details

Raises:

Type Description
ValueError

If role creation fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def create_realm_role(self, role_name: str, description: str | None = None) -> dict[str, Any]:
    """Create a realm role and return it as read back from Keycloak.

    The description is only sent when it is non-empty. After creation the
    ``get_realm_roles`` cache is cleared when it exposes ``clear_cache``, and the
    new role is fetched by name.

    Args:
        role_name: Role name
        description: Optional role description

    Returns:
        Created role details

    Raises:
        ValueError: If role creation fails
    """
    # Write operation: the call itself is never cached
    try:
        if description:
            payload = {"name": role_name, "description": description}
        else:
            payload = {"name": role_name}
        self.admin_adapter.create_realm_role(payload)

        # The cached list of realm roles is now outdated
        cached_lookup = self.get_realm_roles
        if hasattr(cached_lookup, "clear_cache"):
            cached_lookup.clear_cache()

        return self.admin_adapter.get_realm_role(role_name)
    except KeycloakError as exc:
        failure = f"Failed to create realm role: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)

archipy.adapters.keycloak.adapters.KeycloakAdapter.delete_realm_role(role_name)

Delete a realm role.

Parameters:

Name Type Description Default
role_name str

Role name to delete

required

Raises:

Type Description
ValueError

If role deletion fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def delete_realm_role(self, role_name: str) -> None:
    """Remove a realm role by name.

    Afterwards the ``get_realm_roles`` cache and then the ``get_user_roles``
    cache are cleared (each only when it exposes ``clear_cache``), since cached
    user role lists might still contain the deleted role.

    Args:
        role_name: Role name to delete

    Raises:
        ValueError: If role deletion fails
    """
    # Write operation: the call itself is never cached
    try:
        self.admin_adapter.delete_realm_role(role_name)

        # Realm role list first, then per-user role lists that may reference the role
        for method_name in ("get_realm_roles", "get_user_roles"):
            cached_lookup = getattr(self, method_name)
            if hasattr(cached_lookup, "clear_cache"):
                cached_lookup.clear_cache()
    except KeycloakError as exc:
        failure = f"Failed to delete realm role: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_service_account_id()

Get service account user ID for the current client.

Returns:

Type Description
str

Service account user ID

Raises:

Type Description
ValueError

If getting service account fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=3600, maxsize=1)  # Cache for 1 hour
def get_service_account_id(self) -> str:
    """Return the ID of the service account user of the configured client.

    The configured ``CLIENT_ID`` is resolved via ``get_client_id``, and the ``id``
    field of that client's service account user is returned.

    Returns:
        Service account user ID

    Raises:
        ValueError: If getting service account fails
    """
    try:
        resolved_client = self.get_client_id(self.configs.CLIENT_ID)
        account = self.admin_adapter.get_client_service_account_user(resolved_client)
        return account.get("id")
    except KeycloakError as exc:
        failure = f"Failed to get service account ID: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_well_known_config()

Get the well-known OpenID configuration.

Returns:

Type Description
dict[str, Any]

OIDC configuration

Raises:

Type Description
ValueError

If getting configuration fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=3600, maxsize=1)  # Cache for 1 hour
def get_well_known_config(self) -> dict[str, Any]:
    """Return the well-known OpenID configuration from the OpenID client.

    Returns:
        OIDC configuration

    Raises:
        ValueError: If getting configuration fails
    """
    try:
        oidc_config = self.openid_adapter.well_known()
    except KeycloakError as exc:
        failure = f"Failed to get well-known config: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)
    return oidc_config

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_certs()

Get the JWT verification certificates.

Returns:

Type Description
dict[str, Any]

Certificate information

Raises:

Type Description
ValueError

If getting certificates fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=3600, maxsize=1)  # Cache for 1 hour
def get_certs(self) -> dict[str, Any]:
    """Return the JWT verification certificates from the OpenID client.

    Returns:
        Certificate information

    Raises:
        ValueError: If getting certificates fails
    """
    try:
        certificates = self.openid_adapter.certs()
    except KeycloakError as exc:
        failure = f"Failed to get certificates: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)
    return certificates

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_token_from_code(code, redirect_uri)

Exchange authorization code for token.

Parameters:

Name Type Description Default
code str

Authorization code

required
redirect_uri str

Redirect URI used in authorization request

required

Returns:

Type Description
KeycloakTokenType

Token response

Raises:

Type Description
ValueError

If token exchange fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def get_token_from_code(self, code: str, redirect_uri: str) -> KeycloakTokenType:
    """Trade an authorization code for a token (``authorization_code`` grant).

    Args:
        code: Authorization code
        redirect_uri: Redirect URI used in authorization request

    Returns:
        Token response

    Raises:
        ValueError: If token exchange fails
    """
    # Never cached: an authorization code can be used only once
    grant_params = {"grant_type": "authorization_code", "code": code, "redirect_uri": redirect_uri}
    try:
        token_response = self.openid_adapter.token(**grant_params)
    except KeycloakError as exc:
        failure = f"Failed to exchange code for token: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)
    return token_response

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_client_credentials_token()

Get token using client credentials.

Returns:

Type Description
KeycloakTokenType

Token response

Raises:

Type Description
ValueError

If token acquisition fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def get_client_credentials_token(self) -> KeycloakTokenType:
    """Obtain a token through the ``client_credentials`` grant.

    Returns:
        Token response

    Raises:
        ValueError: If token acquisition fails
    """
    # Never cached: tokens are time-sensitive
    try:
        token_response = self.openid_adapter.token(grant_type="client_credentials")
    except KeycloakError as exc:
        failure = f"Failed to get client credentials token: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)
    return token_response

archipy.adapters.keycloak.adapters.KeycloakAdapter.search_users(query, max_results=100)

Search for users by username, email, or name.

Parameters:

Name Type Description Default
query str

Search query

required
max_results int

Maximum number of results to return

100

Returns:

Type Description
list[KeycloakUserType]

List of matching users

Raises:

Type Description
ValueError

If search fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=30, maxsize=50)  # Cache for 30 seconds with limited entries
def search_users(self, query: str, max_results: int = 100) -> list[KeycloakUserType]:
    """Search users by username, then email, first name and last name.

    The username search always runs. Each further field is queried only while
    fewer than ``max_results`` users have been collected, asking for just the
    missing number of entries and skipping users whose ``id`` is already present.

    Args:
        query: Search query
        max_results: Maximum number of results to return

    Returns:
        List of matching users

    Raises:
        ValueError: If search fails
    """
    try:
        # Primary search: by username
        matches = list(self.admin_adapter.get_users({"username": query, "max": max_results}))

        # Fallback searches, used only to fill up an incomplete result list
        for field in ("email", "firstName", "lastName"):
            if len(matches) >= max_results:
                break
            still_needed = max_results - len(matches)
            candidates = self.admin_adapter.get_users({field: query, "max": still_needed})
            # Skip users that an earlier search already returned
            known_ids = {user["id"] for user in matches}
            matches.extend(user for user in candidates if user["id"] not in known_ids)

        return matches[:max_results]
    except KeycloakError as exc:
        failure = f"Failed to search users: {exc!s}"
        logger.error(failure)
        raise ValueError(failure)

archipy.adapters.keycloak.adapters.KeycloakAdapter.check_permissions(token, resource, scope)

Check if a user has permission to access a resource with the specified scope.

Parameters:

Name Type Description Default
token str

Access token

required
resource str

Resource name

required
scope str

Permission scope

required

Returns:

Type Description
bool

True if permission granted, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
def check_permissions(self, token: str, resource: str, scope: str) -> bool:
    """Tell whether a token grants the given scope on the given resource.

    The UMA permissions endpoint is queried for ``"<resource>#<scope>"`` and the
    answer is scanned for an entry whose ``rsname`` equals ``resource`` and whose
    ``scopes`` contain ``scope``. Any exception results in ``False``.

    Args:
        token: Access token
        resource: Resource name
        scope: Permission scope

    Returns:
        True if permission granted, False otherwise
    """
    try:
        granted = self.openid_adapter.uma_permissions(token, permissions=[f"{resource}#{scope}"])

        # An empty answer or anything that is not a list counts as "no permission".
        if not (granted and isinstance(granted, list)):
            logger.debug("No permissions returned or invalid response format")
            return False

        return any(
            entry.get("rsname") == resource and scope in entry.get("scopes", [])
            for entry in granted
        )
    except KeycloakError as e:
        logger.debug(f"Permission check failed with Keycloak error: {e!s}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error in permission check: {e!s}")
        return False

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_client_secret(client_id)

Get client secret.

Parameters:

Name Type Description Default
client_id str

Client ID

required

Returns:

Type Description
str

Client secret

Raises:

Type Description
ValueError

If getting secret fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=3600, maxsize=50)  # Cache for 1 hour
def get_client_secret(self, client_id: str) -> str:
    """Fetch the secret stored on a client.

    Args:
        client_id: Client ID

    Returns:
        The client's ``secret`` field, or an empty string when the field is absent

    Raises:
        ValueError: If the admin client raises a KeycloakError
    """
    try:
        client_representation = self.admin_adapter.get_client(client_id)
        secret = client_representation.get("secret", "")
    except KeycloakError as e:
        message = f"Failed to get client secret: {e!s}"
        logger.error(message)
        raise ValueError(message)
    return secret

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_client_id(client_name)

Get client ID by client name.

Parameters:

Name Type Description Default
client_name str

Name of the client

required

Returns:

Type Description
str

Client ID

Raises:

Type Description
ValueError

If client not found

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=3600, maxsize=50)  # Cache for 1 hour
def get_client_id(self, client_name: str) -> str:
    """Resolve a client name to its client ID.

    Args:
        client_name: Name of the client

    Returns:
        Client ID as returned by the admin client

    Raises:
        ValueError: If the admin client raises a KeycloakError
    """
    try:
        resolved_id = self.admin_adapter.get_client_id(client_name)
    except KeycloakError as e:
        message = f"Failed to get client ID: {e!s}"
        logger.error(message)
        raise ValueError(message)
    return resolved_id

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_realm_roles()

Get all realm roles.

Returns:

Type Description
list[dict[str, Any]]

List of realm roles

Raises:

Type Description
ValueError

If getting roles fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@ttl_cache_decorator(ttl_seconds=300, maxsize=1)  # Cache for 5 minutes
def get_realm_roles(self) -> list[dict[str, Any]]:
    """List every role defined on the realm.

    Returns:
        List of realm roles as returned by the admin client

    Raises:
        ValueError: If the admin client raises a KeycloakError
    """
    try:
        realm_roles = self.admin_adapter.get_realm_roles()
    except KeycloakError as e:
        message = f"Failed to get realm roles: {e!s}"
        logger.error(message)
        raise ValueError(message)
    return realm_roles

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_realm_role(role_name)

Get realm role.

Parameters:

Name Type Description Default
role_name str

Role name

required

Returns: A realm role

Raises:

Type Description
ValueError

If getting role fails

Source code in archipy/adapters/keycloak/adapters.py
@override
# Results are keyed by role_name, so the cache needs room for more than one role;
# with a single slot, alternating lookups of two roles would evict each other.
@ttl_cache_decorator(ttl_seconds=300, maxsize=50)  # Cache for 5 minutes
def get_realm_role(self, role_name: str) -> dict:
    """Get a single realm role by name.

    Args:
        role_name: Role name

    Returns:
        A realm role

    Raises:
        ValueError: If getting role fails
    """
    try:
        return self.admin_adapter.get_realm_role(role_name)
    except KeycloakError as e:
        logger.error(f"Failed to get realm role: {e!s}")
        # Chain explicitly so the original Keycloak error is kept as the cause.
        raise ValueError(f"Failed to get realm role: {e!s}") from e

archipy.adapters.keycloak.adapters.KeycloakAdapter.remove_client_role(user_id, client_id, role_name)

Remove a client-specific role from a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
client_id str

Client ID

required
role_name str

Role name to remove

required

Raises:

Type Description
ValueError

If role removal fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def remove_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
    """Take a client-specific role away from a user.

    After a successful removal the cache of ``get_client_roles_for_user`` is
    cleared, provided that method exposes a ``clear_cache`` hook.

    Args:
        user_id: User's ID
        client_id: Client ID
        role_name: Role name to remove

    Raises:
        ValueError: If the admin client raises a KeycloakError
    """
    try:
        # The value returned by get_client_id is what the role calls below are given.
        resolved_client = self.admin_adapter.get_client_id(client_id)
        role_representation = self.admin_adapter.get_client_role(resolved_client, role_name)
        self.admin_adapter.delete_client_roles_of_user(user_id, resolved_client, [role_representation])

        cached_lookup = self.get_client_roles_for_user
        if hasattr(cached_lookup, "clear_cache"):
            cached_lookup.clear_cache()
    except KeycloakError as e:
        message = f"Failed to remove client role: {e!s}"
        logger.error(message)
        raise ValueError(message)

archipy.adapters.keycloak.adapters.KeycloakAdapter.clear_user_sessions(user_id)

Clear all sessions for a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required

Raises:

Type Description
ValueError

If clearing sessions fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def clear_user_sessions(self, user_id: str) -> None:
    """Log a user out through the admin client, ending their sessions.

    Args:
        user_id: User's ID

    Raises:
        ValueError: If the admin client raises a KeycloakError
    """
    try:
        self.admin_adapter.user_logout(user_id)
    except KeycloakError as e:
        message = f"Failed to clear user sessions: {e!s}"
        logger.error(message)
        raise ValueError(message)

archipy.adapters.keycloak.adapters.KeycloakAdapter.logout(refresh_token)

Logout user by invalidating their refresh token.

Parameters:

Name Type Description Default
refresh_token str

Refresh token to invalidate

required

Raises:

Type Description
ValueError

If logout fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def logout(self, refresh_token: str) -> None:
    """Log a user out by handing their refresh token to the OpenID client.

    Args:
        refresh_token: Refresh token to invalidate

    Raises:
        ValueError: If the OpenID client raises a KeycloakError
    """
    try:
        self.openid_adapter.logout(refresh_token)
    except KeycloakError as e:
        message = f"Failed to logout: {e!s}"
        logger.error(message)
        raise ValueError(message)

archipy.adapters.keycloak.adapters.KeycloakAdapter.introspect_token(token)

Introspect token to get detailed information about it.

Parameters:

Name Type Description Default
token str

Access token

required

Returns:

Type Description
dict[str, Any]

Token introspection details

Raises:

Type Description
ValueError

If token introspection fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def introspect_token(self, token: str) -> dict[str, Any]:
    """Ask the OpenID client to introspect a token.

    Args:
        token: Access token

    Returns:
        Token introspection details as returned by the OpenID client

    Raises:
        ValueError: If the OpenID client raises a KeycloakError
    """
    try:
        introspection = self.openid_adapter.introspect(token)
    except KeycloakError as e:
        message = f"Failed to introspect token: {e!s}"
        logger.error(message)
        raise ValueError(message)
    return introspection

archipy.adapters.keycloak.adapters.KeycloakAdapter.get_token_info(token)

Decode token to get its claims.

Parameters:

Name Type Description Default
token str

Access token

required

Returns:

Type Description
dict[str, Any]

Dictionary of token claims

Raises:

Type Description
ValueError

If token decoding fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def get_token_info(self, token: str) -> dict[str, Any]:
    """Decode a token into its claims using the key from ``get_public_key``.

    Args:
        token: Access token

    Returns:
        Dictionary of token claims

    Raises:
        ValueError: If the OpenID client raises a KeycloakError while decoding
    """
    try:
        verification_key = self.get_public_key()
        claims = self.openid_adapter.decode_token(token, key=verification_key)
    except KeycloakError as e:
        message = f"Failed to get token info: {e!s}"
        logger.error(message)
        raise ValueError(message)
    return claims

archipy.adapters.keycloak.adapters.KeycloakAdapter.delete_user(user_id)

Delete a user from Keycloak by their ID.

Parameters:

Name Type Description Default
user_id str

The ID of the user to delete

required

Raises:

Type Description
ValueError

If the deletion fails

Source code in archipy/adapters/keycloak/adapters.py
@override
def delete_user(self, user_id: str) -> None:
    """Remove a user from Keycloak, identified by ID.

    Every exception raised during the deletion (not only Keycloak errors) is
    logged and re-raised as ``ValueError``.

    Args:
        user_id: The ID of the user to delete

    Raises:
        ValueError: If the deletion fails
    """
    try:
        self.admin_adapter.delete_user(user_id=user_id)
        logger.info(f"Successfully deleted user with ID {user_id}")
    except Exception as e:
        failure_reason = f"{e!s}"
        logger.error(f"Failed to delete user with ID {user_id}: {failure_reason}")
        raise ValueError(f"Failed to delete user: {failure_reason}")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter

Bases: AsyncKeycloakPort

Concrete implementation of the AsyncKeycloakPort interface using the python-keycloak library.

This implementation includes TTL caching for appropriate operations to improve performance while ensuring cache entries expire after a configured time to prevent stale data.

Source code in archipy/adapters/keycloak/adapters.py
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
class AsyncKeycloakAdapter(AsyncKeycloakPort):
    """Concrete implementation of the KeycloakPort interface using python-keycloak library.

    This implementation includes TTL caching for appropriate operations to improve performance
    while ensuring cache entries expire after a configured time to prevent stale data.
    """

    def __init__(self, keycloak_configs: KeycloakConfig | None = None) -> None:
        """Set up the adapter from an explicit or the global Keycloak configuration.

        Args:
            keycloak_configs: Optional Keycloak configuration. If None, the KEYCLOAK
                section of the global config is used.
        """
        if keycloak_configs is None:
            resolved_configs = BaseConfig.global_config().KEYCLOAK
        else:
            resolved_configs = keycloak_configs
        self.configs: KeycloakConfig = resolved_configs

        # OpenID client used for the authentication calls
        self.openid_adapter = self._get_openid_client(self.configs)

        # Admin client state; the client is kept around to avoid unnecessary re-authentication
        self._admin_adapter = None
        self._admin_token_expiry = 0

        # The admin client is only built eagerly when a client secret is configured
        if self.configs.CLIENT_SECRET_KEY:
            self._initialize_admin_client()

    def clear_all_caches(self) -> None:
        """Clear every method-level cache exposed on this adapter.

        A cached method is recognised by the cache-clearing hook it exposes:
        ``clear_cache`` (the hook this method has always looked for) or
        ``cache_clear`` (the hook exposed by ``alru_cache``-style wrappers, which
        is what the cached methods of this class are decorated with).

        Properties are skipped without being evaluated, so calling this method
        never runs the ``admin_adapter`` getter, which may re-authenticate or
        raise ``ValueError``.
        """
        cls = type(self)
        for attr_name in dir(self):
            # Reading a property executes its getter; skip it based on the class attribute.
            if isinstance(getattr(cls, attr_name, None), property):
                continue
            attr = getattr(self, attr_name, None)
            for hook_name in ("clear_cache", "cache_clear"):
                hook = getattr(attr, hook_name, None)
                if callable(hook):
                    hook()
                    break  # one hook per attribute is enough

    @staticmethod
    def _get_openid_client(configs: KeycloakConfig) -> KeycloakOpenID:
        """Build a KeycloakOpenID client from the given configuration.

        Args:
            configs: Keycloak configuration supplying server URL, client ID, realm,
                client secret, SSL verification flag and timeout

        Returns:
            Configured KeycloakOpenID client
        """
        client_options = {
            "server_url": configs.SERVER_URL,
            "client_id": configs.CLIENT_ID,
            "realm_name": configs.REALM_NAME,
            "client_secret_key": configs.CLIENT_SECRET_KEY,
            "verify": configs.VERIFY_SSL,
            "timeout": configs.TIMEOUT,
        }
        return KeycloakOpenID(**client_options)

    def _initialize_admin_client(self) -> None:
        """Build (or rebuild) the admin client from a fresh client-credentials token.

        On a KeycloakError the failure is logged and the admin client state is
        reset (``_admin_adapter`` to None, ``_admin_token_expiry`` to 0) instead
        of raising.
        """
        try:
            service_token = self.openid_adapter.token(grant_type="client_credentials")

            # Expire 30 seconds early so a refresh happens before the token lapses;
            # a token without "expires_in" is treated as living 60 seconds.
            self._admin_token_expiry = time.time() + service_token.get("expires_in", 60) - 30

            admin_options = {
                "server_url": self.configs.SERVER_URL,
                "realm_name": self.configs.REALM_NAME,
                "token": service_token,
                "verify": self.configs.VERIFY_SSL,
                "timeout": self.configs.TIMEOUT,
            }
            self._admin_adapter = KeycloakAdmin(**admin_options)
            logger.debug("Admin client initialized successfully")
        except KeycloakError as e:
            logger.error(f"Failed to initialize admin client: {e!s}")
            self._admin_token_expiry = 0
            self._admin_adapter = None

    @property
    def admin_adapter(self) -> KeycloakAdmin:
        """Return the admin client, (re)initializing it when missing or expired.

        Returns:
            KeycloakAdmin instance

        Raises:
            ValueError: If no client secret is configured, or if the admin client
                is still unavailable after an initialization attempt
        """
        if not self.configs.CLIENT_SECRET_KEY:
            raise ValueError("Admin client not available, provide client_secret_key to use admin features")

        # Rebuild when there is no client yet or the recorded expiry time has passed.
        needs_refresh = self._admin_adapter is None or time.time() >= self._admin_token_expiry
        if needs_refresh:
            self._initialize_admin_client()

        adapter = self._admin_adapter
        if adapter is None:
            raise ValueError("Failed to initialize admin client")
        return adapter

    @override
    @alru_cache(ttl=3600, maxsize=1)  # Cache for 1 hour, public key rarely changes
    async def get_public_key(self) -> Any:
        """Fetch the realm public key and wrap it as a JWK.

        The key text returned by the OpenID client is wrapped in PEM
        ``BEGIN/END PUBLIC KEY`` markers before being parsed.

        Returns:
            JWK key object used to verify signatures

        Raises:
            ValueError: If fetching or parsing the key fails for any reason
        """
        try:
            from jwcrypto import jwk

            raw_key = await self.openid_adapter.a_public_key()
            pem_text = f"-----BEGIN PUBLIC KEY-----\n{raw_key}\n-----END PUBLIC KEY-----"
            pem_bytes = pem_text.encode("utf-8")
            return jwk.JWK.from_pem(pem_bytes)
        except Exception as e:
            message = f"Failed to get public key: {e!s}"
            logger.error(message)
            raise ValueError(message)

    @override
    async def get_token(self, username: str, password: str) -> KeycloakTokenType:
        """Obtain a user token with the Resource Owner Password Credentials Grant.

        Warning:
            The direct password grant is less secure and not recommended for user
            login in production environments. Prefer the web-based OAuth 2.0
            Authorization Code Flow (see `get_token_from_code`) for secure
            authentication, and keep this method for testing, administrative tasks,
            or service accounts where direct credential use is acceptable and
            properly secured.

        Args:
            username: User's username
            password: User's password

        Returns:
            Token response containing access_token, refresh_token, etc.

        Raises:
            ValueError: If the OpenID client raises a KeycloakError
        """
        credentials = {"grant_type": "password", "username": username, "password": password}
        try:
            token_response = await self.openid_adapter.a_token(**credentials)
        except KeycloakError as e:
            message = f"Failed to get token: {e!s}"
            logger.error(message)
            raise ValueError(message)
        return token_response

    @override
    async def refresh_token(self, refresh_token: str) -> KeycloakTokenType:
        """Exchange a refresh token for a new token response.

        Args:
            refresh_token: Refresh token string

        Returns:
            New token response containing access_token, refresh_token, etc.

        Raises:
            ValueError: If the OpenID client raises a KeycloakError
        """
        try:
            renewed = await self.openid_adapter.a_refresh_token(refresh_token)
        except KeycloakError as e:
            message = f"Failed to refresh token: {e!s}"
            logger.error(message)
            raise ValueError(message)
        return renewed

    @override
    async def validate_token(self, token: str) -> bool:
        """Report whether a token can be decoded with the key from ``get_public_key``.

        Results are not cached because token validity is time-sensitive.

        Args:
            token: Access token to validate

        Returns:
            True if decoding succeeds, False if any exception is raised
        """
        try:
            verification_key = await self.get_public_key()
            await self.openid_adapter.a_decode_token(token, key=verification_key)
        except Exception as e:
            logger.debug(f"Token validation failed: {e!s}")
            return False
        return True

    @override
    async def get_userinfo(self, token: str) -> KeycloakUserType:
        """Get user information from a token.

        The token is validated first; user info for a valid token is served
        through the short-lived cache of ``_get_userinfo_cached``.

        Args:
            token: Access token

        Returns:
            User information

        Raises:
            ValueError: If the token is invalid or getting user info fails
        """
        # validate_token is a coroutine function: without `await` this would test a
        # coroutine object, which is always truthy, and no token would ever be rejected.
        if not await self.validate_token(token):
            logger.error("Invalid token provided for userinfo request")
            raise ValueError("Invalid token provided")
        try:
            return await self._get_userinfo_cached(token)
        except KeycloakError as e:
            logger.error(f"Failed to get user info: {e!s}")
            # Chain explicitly so the original Keycloak error is kept as the cause.
            raise ValueError(f"Failed to get user info: {e!s}") from e

    @alru_cache(ttl=30, maxsize=100)  # Cache for 30 seconds
    async def _get_userinfo_cached(self, token):
        """Fetch user info for a token from the OpenID client (cached per token)."""
        userinfo = await self.openid_adapter.a_userinfo(token)
        return userinfo

    @override
    @alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
    async def get_user_by_id(self, user_id: str) -> KeycloakUserType | None:
        """Look a user up by ID.

        Args:
            user_id: User's ID

        Returns:
            User details, or None when the lookup fails with response code 404

        Raises:
            ValueError: If the admin client raises any other KeycloakError
        """
        try:
            user = await self.admin_adapter.a_get_user(user_id)
        except KeycloakGetError as e:
            # A 404 means "no such user" and is reported as None, not as an error.
            if e.response_code != 404:
                message = f"Failed to get user by ID: {e!s}"
                logger.error(message)
                raise ValueError(message)
            return None
        except KeycloakError as e:
            message = f"Failed to get user by ID: {e!s}"
            logger.error(message)
            raise ValueError(message)
        return user

    @override
    @alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
    async def get_user_by_username(self, username: str) -> KeycloakUserType | None:
        """Look a user up by username.

        Args:
            username: User's username

        Returns:
            The first user returned for the username query, or None if there is none

        Raises:
            ValueError: If the admin client raises a KeycloakError
        """
        try:
            found = await self.admin_adapter.a_get_users({"username": username})
            if found:
                return found[0]
            return None
        except KeycloakError as e:
            message = f"Failed to get user by username: {e!s}"
            logger.error(message)
            raise ValueError(message)

    @override
    @alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
    async def get_user_by_email(self, email: str) -> KeycloakUserType | None:
        """Look a user up by email address.

        Args:
            email: User's email

        Returns:
            The first user returned for the email query, or None if there is none

        Raises:
            ValueError: If the admin client raises a KeycloakError
        """
        try:
            found = await self.admin_adapter.a_get_users({"email": email})
            if found:
                return found[0]
            return None
        except KeycloakError as e:
            message = f"Failed to get user by email: {e!s}"
            logger.error(message)
            raise ValueError(message)

    @override
    @alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
    async def get_user_roles(self, user_id: str) -> list[KeycloakRoleType]:
        """List the realm roles assigned to a user.

        Args:
            user_id: User's ID

        Returns:
            List of roles

        Raises:
            ValueError: If the admin client raises a KeycloakError
        """
        try:
            realm_roles = await self.admin_adapter.a_get_realm_roles_of_user(user_id)
        except KeycloakError as e:
            message = f"Failed to get user roles: {e!s}"
            logger.error(message)
            raise ValueError(message)
        return realm_roles

    @override
    @alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
    async def get_client_roles_for_user(self, user_id: str, client_id: str) -> list[KeycloakRoleType]:
        """List the roles a user holds on one specific client.

        Args:
            user_id: User's ID
            client_id: Client ID

        Returns:
            List of client-specific roles

        Raises:
            ValueError: If the admin client raises a KeycloakError
        """
        try:
            client_roles = await self.admin_adapter.a_get_client_roles_of_user(user_id, client_id)
        except KeycloakError as e:
            message = f"Failed to get client roles: {e!s}"
            logger.error(message)
            raise ValueError(message)
        return client_roles

    @override
    async def has_role(self, token: str, role_name: str) -> bool:
        """Tell whether the token's user holds a role, at realm or client level.

        The role is looked for in ``realm_access.roles`` first and then in the
        ``roles`` of every entry of ``resource_access`` of the user info. The
        result is not cached because token validation is time-sensitive, and any
        exception is treated as "role not held".

        Args:
            token: Access token
            role_name: Role name to check

        Returns:
            True if user has the role, False otherwise
        """
        try:
            claims = await self.get_userinfo(token)

            # Realm-level roles
            if role_name in claims.get("realm_access", {}).get("roles", []):
                return True

            # Client-level roles, across every client listed in the user info
            return any(
                role_name in client_entry.get("roles", [])
                for client_entry in claims.get("resource_access", {}).values()
            )
        except Exception as e:
            logger.debug(f"Role check failed: {e!s}")
            return False

    @override
    async def has_any_of_roles(self, token: str, role_names: set[str]) -> bool:
        """Tell whether the token's user holds at least one of the given roles.

        Realm roles are checked first, then the roles of each client listed in
        ``resource_access``. Any exception is treated as "no role held".

        Args:
            token: Access token
            role_names: Set of role names to check

        Returns:
            True if user has any of the roles, False otherwise
        """
        try:
            claims = await self.get_userinfo(token)

            # Realm-level roles
            realm_roles = set(claims.get("realm_access", {}).get("roles", []))
            if role_names.intersection(realm_roles):
                return True

            # Client-level roles: stop at the first client sharing a role with role_names
            return any(
                role_names.intersection(set(client_entry.get("roles", [])))
                for client_entry in claims.get("resource_access", {}).values()
            )
        except Exception as e:
            logger.debug(f"Role check failed: {e!s}")
            return False

    @override
    async def has_all_roles(self, token: str, role_names: set[str]) -> bool:
        """Tell whether the token's user holds every one of the given roles.

        Realm roles and the roles of all clients in ``resource_access`` are pooled
        into one set, and ``role_names`` must be a subset of it. Any exception is
        treated as "not all roles held".

        Args:
            token: Access token
            role_names: Set of role names to check

        Returns:
            True if user has all of the roles, False otherwise
        """
        try:
            claims = await self.get_userinfo(token)

            # Start from the realm roles, then pool in every client's roles.
            granted_roles = set(claims.get("realm_access", {}).get("roles", []))
            for client_entry in claims.get("resource_access", {}).values():
                granted_roles.update(client_entry.get("roles", []))

            return role_names.issubset(granted_roles)
        except Exception as e:
            logger.debug(f"All roles check failed: {e!s}")
            return False

    @override
    async def create_user(self, user_data: dict[str, Any]) -> str:
        """Create a new user in Keycloak.

        Args:
            user_data: User data including username, email, etc.

        Returns:
            ID of the created user

        Raises:
            ValueError: If creating user fails; the originating ``KeycloakError``
                is attached as ``__cause__``.
        """
        # This is a write operation, no caching needed
        try:
            user_id = await self.admin_adapter.a_create_user(user_data)

            # Cached lookups may now be stale, so drop them before returning
            self.clear_all_caches()

            return user_id
        except KeycloakError as e:
            logger.error(f"Failed to create user: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to create user: {e!s}") from e

    @override
    async def update_user(self, user_id: str, user_data: dict[str, Any]) -> None:
        """Update user details.

        Args:
            user_id: User's ID
            user_data: User data to update

        Raises:
            ValueError: If updating user fails; the originating ``KeycloakError``
                is attached as ``__cause__``.
        """
        # This is a write operation, no caching needed
        try:
            await self.admin_adapter.a_update_user(user_id, user_data)

            # Cached user lookups may now be stale
            self.clear_all_caches()

        except KeycloakError as e:
            logger.error(f"Failed to update user: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to update user: {e!s}") from e

    @override
    async def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None:
        """Reset a user's password.

        Args:
            user_id: User's ID
            password: New password
            temporary: Whether the password is temporary and should be changed on next login

        Raises:
            ValueError: If password reset fails; the originating ``KeycloakError``
                is attached as ``__cause__``.
        """
        # This is a write operation, no caching needed
        try:
            await self.admin_adapter.a_set_user_password(user_id, password, temporary)
        except KeycloakError as e:
            logger.error(f"Failed to reset password: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to reset password: {e!s}") from e

    @override
    async def assign_realm_role(self, user_id: str, role_name: str) -> None:
        """Assign a realm role to a user.

        The role representation is fetched by name, assigned to the user, and
        the cached ``get_user_roles`` results are then invalidated.

        Args:
            user_id: User's ID
            role_name: Role name to assign

        Raises:
            ValueError: If role assignment fails; the originating
                ``KeycloakError`` is attached as ``__cause__``.
        """
        # This is a write operation, no caching needed
        try:
            # Get role representation
            role = await self.admin_adapter.a_get_realm_role(role_name)
            # Assign role to user
            await self.admin_adapter.a_assign_realm_roles(user_id, [role])

            # The cache wrapper's reset hook may be spelled ``clear_cache`` or, for
            # lru-style wrappers, ``cache_clear``; call whichever is present.
            cached = self.get_user_roles
            reset = getattr(cached, "clear_cache", None) or getattr(cached, "cache_clear", None)
            if callable(reset):
                reset()

        except KeycloakError as e:
            logger.error(f"Failed to assign realm role: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to assign realm role: {e!s}") from e

    @override
    async def remove_realm_role(self, user_id: str, role_name: str) -> None:
        """Remove a realm role from a user.

        The role representation is fetched by name, removed from the user, and
        the cached ``get_user_roles`` results are then invalidated.

        Args:
            user_id: User's ID
            role_name: Role name to remove

        Raises:
            ValueError: If role removal fails; the originating ``KeycloakError``
                is attached as ``__cause__``.
        """
        # This is a write operation, no caching needed
        try:
            # Get role representation
            role = await self.admin_adapter.a_get_realm_role(role_name)
            # Remove role from user
            await self.admin_adapter.a_delete_realm_roles_of_user(user_id, [role])

            # The cache wrapper's reset hook may be spelled ``clear_cache`` or, for
            # lru-style wrappers, ``cache_clear``; call whichever is present.
            cached = self.get_user_roles
            reset = getattr(cached, "clear_cache", None) or getattr(cached, "cache_clear", None)
            if callable(reset):
                reset()

        except KeycloakError as e:
            logger.error(f"Failed to remove realm role: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to remove realm role: {e!s}") from e

    @override
    async def assign_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
        """Assign a client-specific role to a user.

        Resolves the client via ``a_get_client_id``, fetches the role
        representation for that client, assigns it, and then invalidates the
        cached ``get_client_roles_for_user`` results.

        Args:
            user_id: User's ID
            client_id: Client ID
            role_name: Role name to assign

        Raises:
            ValueError: If role assignment fails; the originating
                ``KeycloakError`` is attached as ``__cause__``.
        """
        # This is a write operation, no caching needed
        try:
            # Get client
            client = await self.admin_adapter.a_get_client_id(client_id)
            # Get role representation
            role = await self.admin_adapter.a_get_client_role(client, role_name)
            # Assign role to user
            await self.admin_adapter.a_assign_client_role(user_id, client, [role])

            # The cache wrapper's reset hook may be spelled ``clear_cache`` or, for
            # lru-style wrappers, ``cache_clear``; call whichever is present.
            cached = self.get_client_roles_for_user
            reset = getattr(cached, "clear_cache", None) or getattr(cached, "cache_clear", None)
            if callable(reset):
                reset()

        except KeycloakError as e:
            logger.error(f"Failed to assign client role: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to assign client role: {e!s}") from e

    @override
    async def create_realm_role(self, role_name: str, description: str | None = None) -> dict[str, Any]:
        """Create a new realm role.

        After creation the cached ``get_realm_roles`` listing is invalidated and
        the new role is read back by name.

        Args:
            role_name: Role name
            description: Optional role description; omitted from the payload when empty

        Returns:
            Created role details

        Raises:
            ValueError: If role creation fails; the originating ``KeycloakError``
                is attached as ``__cause__``.
        """
        # This is a write operation, no caching needed
        try:
            role_data = {"name": role_name}
            if description:
                role_data["description"] = description

            await self.admin_adapter.a_create_realm_role(role_data)

            # The cache wrapper's reset hook may be spelled ``clear_cache`` or, for
            # lru-style wrappers, ``cache_clear``; call whichever is present.
            cached = self.get_realm_roles
            reset = getattr(cached, "clear_cache", None) or getattr(cached, "cache_clear", None)
            if callable(reset):
                reset()

            return await self.admin_adapter.a_get_realm_role(role_name)
        except KeycloakError as e:
            logger.error(f"Failed to create realm role: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to create realm role: {e!s}") from e

    @override
    async def delete_realm_role(self, role_name: str) -> None:
        """Delete a realm role.

        After deletion, every cache that may still hold the role is
        invalidated: the realm role listing, the single-role lookup and the
        per-user role lists.

        Args:
            role_name: Role name to delete

        Raises:
            ValueError: If role deletion fails; the originating ``KeycloakError``
                is attached as ``__cause__``.
        """
        # This is a write operation, no caching needed
        try:
            await self.admin_adapter.a_delete_realm_role(role_name)

            # User role caches are included because they might contain this role.
            # The reset hook may be spelled ``clear_cache`` or, for lru-style
            # wrappers, ``cache_clear``; call whichever is present.
            for cached in (self.get_realm_roles, self.get_realm_role, self.get_user_roles):
                reset = getattr(cached, "clear_cache", None) or getattr(cached, "cache_clear", None)
                if callable(reset):
                    reset()

        except KeycloakError as e:
            logger.error(f"Failed to delete realm role: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to delete realm role: {e!s}") from e

    @override
    @alru_cache(ttl=3600, maxsize=1)  # Cache for 1 hour
    async def get_service_account_id(self) -> str:
        """Get service account user ID for the current client.

        The configured ``CLIENT_ID`` is resolved through ``get_client_id`` and
        the ``id`` field of that client's service account user is returned.

        Returns:
            Service account user ID

        Raises:
            ValueError: If getting service account fails; the originating
                ``KeycloakError`` is attached as ``__cause__``.
        """
        try:
            client_id = await self.get_client_id(self.configs.CLIENT_ID)
            service_account = await self.admin_adapter.a_get_client_service_account_user(client_id)
            return service_account.get("id")
        except KeycloakError as e:
            logger.error(f"Failed to get service account ID: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to get service account ID: {e!s}") from e

    @override
    @alru_cache(ttl=3600, maxsize=1)  # Cache for 1 hour
    async def get_well_known_config(self) -> dict[str, Any]:
        """Get the well-known OpenID configuration.

        Returns:
            OIDC configuration

        Raises:
            ValueError: If getting configuration fails; the originating
                ``KeycloakError`` is attached as ``__cause__``.
        """
        try:
            return await self.openid_adapter.a_well_known()
        except KeycloakError as e:
            logger.error(f"Failed to get well-known config: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to get well-known config: {e!s}") from e

    @override
    @alru_cache(ttl=3600, maxsize=1)  # Cache for 1 hour
    async def get_certs(self) -> dict[str, Any]:
        """Get the JWT verification certificates.

        Returns:
            Certificate information

        Raises:
            ValueError: If getting certificates fails; the originating
                ``KeycloakError`` is attached as ``__cause__``.
        """
        try:
            return await self.openid_adapter.a_certs()
        except KeycloakError as e:
            logger.error(f"Failed to get certificates: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to get certificates: {e!s}") from e

    @override
    async def get_token_from_code(self, code: str, redirect_uri: str) -> KeycloakTokenType:
        """Exchange authorization code for token.

        Args:
            code: Authorization code
            redirect_uri: Redirect URI used in authorization request

        Returns:
            Token response

        Raises:
            ValueError: If token exchange fails; the originating ``KeycloakError``
                is attached as ``__cause__``.
        """
        # Authorization codes can only be used once, don't cache
        try:
            return await self.openid_adapter.a_token(
                grant_type="authorization_code",
                code=code,
                redirect_uri=redirect_uri,
            )
        except KeycloakError as e:
            logger.error(f"Failed to exchange code for token: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to exchange code for token: {e!s}") from e

    @override
    async def get_client_credentials_token(self) -> KeycloakTokenType:
        """Get token using client credentials.

        Returns:
            Token response

        Raises:
            ValueError: If token acquisition fails; the originating
                ``KeycloakError`` is attached as ``__cause__``.
        """
        # Tokens are time-sensitive, don't cache
        try:
            return await self.openid_adapter.a_token(grant_type="client_credentials")
        except KeycloakError as e:
            logger.error(f"Failed to get client credentials token: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to get client credentials token: {e!s}") from e

    @override
    @alru_cache(ttl=30, maxsize=50)  # Cache for 30 seconds with limited entries
    async def search_users(self, query: str, max_results: int = 100) -> list[KeycloakUserType]:
        """Search for users by username, email, or name.

        The same query is tried against ``username``, ``email``, ``firstName``
        and ``lastName`` in that order. The username search always runs; each
        later field is only queried while fewer than ``max_results`` users have
        been collected, and only for the remaining number of slots. Users are
        de-duplicated by their ``id``.

        Args:
            query: Search query
            max_results: Maximum number of results to return

        Returns:
            List of matching users, at most ``max_results`` long

        Raises:
            ValueError: If search fails; the originating ``KeycloakError`` is
                attached as ``__cause__``.
        """
        try:
            matches = []
            seen_ids = set()

            for position, field in enumerate(("username", "email", "firstName", "lastName")):
                remaining = max_results - len(matches)
                # Only the first (username) lookup is unconditional
                if position > 0 and remaining <= 0:
                    break

                batch = await self.admin_adapter.a_get_users({field: query, "max": remaining})
                for user in batch:
                    # Track ids incrementally instead of rebuilding the set per field
                    if user["id"] not in seen_ids:
                        seen_ids.add(user["id"])
                        matches.append(user)

            return matches[:max_results]
        except KeycloakError as e:
            logger.error(f"Failed to search users: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to search users: {e!s}") from e

    @override
    async def check_permissions(self, token: str, resource: str, scope: str) -> bool:
        """Check if a user has permission to access a resource with the specified scope.

        Args:
            token: Access token
            resource: Resource name
            scope: Permission scope

        Returns:
            True if permission granted, False otherwise
        """
        try:
            # Use UMA permissions endpoint to check specific resource and scope
            permissions = await self.openid_adapter.a_uma_permissions(token, permissions=[f"{resource}#{scope}"])

            # Check if the response indicates permission is granted
            if not permissions or not isinstance(permissions, list):
                logger.debug("No permissions returned or invalid response format")
                return False

            # Look for the specific permission in the response
            for perm in permissions:
                if perm.get("rsname") == resource and scope in perm.get("scopes", []):
                    return True

            return False
        except KeycloakError as e:
            logger.debug(f"Permission check failed with Keycloak error: {e!s}")
            return False
        except Exception as e:
            logger.error(f"Unexpected error in permission check: {e!s}")
            return False

    @override
    @alru_cache(ttl=3600, maxsize=50)  # Cache for 1 hour
    async def get_client_secret(self, client_id: str) -> str:
        """Get client secret.

        Args:
            client_id: Client ID

        Returns:
            Client secret, or an empty string when the client representation
            carries no ``secret`` field

        Raises:
            ValueError: If getting secret fails; the originating ``KeycloakError``
                is attached as ``__cause__``.
        """
        try:
            client = await self.admin_adapter.a_get_client(client_id)
            return client.get("secret", "")
        except KeycloakError as e:
            logger.error(f"Failed to get client secret: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to get client secret: {e!s}") from e

    @override
    @alru_cache(ttl=3600, maxsize=50)  # Cache for 1 hour
    async def get_client_id(self, client_name: str) -> str:
        """Get client ID by client name.

        Args:
            client_name: Name of the client

        Returns:
            Client ID

        Raises:
            ValueError: If the lookup fails; the originating ``KeycloakError`` is
                attached as ``__cause__``.
        """
        try:
            return await self.admin_adapter.a_get_client_id(client_name)
        except KeycloakError as e:
            logger.error(f"Failed to get client ID: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to get client ID: {e!s}") from e

    @override
    @alru_cache(ttl=300, maxsize=1)  # Cache for 5 minutes
    async def get_realm_roles(self) -> list[dict[str, Any]]:
        """Get all realm roles.

        Returns:
            List of realm roles

        Raises:
            ValueError: If getting roles fails; the originating ``KeycloakError``
                is attached as ``__cause__``.
        """
        try:
            return await self.admin_adapter.a_get_realm_roles()
        except KeycloakError as e:
            logger.error(f"Failed to get realm roles: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to get realm roles: {e!s}") from e

    @override
    # Keyed by role name, so more than one slot is needed to avoid evicting on
    # every different name; 50 matches the other name-keyed lookups.
    @alru_cache(ttl=300, maxsize=50)  # Cache for 5 minutes
    async def get_realm_role(self, role_name: str) -> dict:
        """Get realm role.

        Args:
            role_name: Role name

        Returns:
            A realm role

        Raises:
            ValueError: If getting role fails; the originating ``KeycloakError``
                is attached as ``__cause__``.
        """
        try:
            return await self.admin_adapter.a_get_realm_role(role_name)
        except KeycloakError as e:
            logger.error(f"Failed to get realm role: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to get realm role: {e!s}") from e

    @override
    async def remove_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
        """Remove a client-specific role from a user.

        Resolves the client via ``a_get_client_id``, fetches the role
        representation for that client, removes it from the user, and then
        invalidates the cached ``get_client_roles_for_user`` results.

        Args:
            user_id: User's ID
            client_id: Client ID
            role_name: Role name to remove

        Raises:
            ValueError: If role removal fails; the originating ``KeycloakError``
                is attached as ``__cause__``.
        """
        try:
            client = await self.admin_adapter.a_get_client_id(client_id)
            role = await self.admin_adapter.a_get_client_role(client, role_name)
            await self.admin_adapter.a_delete_client_roles_of_user(user_id, client, [role])

            # The cache wrapper's reset hook may be spelled ``clear_cache`` or, for
            # lru-style wrappers, ``cache_clear``; call whichever is present.
            cached = self.get_client_roles_for_user
            reset = getattr(cached, "clear_cache", None) or getattr(cached, "cache_clear", None)
            if callable(reset):
                reset()
        except KeycloakError as e:
            logger.error(f"Failed to remove client role: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to remove client role: {e!s}") from e

    @override
    async def clear_user_sessions(self, user_id: str) -> None:
        """Clear all sessions for a user.

        Args:
            user_id: User's ID

        Raises:
            ValueError: If clearing sessions fails; the originating
                ``KeycloakError`` is attached as ``__cause__``.
        """
        try:
            await self.admin_adapter.a_user_logout(user_id)
        except KeycloakError as e:
            logger.error(f"Failed to clear user sessions: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to clear user sessions: {e!s}") from e

    @override
    async def logout(self, refresh_token: str) -> None:
        """Logout user by invalidating their refresh token.

        Args:
            refresh_token: Refresh token to invalidate

        Raises:
            ValueError: If logout fails; the originating ``KeycloakError`` is
                attached as ``__cause__``.
        """
        try:
            await self.openid_adapter.a_logout(refresh_token)
        except KeycloakError as e:
            logger.error(f"Failed to logout: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to logout: {e!s}") from e

    @override
    async def introspect_token(self, token: str) -> dict[str, Any]:
        """Introspect token to get detailed information about it.

        Args:
            token: Access token

        Returns:
            Token introspection details

        Raises:
            ValueError: If token introspection fails; the originating
                ``KeycloakError`` is attached as ``__cause__``.
        """
        try:
            return await self.openid_adapter.a_introspect(token)
        except KeycloakError as e:
            logger.error(f"Failed to introspect token: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to introspect token: {e!s}") from e

    @override
    async def get_token_info(self, token: str) -> dict[str, Any]:
        """Decode token to get its claims.

        The token is decoded with the key returned by ``get_public_key``.

        Args:
            token: Access token

        Returns:
            Dictionary of token claims

        Raises:
            ValueError: If token decoding fails; when raised here the
                originating ``KeycloakError`` is attached as ``__cause__``.
        """
        try:
            return await self.openid_adapter.a_decode_token(
                token,
                key=await self.get_public_key(),
            )
        except KeycloakError as e:
            logger.error(f"Failed to get token info: {e!s}")
            # Chain explicitly so callers can still reach the Keycloak error
            raise ValueError(f"Failed to get token info: {e!s}") from e

    @override
    async def delete_user(self, user_id: str) -> None:
        """Delete a user from Keycloak by their ID.

        On success the adapter's caches are cleared, as ``create_user`` and
        ``update_user`` do, so cached user lookups stop returning the deleted
        user.

        Args:
            user_id: The ID of the user to delete

        Raises:
            ValueError: If the deletion fails; the originating exception is
                attached as ``__cause__``.
        """
        try:
            await self.admin_adapter.a_delete_user(user_id=user_id)
        except Exception as e:
            logger.error(f"Failed to delete user with ID {user_id}: {e!s}")
            # Chain explicitly so callers can still reach the original error
            raise ValueError(f"Failed to delete user: {e!s}") from e
        else:
            logger.info(f"Successfully deleted user with ID {user_id}")
            # Kept outside the try so a cache problem is not reported as a failed deletion
            self.clear_all_caches()

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.admin_adapter property

Get the admin adapter, refreshing it if necessary.

Returns:

Type Description
KeycloakAdmin

KeycloakAdmin instance

Raises:

Type Description
ValueError

If admin client is not available

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.__init__(keycloak_configs=None)

Initialize the AsyncKeycloakAdapter with configuration.

Parameters:

Name Type Description Default
keycloak_configs KeycloakConfig | None

Optional Keycloak configuration. If None, global config is used.

None
Source code in archipy/adapters/keycloak/adapters.py
def __init__(self, keycloak_configs: KeycloakConfig | None = None) -> None:
    """Set up the adapter's OpenID client and, if a client secret is configured, its admin client.

    Args:
        keycloak_configs: Optional Keycloak configuration. If None, global config is used.
    """
    # Fall back to the globally registered Keycloak settings
    if keycloak_configs is None:
        keycloak_configs = BaseConfig.global_config().KEYCLOAK
    self.configs: KeycloakConfig = keycloak_configs

    # OpenID client used for authentication calls
    self.openid_adapter = self._get_openid_client(self.configs)

    # Admin client is cached to avoid unnecessary re-authentication
    self._admin_adapter = None
    self._admin_token_expiry = 0

    # Without a client secret there is no service account to initialise the admin client with
    if not self.configs.CLIENT_SECRET_KEY:
        return
    self._initialize_admin_client()

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.clear_all_caches()

Clear all cached values.

Source code in archipy/adapters/keycloak/adapters.py
def clear_all_caches(self) -> None:
    """Clear all cached values.

    Walks every attribute of the instance and calls its cache-reset hook.
    Both the ``clear_cache`` spelling and the lru-style ``cache_clear``
    spelling are recognised, so wrappers following either convention are
    emptied. Properties are skipped rather than evaluated.
    """
    owner = type(self)
    for attr_name in dir(self):
        # Evaluating a property runs its getter, which may have side effects or
        # raise (e.g. ``admin_adapter`` refreshes the admin client); skip them.
        if isinstance(getattr(owner, attr_name, None), property):
            continue

        attr = getattr(self, attr_name)
        reset = getattr(attr, "clear_cache", None) or getattr(attr, "cache_clear", None)
        if callable(reset):
            reset()

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_public_key() async

Get the public key used to verify tokens.

Returns:

Type Description
Any

JWK key object used to verify signatures

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=3600, maxsize=1)  # Cache for 1 hour, public key rarely changes
async def get_public_key(self) -> Any:
    """Get the public key used to verify tokens.

    The key material returned by the OpenID client is wrapped in PEM
    ``BEGIN/END PUBLIC KEY`` markers and parsed into a JWK.

    Returns:
        JWK key object used to verify signatures

    Raises:
        ValueError: If the key cannot be fetched or parsed; the originating
            exception is attached as ``__cause__``.
    """
    try:
        from jwcrypto import jwk

        keys_info = await self.openid_adapter.a_public_key()
        key = f"-----BEGIN PUBLIC KEY-----\n{keys_info}\n-----END PUBLIC KEY-----"
        return jwk.JWK.from_pem(key.encode("utf-8"))
    except Exception as e:
        logger.error(f"Failed to get public key: {e!s}")
        # Chain explicitly so callers can still reach the original error
        raise ValueError(f"Failed to get public key: {e!s}") from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_token(username, password) async

Get a user token by username and password using the Resource Owner Password Credentials Grant.

Warning

This method uses the direct password grant flow, which is less secure and not recommended for user login in production environments. Instead, prefer the web-based OAuth 2.0 Authorization Code Flow (use get_token_from_code) for secure authentication. Use this method only for testing, administrative tasks, or specific service accounts where direct credential use is acceptable and properly secured.

Parameters:

Name Type Description Default
username str

User's username

required
password str

User's password

required

Returns:

Type Description
KeycloakTokenType

Token response containing access_token, refresh_token, etc.

Raises:

Type Description
ValueError

If token acquisition fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def get_token(self, username: str, password: str) -> KeycloakTokenType:
    """Get a user token by username and password using the Resource Owner Password Credentials Grant.

    Warning:
        This method uses the direct password grant flow, which is less secure and not recommended
        for user login in production environments. Instead, prefer the web-based OAuth 2.0
        Authorization Code Flow (use `get_token_from_code`) for secure authentication.
        Use this method only for testing, administrative tasks, or specific service accounts
        where direct credential use is acceptable and properly secured.

    Args:
        username: User's username
        password: User's password

    Returns:
        Token response containing access_token, refresh_token, etc.

    Raises:
        ValueError: If token acquisition fails; the originating ``KeycloakError``
            is attached as ``__cause__``.
    """
    try:
        return await self.openid_adapter.a_token(grant_type="password", username=username, password=password)
    except KeycloakError as e:
        logger.error(f"Failed to get token: {e!s}")
        # Chain explicitly so callers can still reach the Keycloak error
        raise ValueError(f"Failed to get token: {e!s}") from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.refresh_token(refresh_token) async

Refresh an existing token using a refresh token.

Parameters:

Name Type Description Default
refresh_token str

Refresh token string

required

Returns:

Type Description
KeycloakTokenType

New token response containing access_token, refresh_token, etc.

Raises:

Type Description
ValueError

If token refresh fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def refresh_token(self, refresh_token: str) -> KeycloakTokenType:
    """Refresh an existing token using a refresh token.

    Args:
        refresh_token: Refresh token string

    Returns:
        New token response containing access_token, refresh_token, etc.

    Raises:
        ValueError: If token refresh fails; the originating ``KeycloakError``
            is attached as ``__cause__``.
    """
    try:
        return await self.openid_adapter.a_refresh_token(refresh_token)
    except KeycloakError as e:
        logger.error(f"Failed to refresh token: {e!s}")
        # Chain explicitly so callers can still reach the Keycloak error
        raise ValueError(f"Failed to refresh token: {e!s}") from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.validate_token(token) async

Validate if a token is still valid.

Parameters:

Name Type Description Default
token str

Access token to validate

required

Returns:

Type Description
bool

True if token is valid, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
async def validate_token(self, token: str) -> bool:
    """Validate if a token is still valid.

    Args:
        token: Access token to validate

    Returns:
        True if token is valid, False otherwise
    """
    # Not caching validation results as tokens are time-sensitive
    try:
        await self.openid_adapter.a_decode_token(
            token,
            key=await self.get_public_key(),
        )
        return True
    except Exception as e:
        logger.debug(f"Token validation failed: {e!s}")
        return False

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_userinfo(token) async

Get user information from a token.

Parameters:

Name Type Description Default
token str

Access token

required

Returns:

Type Description
KeycloakUserType

User information

Raises:

Type Description
ValueError

If getting user info fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def get_userinfo(self, token: str) -> KeycloakUserType:
    """Get user information from a token.

    The token is validated first; only a valid token is passed on to the
    cached user info lookup.

    Args:
        token: Access token

    Returns:
        User information

    Raises:
        ValueError: If the token is invalid, or if getting user info fails (in
            which case the originating ``KeycloakError`` is attached as
            ``__cause__``).
    """
    # ``validate_token`` is a coroutine function: without ``await`` the call yields
    # an always-truthy coroutine object and this guard would never reject a token.
    if not await self.validate_token(token):
        logger.error("Invalid token provided for userinfo request")
        raise ValueError("Invalid token provided")
    try:
        return await self._get_userinfo_cached(token)
    except KeycloakError as e:
        logger.error(f"Failed to get user info: {e!s}")
        # Chain explicitly so callers can still reach the Keycloak error
        raise ValueError(f"Failed to get user info: {e!s}") from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_user_by_id(user_id) async

Get user details by user ID.

Parameters:

Name Type Description Default
user_id str

User's ID

required

Returns:

Type Description
KeycloakUserType | None

User details or None if not found

Raises:

Type Description
ValueError

If getting user fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
async def get_user_by_id(self, user_id: str) -> KeycloakUserType | None:
    """Get user details by user ID.

    Args:
        user_id: User's ID

    Returns:
        User details or None if not found (a 404 from Keycloak)

    Raises:
        ValueError: If getting user fails for any other reason; the originating
            Keycloak exception is attached as ``__cause__``.
    """
    try:
        return await self.admin_adapter.a_get_user(user_id)
    except KeycloakGetError as e:
        # A 404 means the user does not exist, which is not an error for callers
        if e.response_code == 404:
            return None
        logger.error(f"Failed to get user by ID: {e!s}")
        raise ValueError(f"Failed to get user by ID: {e!s}") from e
    except KeycloakError as e:
        logger.error(f"Failed to get user by ID: {e!s}")
        raise ValueError(f"Failed to get user by ID: {e!s}") from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_user_by_username(username) async

Get user details by username.

Parameters:

Name Type Description Default
username str

User's username

required

Returns:

Type Description
KeycloakUserType | None

User details or None if not found

Raises:

Type Description
ValueError

If query fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
async def get_user_by_username(self, username: str) -> KeycloakUserType | None:
    """Get user details by username.

    Args:
        username: User's username

    Returns:
        The first user returned by the username query, or None if there is none

    Raises:
        ValueError: If query fails; the originating ``KeycloakError`` is
            attached as ``__cause__``.
    """
    try:
        users = await self.admin_adapter.a_get_users({"username": username})
        return users[0] if users else None
    except KeycloakError as e:
        logger.error(f"Failed to get user by username: {e!s}")
        # Chain explicitly so callers can still reach the Keycloak error
        raise ValueError(f"Failed to get user by username: {e!s}") from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_user_by_email(email) async

Get user details by email.

Parameters:

Name Type Description Default
email str

User's email

required

Returns:

Type Description
KeycloakUserType | None

User details or None if not found

Raises:

Type Description
ValueError

If query fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
async def get_user_by_email(self, email: str) -> KeycloakUserType | None:
    """Get user details by email.

    Args:
        email: User's email

    Returns:
        The first user returned by the email query, or None if there is none

    Raises:
        ValueError: If query fails; the originating ``KeycloakError`` is
            attached as ``__cause__``.
    """
    try:
        users = await self.admin_adapter.a_get_users({"email": email})
        return users[0] if users else None
    except KeycloakError as e:
        logger.error(f"Failed to get user by email: {e!s}")
        # Chain explicitly so callers can still reach the Keycloak error
        raise ValueError(f"Failed to get user by email: {e!s}") from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_user_roles(user_id) async

Get roles assigned to a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required

Returns:

Type Description
list[KeycloakRoleType]

List of roles

Raises:

Type Description
ValueError

If getting roles fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
async def get_user_roles(self, user_id: str) -> list[KeycloakRoleType]:
    """Get the realm roles assigned to a user.

    Args:
        user_id: User's ID

    Returns:
        List of roles

    Raises:
        ValueError: If getting roles fails; the originating ``KeycloakError``
            is attached as ``__cause__``.
    """
    try:
        return await self.admin_adapter.a_get_realm_roles_of_user(user_id)
    except KeycloakError as e:
        logger.error(f"Failed to get user roles: {e!s}")
        # Chain explicitly so callers can still reach the Keycloak error
        raise ValueError(f"Failed to get user roles: {e!s}") from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_client_roles_for_user(user_id, client_id) async

Get client-specific roles assigned to a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
client_id str

Client ID

required

Returns:

Type Description
list[KeycloakRoleType]

List of client-specific roles

Raises:

Type Description
ValueError

If getting roles fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
async def get_client_roles_for_user(self, user_id: str, client_id: str) -> list[KeycloakRoleType]:
    """Get client-specific roles assigned to a user.

    Args:
        user_id: User's ID
        client_id: Client ID (passed through unchanged to the admin client)

    Returns:
        List of client-specific roles

    Raises:
        ValueError: If getting roles fails (the Keycloak error is attached as the cause)
    """
    try:
        return await self.admin_adapter.a_get_client_roles_of_user(user_id, client_id)
    except KeycloakError as e:
        logger.error(f"Failed to get client roles: {e!s}")
        # Chain explicitly so the traceback shows the Keycloak failure as the cause.
        raise ValueError(f"Failed to get client roles: {e!s}") from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.has_role(token, role_name) async

Check if a user has a specific role.

Parameters:

Name Type Description Default
token str

Access token

required
role_name str

Role name to check

required

Returns:

Type Description
bool

True if user has the role, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
async def has_role(self, token: str, role_name: str) -> bool:
    """Check if a user has a specific role.

    Args:
        token: Access token
        role_name: Role name to check

    Returns:
        True if user has the role, False otherwise
    """
    # Not caching this result as token validation is time-sensitive
    try:
        user_info = await self.get_userinfo(token)

        # Check realm roles
        realm_access = user_info.get("realm_access", {})
        roles = realm_access.get("roles", [])
        if role_name in roles:
            return True

        # Check client roles
        resource_access = user_info.get("resource_access", {})
        for client in resource_access.values():
            client_roles = client.get("roles", [])
            if role_name in client_roles:
                return True

        return False
    except Exception as e:
        logger.debug(f"Role check failed: {e!s}")
        return False

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.has_any_of_roles(token, role_names) async

Check if a user has any of the specified roles.

Parameters:

Name Type Description Default
token str

Access token

required
role_names set[str]

Set of role names to check

required

Returns:

Type Description
bool

True if user has any of the roles, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
async def has_any_of_roles(self, token: str, role_names: set[str]) -> bool:
    """Check if a user has any of the specified roles.

    Args:
        token: Access token
        role_names: Set of role names to check

    Returns:
        True if user has any of the roles, False otherwise
    """
    try:
        user_info = await self.get_userinfo(token)

        # Check realm roles
        realm_access = user_info.get("realm_access", {})
        roles = set(realm_access.get("roles", []))
        if role_names.intersection(roles):
            return True

        # Check client roles
        resource_access = user_info.get("resource_access", {})
        for client in resource_access.values():
            client_roles = set(client.get("roles", []))
            if role_names.intersection(client_roles):
                return True

        return False
    except Exception as e:
        logger.debug(f"Role check failed: {e!s}")
        return False

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.has_all_roles(token, role_names) async

Check if a user has all of the specified roles.

Parameters:

Name Type Description Default
token str

Access token

required
role_names set[str]

Set of role names to check

required

Returns:

Type Description
bool

True if user has all of the roles, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
async def has_all_roles(self, token: str, role_names: set[str]) -> bool:
    """Check if a user has all of the specified roles.

    Args:
        token: Access token
        role_names: Set of role names to check

    Returns:
        True if user has all of the roles, False otherwise
    """
    try:
        user_info = await self.get_userinfo(token)

        # Get all user roles
        all_roles = set()

        # Add realm roles
        realm_access = user_info.get("realm_access", {})
        all_roles.update(realm_access.get("roles", []))

        # Add client roles
        resource_access = user_info.get("resource_access", {})
        for client in resource_access.values():
            all_roles.update(client.get("roles", []))

        # Check if all required roles are present
        return role_names.issubset(all_roles)
    except Exception as e:
        logger.debug(f"All roles check failed: {e!s}")
        return False

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.create_user(user_data) async

Create a new user in Keycloak.

Parameters:

Name Type Description Default
user_data dict[str, Any]

User data including username, email, etc.

required

Returns:

Type Description
str

ID of the created user

Raises:

Type Description
ValueError

If creating user fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def create_user(self, user_data: dict[str, Any]) -> str:
    """Create a new user in Keycloak.

    Args:
        user_data: User data including username, email, etc.

    Returns:
        ID of the created user

    Raises:
        ValueError: If creating user fails
    """
    # This is a write operation, no caching needed
    try:
        user_id = await self.admin_adapter.a_create_user(user_data)

        # Clear related caches
        self.clear_all_caches()

        return user_id
    except KeycloakError as e:
        logger.error(f"Failed to create user: {e!s}")
        raise ValueError(f"Failed to create user: {e!s}")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.update_user(user_id, user_data) async

Update user details.

Parameters:

Name Type Description Default
user_id str

User's ID

required
user_data dict[str, Any]

User data to update

required

Raises:

Type Description
ValueError

If updating user fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def update_user(self, user_id: str, user_data: dict[str, Any]) -> None:
    """Update user details.

    Args:
        user_id: User's ID
        user_data: User data to update

    Raises:
        ValueError: If updating user fails
    """
    # This is a write operation, no caching needed
    try:
        await self.admin_adapter.a_update_user(user_id, user_data)

        # Clear user-related caches
        self.clear_all_caches()

    except KeycloakError as e:
        logger.error(f"Failed to update user: {e!s}")
        raise ValueError(f"Failed to update user: {e!s}")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.reset_password(user_id, password, temporary=False) async

Reset a user's password.

Parameters:

Name Type Description Default
user_id str

User's ID

required
password str

New password

required
temporary bool

Whether the password is temporary and should be changed on next login

False

Raises:

Type Description
ValueError

If password reset fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None:
    """Reset a user's password.

    Args:
        user_id: User's ID
        password: New password
        temporary: Whether the password is temporary and should be changed on next login

    Raises:
        ValueError: If password reset fails
    """
    # This is a write operation, no caching needed
    try:
        await self.admin_adapter.a_set_user_password(user_id, password, temporary)
    except KeycloakError as e:
        logger.error(f"Failed to reset password: {e!s}")
        raise ValueError(f"Failed to reset password: {e!s}")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.assign_realm_role(user_id, role_name) async

Assign a realm role to a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
role_name str

Role name to assign

required

Raises:

Type Description
ValueError

If role assignment fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def assign_realm_role(self, user_id: str, role_name: str) -> None:
    """Assign a realm role to a user.

    Args:
        user_id: User's ID
        role_name: Role name to assign

    Raises:
        ValueError: If role assignment fails
    """
    # This is a write operation, no caching needed
    try:
        # Get role representation
        role = await self.admin_adapter.a_get_realm_role(role_name)
        # Assign role to user
        await self.admin_adapter.a_assign_realm_roles(user_id, [role])

        # Clear role-related caches
        if hasattr(self.get_user_roles, "clear_cache"):
            self.get_user_roles.clear_cache()

    except KeycloakError as e:
        logger.error(f"Failed to assign realm role: {e!s}")
        raise ValueError(f"Failed to assign realm role: {e!s}")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.remove_realm_role(user_id, role_name) async

Remove a realm role from a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
role_name str

Role name to remove

required

Raises:

Type Description
ValueError

If role removal fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def remove_realm_role(self, user_id: str, role_name: str) -> None:
    """Remove a realm role from a user.

    Args:
        user_id: User's ID
        role_name: Role name to remove

    Raises:
        ValueError: If role removal fails
    """
    # This is a write operation, no caching needed
    try:
        # Get role representation
        role = await self.admin_adapter.a_get_realm_role(role_name)
        # Remove role from user
        await self.admin_adapter.a_delete_realm_roles_of_user(user_id, [role])

        # Clear role-related caches
        if hasattr(self.get_user_roles, "clear_cache"):
            self.get_user_roles.clear_cache()

    except KeycloakError as e:
        logger.error(f"Failed to remove realm role: {e!s}")
        raise ValueError(f"Failed to remove realm role: {e!s}")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.assign_client_role(user_id, client_id, role_name) async

Assign a client-specific role to a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
client_id str

Client ID

required
role_name str

Role name to assign

required

Raises:

Type Description
ValueError

If role assignment fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def assign_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
    """Assign a client-specific role to a user.

    Args:
        user_id: User's ID
        client_id: Client ID
        role_name: Role name to assign

    Raises:
        ValueError: If role assignment fails
    """
    # This is a write operation, no caching needed
    try:
        # Get client
        client = await self.admin_adapter.a_get_client_id(client_id)
        # Get role representation
        role = await self.admin_adapter.a_get_client_role(client, role_name)
        # Assign role to user
        await self.admin_adapter.a_assign_client_role(user_id, client, [role])

        # Clear role-related caches
        if hasattr(self.get_client_roles_for_user, "clear_cache"):
            self.get_client_roles_for_user.clear_cache()

    except KeycloakError as e:
        logger.error(f"Failed to assign client role: {e!s}")
        raise ValueError(f"Failed to assign client role: {e!s}")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.create_realm_role(role_name, description=None) async

Create a new realm role.

Parameters:

Name Type Description Default
role_name str

Role name

required
description str | None

Optional role description

None

Returns:

Type Description
dict[str, Any]

Created role details

Raises:

Type Description
ValueError

If role creation fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def create_realm_role(self, role_name: str, description: str | None = None) -> dict[str, Any]:
    """Create a new realm role.

    Args:
        role_name: Role name
        description: Optional role description

    Returns:
        Created role details

    Raises:
        ValueError: If role creation fails
    """
    # This is a write operation, no caching needed
    try:
        role_data = {"name": role_name}
        if description:
            role_data["description"] = description

        await self.admin_adapter.a_create_realm_role(role_data)

        # Clear realm roles cache
        if hasattr(self.get_realm_roles, "clear_cache"):
            self.get_realm_roles.clear_cache()

        created_role = await self.admin_adapter.a_get_realm_role(role_name)
        return created_role
    except KeycloakError as e:
        logger.error(f"Failed to create realm role: {e!s}")
        raise ValueError(f"Failed to create realm role: {e!s}")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.delete_realm_role(role_name) async

Delete a realm role.

Parameters:

Name Type Description Default
role_name str

Role name to delete

required

Raises:

Type Description
ValueError

If role deletion fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def delete_realm_role(self, role_name: str) -> None:
    """Delete a realm role.

    Args:
        role_name: Role name to delete

    Raises:
        ValueError: If role deletion fails
    """
    # This is a write operation, no caching needed
    try:
        await self.admin_adapter.a_delete_realm_role(role_name)

        # Clear realm roles cache
        if hasattr(self.get_realm_roles, "clear_cache"):
            self.get_realm_roles.clear_cache()

        # We also need to clear user role caches since they might contain this role
        if hasattr(self.get_user_roles, "clear_cache"):
            self.get_user_roles.clear_cache()

    except KeycloakError as e:
        logger.error(f"Failed to delete realm role: {e!s}")
        raise ValueError(f"Failed to delete realm role: {e!s}")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_service_account_id() async

Get service account user ID for the current client.

Returns:

Type Description
str

Service account user ID

Raises:

Type Description
ValueError

If getting service account fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=3600, maxsize=1)  # Cache for 1 hour
async def get_service_account_id(self) -> str:
    """Get service account user ID for the current client.

    The client is resolved from ``self.configs.CLIENT_ID`` via ``get_client_id``.

    Returns:
        Service account user ID

    Raises:
        ValueError: If getting service account fails (the Keycloak error is attached as the cause)
    """
    try:
        client_id = await self.get_client_id(self.configs.CLIENT_ID)
        service_account = await self.admin_adapter.a_get_client_service_account_user(client_id)
        # NOTE(review): ``.get`` yields None when the response has no "id",
        # which contradicts the ``str`` return type - confirm whether that can happen.
        return service_account.get("id")
    except KeycloakError as e:
        logger.error(f"Failed to get service account ID: {e!s}")
        # Chain explicitly so the traceback shows the Keycloak failure as the cause.
        raise ValueError(f"Failed to get service account ID: {e!s}") from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_well_known_config() async

Get the well-known OpenID configuration.

Returns:

Type Description
dict[str, Any]

OIDC configuration

Raises:

Type Description
ValueError

If getting configuration fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=3600, maxsize=1)  # Cache for 1 hour
async def get_well_known_config(self) -> dict[str, Any]:
    """Get the well-known OpenID configuration.

    Returns:
        OIDC configuration

    Raises:
        ValueError: If getting configuration fails (the Keycloak error is attached as the cause)
    """
    try:
        return await self.openid_adapter.a_well_known()
    except KeycloakError as e:
        logger.error(f"Failed to get well-known config: {e!s}")
        # Chain explicitly so the traceback shows the Keycloak failure as the cause.
        raise ValueError(f"Failed to get well-known config: {e!s}") from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_certs() async

Get the JWT verification certificates.

Returns:

Type Description
dict[str, Any]

Certificate information

Raises:

Type Description
ValueError

If getting certificates fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=3600, maxsize=1)  # Cache for 1 hour
async def get_certs(self) -> dict[str, Any]:
    """Get the JWT verification certificates.

    Returns:
        Certificate information

    Raises:
        ValueError: If getting certificates fails (the Keycloak error is attached as the cause)
    """
    try:
        return await self.openid_adapter.a_certs()
    except KeycloakError as e:
        logger.error(f"Failed to get certificates: {e!s}")
        # Chain explicitly so the traceback shows the Keycloak failure as the cause.
        raise ValueError(f"Failed to get certificates: {e!s}") from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_token_from_code(code, redirect_uri) async

Exchange authorization code for token.

Parameters:

Name Type Description Default
code str

Authorization code

required
redirect_uri str

Redirect URI used in authorization request

required

Returns:

Type Description
KeycloakTokenType

Token response

Raises:

Type Description
ValueError

If token exchange fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def get_token_from_code(self, code: str, redirect_uri: str) -> KeycloakTokenType:
    """Exchange authorization code for token.

    Args:
        code: Authorization code
        redirect_uri: Redirect URI used in authorization request

    Returns:
        Token response

    Raises:
        ValueError: If token exchange fails
    """
    # Authorization codes can only be used once, don't cache
    try:
        return await self.openid_adapter.a_token(
            grant_type="authorization_code",
            code=code,
            redirect_uri=redirect_uri,
        )
    except KeycloakError as e:
        logger.error(f"Failed to exchange code for token: {e!s}")
        raise ValueError(f"Failed to exchange code for token: {e!s}")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_client_credentials_token() async

Get token using client credentials.

Returns:

Type Description
KeycloakTokenType

Token response

Raises:

Type Description
ValueError

If token acquisition fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def get_client_credentials_token(self) -> KeycloakTokenType:
    """Get token using client credentials.

    Returns:
        Token response

    Raises:
        ValueError: If token acquisition fails
    """
    # Tokens are time-sensitive, don't cache
    try:
        return await self.openid_adapter.a_token(grant_type="client_credentials")
    except KeycloakError as e:
        logger.error(f"Failed to get client credentials token: {e!s}")
        raise ValueError(f"Failed to get client credentials token: {e!s}")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.search_users(query, max_results=100) async

Search for users by username, email, or name.

Parameters:

Name Type Description Default
query str

Search query

required
max_results int

Maximum number of results to return

100

Returns:

Type Description
list[KeycloakUserType]

List of matching users

Raises:

Type Description
ValueError

If search fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=30, maxsize=50)  # Cache for 30 seconds with limited entries
async def search_users(self, query: str, max_results: int = 100) -> list[KeycloakUserType]:
    """Search for users by username, email, or name.

    The username search always runs. Email, first-name and last-name searches
    run in that order only while fewer than ``max_results`` users have been
    collected, each asking only for the remaining number of results.

    Args:
        query: Search query
        max_results: Maximum number of results to return

    Returns:
        List of matching users, without duplicate IDs across the searches

    Raises:
        ValueError: If search fails (the Keycloak error is attached as the cause)
    """
    try:
        # Search by username first; this seeds the result list.
        users = list(await self.admin_adapter.a_get_users({"username": query, "max": max_results}))

        # Fall back to the other fields only while results are incomplete.
        for field in ("email", "firstName", "lastName"):
            remaining = max_results - len(users)
            if remaining <= 0:
                break
            candidates = await self.admin_adapter.a_get_users({field: query, "max": remaining})
            # Filter out users already collected by an earlier search
            known_ids = {user["id"] for user in users}
            users.extend(user for user in candidates if user["id"] not in known_ids)

        return users[:max_results]
    except KeycloakError as e:
        logger.error(f"Failed to search users: {e!s}")
        raise ValueError(f"Failed to search users: {e!s}") from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.check_permissions(token, resource, scope) async

Check if a user has permission to access a resource with the specified scope.

Parameters:

Name Type Description Default
token str

Access token

required
resource str

Resource name

required
scope str

Permission scope

required

Returns:

Type Description
bool

True if permission granted, False otherwise

Source code in archipy/adapters/keycloak/adapters.py
@override
async def check_permissions(self, token: str, resource: str, scope: str) -> bool:
    """Check if a user has permission to access a resource with the specified scope.

    Args:
        token: Access token
        resource: Resource name
        scope: Permission scope

    Returns:
        True if permission granted, False otherwise
    """
    try:
        # Use UMA permissions endpoint to check specific resource and scope
        permissions = await self.openid_adapter.a_uma_permissions(token, permissions=[f"{resource}#{scope}"])

        # Check if the response indicates permission is granted
        if not permissions or not isinstance(permissions, list):
            logger.debug("No permissions returned or invalid response format")
            return False

        # Look for the specific permission in the response
        for perm in permissions:
            if perm.get("rsname") == resource and scope in perm.get("scopes", []):
                return True

        return False
    except KeycloakError as e:
        logger.debug(f"Permission check failed with Keycloak error: {e!s}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error in permission check: {e!s}")
        return False

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_client_secret(client_id) async

Get client secret.

Parameters:

Name Type Description Default
client_id str

Client ID

required

Returns:

Type Description
str

Client secret

Raises:

Type Description
ValueError

If getting secret fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=3600, maxsize=50)  # Cache for 1 hour
async def get_client_secret(self, client_id: str) -> str:
    """Get client secret.

    Args:
        client_id: Client ID

    Returns:
        Client secret, or an empty string when the client record has no "secret" entry

    Raises:
        ValueError: If getting secret fails (the Keycloak error is attached as the cause)
    """
    try:
        client = await self.admin_adapter.a_get_client(client_id)
        return client.get("secret", "")
    except KeycloakError as e:
        logger.error(f"Failed to get client secret: {e!s}")
        # Chain explicitly so the traceback shows the Keycloak failure as the cause.
        raise ValueError(f"Failed to get client secret: {e!s}") from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_client_id(client_name) async

Get client ID by client name.

Parameters:

Name Type Description Default
client_name str

Name of the client

required

Returns:

Type Description
str

Client ID

Raises:

Type Description
ValueError

If client not found

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=3600, maxsize=50)  # Cache for 1 hour
async def get_client_id(self, client_name: str) -> str:
    """Get client ID by client name.

    Args:
        client_name: Name of the client

    Returns:
        Client ID

    Raises:
        ValueError: If the lookup fails (the Keycloak error is attached as the cause)
    """
    try:
        return await self.admin_adapter.a_get_client_id(client_name)
    except KeycloakError as e:
        logger.error(f"Failed to get client ID: {e!s}")
        # Chain explicitly so the traceback shows the Keycloak failure as the cause.
        raise ValueError(f"Failed to get client ID: {e!s}") from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_realm_roles() async

Get all realm roles.

Returns:

Type Description
list[dict[str, Any]]

List of realm roles

Raises:

Type Description
ValueError

If getting roles fails

Source code in archipy/adapters/keycloak/adapters.py
@override
@alru_cache(ttl=300, maxsize=1)  # Cache for 5 minutes
async def get_realm_roles(self) -> list[dict[str, Any]]:
    """Get all realm roles.

    Returns:
        List of realm roles

    Raises:
        ValueError: If getting roles fails (the Keycloak error is attached as the cause)
    """
    try:
        return await self.admin_adapter.a_get_realm_roles()
    except KeycloakError as e:
        logger.error(f"Failed to get realm roles: {e!s}")
        # Chain explicitly so the traceback shows the Keycloak failure as the cause.
        raise ValueError(f"Failed to get realm roles: {e!s}") from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_realm_role(role_name) async

Get realm role.

Parameters:

Name Type Description Default
role_name str

Role name

required

Returns:

Type Description
dict

A realm role

Raises:

Type Description
ValueError

If getting role fails

Source code in archipy/adapters/keycloak/adapters.py
@override
# One entry per role name: with maxsize=1 every lookup of a different role
# evicted the previous one, so the cache was of little use.
@alru_cache(ttl=300, maxsize=100)  # Cache for 5 minutes
async def get_realm_role(self, role_name: str) -> dict[str, Any]:
    """Get realm role.

    Args:
        role_name: Role name

    Returns:
        A realm role

    Raises:
        ValueError: If getting role fails (the Keycloak error is attached as the cause)
    """
    try:
        return await self.admin_adapter.a_get_realm_role(role_name)
    except KeycloakError as e:
        logger.error(f"Failed to get realm role: {e!s}")
        # Chain explicitly so the traceback shows the Keycloak failure as the cause.
        raise ValueError(f"Failed to get realm role: {e!s}") from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.remove_client_role(user_id, client_id, role_name) async

Remove a client-specific role from a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required
client_id str

Client ID

required
role_name str

Role name to remove

required

Raises:

Type Description
ValueError

If role removal fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def remove_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
    """Remove a client-specific role from a user.

    Args:
        user_id: User's ID
        client_id: Client ID
        role_name: Role name to remove

    Raises:
        ValueError: If role removal fails
    """
    try:
        client = await self.admin_adapter.a_get_client_id(client_id)
        role = await self.admin_adapter.a_get_client_role(client, role_name)
        await self.admin_adapter.a_delete_client_roles_of_user(user_id, client, [role])

        if hasattr(self.get_client_roles_for_user, "clear_cache"):
            self.get_client_roles_for_user.clear_cache()
    except KeycloakError as e:
        logger.error(f"Failed to remove client role: {e!s}")
        raise ValueError(f"Failed to remove client role: {e!s}")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.clear_user_sessions(user_id) async

Clear all sessions for a user.

Parameters:

Name Type Description Default
user_id str

User's ID

required

Raises:

Type Description
ValueError

If clearing sessions fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def clear_user_sessions(self, user_id: str) -> None:
    """Clear all sessions for a user.

    Args:
        user_id: User's ID

    Raises:
        ValueError: If clearing sessions fails
    """
    try:
        await self.admin_adapter.a_user_logout(user_id)
    except KeycloakError as e:
        logger.error(f"Failed to clear user sessions: {e!s}")
        raise ValueError(f"Failed to clear user sessions: {e!s}")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.logout(refresh_token) async

Logout user by invalidating their refresh token.

Parameters:

Name Type Description Default
refresh_token str

Refresh token to invalidate

required

Raises:

Type Description
ValueError

If logout fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def logout(self, refresh_token: str) -> None:
    """Logout user by invalidating their refresh token.

    Args:
        refresh_token: Refresh token to invalidate

    Raises:
        ValueError: If logout fails
    """
    try:
        await self.openid_adapter.a_logout(refresh_token)
    except KeycloakError as e:
        logger.error(f"Failed to logout: {e!s}")
        raise ValueError(f"Failed to logout: {e!s}")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.introspect_token(token) async

Introspect token to get detailed information about it.

Parameters:

Name Type Description Default
token str

Access token

required

Returns:

Type Description
dict[str, Any]

Token introspection details

Raises:

Type Description
ValueError

If token introspection fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def introspect_token(self, token: str) -> dict[str, Any]:
    """Introspect token to get detailed information about it.

    Args:
        token: Access token

    Returns:
        Token introspection details

    Raises:
        ValueError: If token introspection fails
    """
    try:
        return await self.openid_adapter.a_introspect(token)
    except KeycloakError as e:
        logger.error(f"Failed to introspect token: {e!s}")
        raise ValueError(f"Failed to introspect token: {e!s}")

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.get_token_info(token) async

Decode token to get its claims.

Parameters:

Name Type Description Default
token str

Access token

required

Returns:

Type Description
dict[str, Any]

Dictionary of token claims

Raises:

Type Description
ValueError

If token decoding fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def get_token_info(self, token: str) -> dict[str, Any]:
    """Decode token to get its claims.

    The token is decoded with the key returned by ``get_public_key()``.

    Args:
        token: Access token

    Returns:
        Dictionary of token claims

    Raises:
        ValueError: If token decoding fails. The originating
            ``KeycloakError`` is attached as ``__cause__``.
    """
    try:
        return await self.openid_adapter.a_decode_token(
            token,
            key=await self.get_public_key(),
        )
    except KeycloakError as e:
        logger.error(f"Failed to get token info: {e!s}")
        # Chain explicitly so the traceback reports the Keycloak failure as
        # the direct cause instead of an incidental "during handling" context.
        raise ValueError(f"Failed to get token info: {e!s}") from e

archipy.adapters.keycloak.adapters.AsyncKeycloakAdapter.delete_user(user_id) async

Delete a user from Keycloak by their ID.

Parameters:

Name Type Description Default
user_id str

The ID of the user to delete

required

Raises:

Type Description
ValueError

If the deletion fails

Source code in archipy/adapters/keycloak/adapters.py
@override
async def delete_user(self, user_id: str) -> None:
    """Delete a user from Keycloak by their ID.

    Args:
        user_id: The ID of the user to delete

    Raises:
        ValueError: If the deletion fails. The originating exception is
            attached as ``__cause__``.
    """
    try:
        await self.admin_adapter.a_delete_user(user_id=user_id)
        logger.info(f"Successfully deleted user with ID {user_id}")
    # Deliberately broad: every failure is reported to callers as ValueError.
    except Exception as e:
        logger.error(f"Failed to delete user with ID {user_id}: {e!s}")
        # Chain explicitly so the underlying failure stays visible as the
        # direct cause in the traceback.
        raise ValueError(f"Failed to delete user: {e!s}") from e

archipy.adapters.keycloak.ports

Keycloak port definitions for ArchiPy.

archipy.adapters.keycloak.ports.KeycloakPort

Interface for Keycloak operations providing a standardized access pattern.

This interface defines the contract for Keycloak adapters, ensuring consistent implementation of Keycloak operations across different adapters. It covers essential functionality including authentication, user management, and role management.

Source code in archipy/adapters/keycloak/ports.py
class KeycloakPort:
    """Interface for Keycloak operations providing a standardized access pattern.

    This interface defines the contract for Keycloak adapters, ensuring consistent
    implementation of Keycloak operations across different adapters. It covers essential
    functionality including authentication, user management, and role management.

    Note:
        Every method is decorated with ``@abstractmethod``, but this class
        declares no ``ABC`` base, so Python does not by itself prevent
        instantiating a subclass that leaves methods unimplemented. Such a
        method raises ``NotImplementedError`` when it is called.
    """

    # Token Operations
    @abstractmethod
    def get_token(self, username: str, password: str) -> KeycloakTokenType:
        """Get a user token by username and password."""
        raise NotImplementedError

    @abstractmethod
    def refresh_token(self, refresh_token: str) -> KeycloakTokenType:
        """Refresh an existing token using a refresh token."""
        raise NotImplementedError

    @abstractmethod
    def validate_token(self, token: str) -> bool:
        """Validate if a token is still valid."""
        raise NotImplementedError

    @abstractmethod
    def get_userinfo(self, token: str) -> KeycloakUserType:
        """Get user information from a token."""
        raise NotImplementedError

    @abstractmethod
    def get_token_info(self, token: str) -> dict[str, Any]:
        """Decode token to get its claims."""
        raise NotImplementedError

    @abstractmethod
    def introspect_token(self, token: str) -> dict[str, Any]:
        """Introspect token to get detailed information about it."""
        raise NotImplementedError

    @abstractmethod
    def get_client_credentials_token(self) -> KeycloakTokenType:
        """Get token using client credentials."""
        raise NotImplementedError

    @abstractmethod
    def logout(self, refresh_token: str) -> None:
        """Logout user by invalidating their refresh token."""
        raise NotImplementedError

    # User Operations
    @abstractmethod
    def get_user_by_id(self, user_id: str) -> KeycloakUserType | None:
        """Get user details by user ID."""
        raise NotImplementedError

    @abstractmethod
    def get_user_by_username(self, username: str) -> KeycloakUserType | None:
        """Get user details by username."""
        raise NotImplementedError

    @abstractmethod
    def get_user_by_email(self, email: str) -> KeycloakUserType | None:
        """Get user details by email."""
        raise NotImplementedError

    @abstractmethod
    def create_user(self, user_data: dict[str, Any]) -> str:
        """Create a new user in Keycloak."""
        raise NotImplementedError

    @abstractmethod
    def update_user(self, user_id: str, user_data: dict[str, Any]) -> None:
        """Update user details."""
        raise NotImplementedError

    @abstractmethod
    def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None:
        """Reset a user's password."""
        raise NotImplementedError

    @abstractmethod
    def search_users(self, query: str, max_results: int = 100) -> list[KeycloakUserType]:
        """Search for users by username, email, or name."""
        raise NotImplementedError

    @abstractmethod
    def clear_user_sessions(self, user_id: str) -> None:
        """Clear all sessions for a user."""
        raise NotImplementedError

    # Role Operations
    # NOTE(review): the role checks below take a token, while role assignment
    # and removal take a user_id.
    @abstractmethod
    def get_user_roles(self, user_id: str) -> list[KeycloakRoleType]:
        """Get roles assigned to a user."""
        raise NotImplementedError

    @abstractmethod
    def get_client_roles_for_user(self, user_id: str, client_id: str) -> list[KeycloakRoleType]:
        """Get client-specific roles assigned to a user."""
        raise NotImplementedError

    @abstractmethod
    def has_role(self, token: str, role_name: str) -> bool:
        """Check if a user has a specific role."""
        raise NotImplementedError

    @abstractmethod
    def has_any_of_roles(self, token: str, role_names: list[str]) -> bool:
        """Check if a user has any of the specified roles."""
        raise NotImplementedError

    @abstractmethod
    def has_all_roles(self, token: str, role_names: set[str]) -> bool:
        """Check if a user has all of the specified roles."""
        raise NotImplementedError

    @abstractmethod
    def assign_realm_role(self, user_id: str, role_name: str) -> None:
        """Assign a realm role to a user."""
        raise NotImplementedError

    @abstractmethod
    def remove_realm_role(self, user_id: str, role_name: str) -> None:
        """Remove a realm role from a user."""
        raise NotImplementedError

    @abstractmethod
    def assign_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
        """Assign a client-specific role to a user."""
        raise NotImplementedError

    @abstractmethod
    def remove_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
        """Remove a client-specific role from a user."""
        raise NotImplementedError

    @abstractmethod
    def get_realm_role(self, role_name: str) -> dict[str, Any]:
        """Get realm role."""
        raise NotImplementedError

    @abstractmethod
    def get_realm_roles(self) -> list[dict[str, Any]]:
        """Get all realm roles."""
        raise NotImplementedError

    @abstractmethod
    def create_realm_role(self, role_name: str, description: str | None = None) -> dict[str, Any]:
        """Create a new realm role."""
        raise NotImplementedError

    @abstractmethod
    def delete_realm_role(self, role_name: str) -> None:
        """Delete a realm role."""
        raise NotImplementedError

    # Client Operations
    @abstractmethod
    def get_client_id(self, client_name: str) -> str:
        """Get client ID by client name."""
        raise NotImplementedError

    @abstractmethod
    def get_client_secret(self, client_id: str) -> str:
        """Get client secret."""
        raise NotImplementedError

    @abstractmethod
    def get_service_account_id(self) -> str:
        """Get service account user ID for the current client."""
        raise NotImplementedError

    # System Operations
    @abstractmethod
    def get_public_key(self) -> Any:
        """Get the public key used to verify tokens."""
        raise NotImplementedError

    @abstractmethod
    def get_well_known_config(self) -> dict[str, Any]:
        """Get the well-known OpenID configuration."""
        raise NotImplementedError

    @abstractmethod
    def get_certs(self) -> dict[str, Any]:
        """Get the JWT verification certificates."""
        raise NotImplementedError

    # Authorization
    @abstractmethod
    def get_token_from_code(self, code: str, redirect_uri: str) -> KeycloakTokenType:
        """Exchange authorization code for token."""
        raise NotImplementedError

    @abstractmethod
    def check_permissions(self, token: str, resource: str, scope: str) -> bool:
        """Check if a user has permission to access a resource with the specified scope."""
        raise NotImplementedError

    # User Operations (continued): declared here, after the authorization
    # methods, rather than with the other user operations above.
    @abstractmethod
    def delete_user(self, user_id: str) -> None:
        """Delete a user from Keycloak by their ID."""
        raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_token(username, password) abstractmethod

Get a user token by username and password.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_token(self, username: str, password: str) -> KeycloakTokenType:
    """Get a user token by username and password.

    Args:
        username: Username to obtain the token for.
        password: Password supplied together with the username.

    Returns:
        The token, as a ``KeycloakTokenType``.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.refresh_token(refresh_token) abstractmethod

Refresh an existing token using a refresh token.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def refresh_token(self, refresh_token: str) -> KeycloakTokenType:
    """Refresh an existing token using a refresh token.

    Args:
        refresh_token: Refresh token used to obtain the refreshed token.

    Returns:
        The refreshed token, as a ``KeycloakTokenType``.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.validate_token(token) abstractmethod

Validate if a token is still valid.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def validate_token(self, token: str) -> bool:
    """Validate if a token is still valid.

    Args:
        token: Token to validate.

    Returns:
        Whether the token is still valid.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_userinfo(token) abstractmethod

Get user information from a token.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_userinfo(self, token: str) -> KeycloakUserType:
    """Get user information from a token.

    Args:
        token: Token to get the user information from.

    Returns:
        The user information, as a ``KeycloakUserType``.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_token_info(token) abstractmethod

Decode token to get its claims.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_token_info(self, token: str) -> dict[str, Any]:
    """Decode token to get its claims.

    Args:
        token: Token to decode.

    Returns:
        Dictionary of the token's claims.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.introspect_token(token) abstractmethod

Introspect token to get detailed information about it.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def introspect_token(self, token: str) -> dict[str, Any]:
    """Introspect token to get detailed information about it.

    Args:
        token: Token to introspect.

    Returns:
        Dictionary with the introspection details.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_client_credentials_token() abstractmethod

Get token using client credentials.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_client_credentials_token(self) -> KeycloakTokenType:
    """Get token using client credentials.

    Takes no arguments; which credentials are used is up to the
    implementing adapter.

    Returns:
        The token, as a ``KeycloakTokenType``.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.logout(refresh_token) abstractmethod

Logout user by invalidating their refresh token.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def logout(self, refresh_token: str) -> None:
    """Logout user by invalidating their refresh token.

    Args:
        refresh_token: Refresh token to invalidate.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_user_by_id(user_id) abstractmethod

Get user details by user ID.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_user_by_id(self, user_id: str) -> KeycloakUserType | None:
    """Get user details by user ID.

    Args:
        user_id: ID of the user to look up.

    Returns:
        The user details, or ``None`` (presumably when no user matches;
        confirm against the implementing adapter).

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_user_by_username(username) abstractmethod

Get user details by username.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_user_by_username(self, username: str) -> KeycloakUserType | None:
    """Get user details by username.

    Args:
        username: Username of the user to look up.

    Returns:
        The user details, or ``None`` (presumably when no user matches;
        confirm against the implementing adapter).

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_user_by_email(email) abstractmethod

Get user details by email.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_user_by_email(self, email: str) -> KeycloakUserType | None:
    """Get user details by email.

    Args:
        email: Email address of the user to look up.

    Returns:
        The user details, or ``None`` (presumably when no user matches;
        confirm against the implementing adapter).

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.create_user(user_data) abstractmethod

Create a new user in Keycloak.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def create_user(self, user_data: dict[str, Any]) -> str:
    """Create a new user in Keycloak.

    Args:
        user_data: Dictionary describing the user to create. The expected
            keys are not defined by this declaration.

    Returns:
        A string (presumably the new user's ID; confirm against the
        implementing adapter).

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.update_user(user_id, user_data) abstractmethod

Update user details.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def update_user(self, user_id: str, user_data: dict[str, Any]) -> None:
    """Update user details.

    Args:
        user_id: ID of the user to update.
        user_data: Dictionary with the details to update. The expected keys
            are not defined by this declaration.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.reset_password(user_id, password, temporary=False) abstractmethod

Reset a user's password.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None:
    """Reset a user's password.

    Args:
        user_id: ID of the user whose password is reset.
        password: The new password.
        temporary: Defaults to ``False``. Presumably marks the new password
            as temporary; confirm the exact effect against the implementing
            adapter.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.search_users(query, max_results=100) abstractmethod

Search for users by username, email, or name.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def search_users(self, query: str, max_results: int = 100) -> list[KeycloakUserType]:
    """Search for users by username, email, or name.

    Args:
        query: Search text.
        max_results: Defaults to 100. Presumably the maximum number of users
            returned; confirm against the implementing adapter.

    Returns:
        List of matching users.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.clear_user_sessions(user_id) abstractmethod

Clear all sessions for a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def clear_user_sessions(self, user_id: str) -> None:
    """Clear all sessions for a user.

    Args:
        user_id: ID of the user whose sessions are cleared.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_user_roles(user_id) abstractmethod

Get roles assigned to a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_user_roles(self, user_id: str) -> list[KeycloakRoleType]:
    """Get roles assigned to a user.

    Args:
        user_id: ID of the user whose roles are requested.

    Returns:
        List of the roles assigned to the user.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_client_roles_for_user(user_id, client_id) abstractmethod

Get client-specific roles assigned to a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_client_roles_for_user(self, user_id: str, client_id: str) -> list[KeycloakRoleType]:
    """Get client-specific roles assigned to a user.

    Args:
        user_id: ID of the user whose roles are requested.
        client_id: Identifier of the client the roles belong to.

    Returns:
        List of the client-specific roles assigned to the user.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.has_role(token, role_name) abstractmethod

Check if a user has a specific role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def has_role(self, token: str, role_name: str) -> bool:
    """Check if a user has a specific role.

    Args:
        token: Token identifying the user to check.
        role_name: Name of the role to look for.

    Returns:
        Whether the user has the role.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.has_any_of_roles(token, role_names) abstractmethod

Check if a user has any of the specified roles.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def has_any_of_roles(self, token: str, role_names: list[str]) -> bool:
    """Check if a user has any of the specified roles.

    Args:
        token: Token identifying the user to check.
        role_names: Names of the roles to look for.

    Returns:
        Whether the user has at least one of the roles.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.has_all_roles(token, role_names) abstractmethod

Check if a user has all of the specified roles.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def has_all_roles(self, token: str, role_names: set[str]) -> bool:
    """Check if a user has all of the specified roles.

    Args:
        token: Token identifying the user to check.
        role_names: Names of the roles that must all be present.

    Returns:
        Whether the user has every one of the roles.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.assign_realm_role(user_id, role_name) abstractmethod

Assign a realm role to a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def assign_realm_role(self, user_id: str, role_name: str) -> None:
    """Assign a realm role to a user.

    Args:
        user_id: ID of the user receiving the role.
        role_name: Name of the realm role to assign.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.remove_realm_role(user_id, role_name) abstractmethod

Remove a realm role from a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def remove_realm_role(self, user_id: str, role_name: str) -> None:
    """Remove a realm role from a user.

    Args:
        user_id: ID of the user losing the role.
        role_name: Name of the realm role to remove.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.assign_client_role(user_id, client_id, role_name) abstractmethod

Assign a client-specific role to a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def assign_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
    """Assign a client-specific role to a user.

    Args:
        user_id: ID of the user receiving the role.
        client_id: Identifier of the client the role belongs to.
        role_name: Name of the client role to assign.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.remove_client_role(user_id, client_id, role_name) abstractmethod

Remove a client-specific role from a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def remove_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
    """Remove a client-specific role from a user.

    Args:
        user_id: ID of the user losing the role.
        client_id: Identifier of the client the role belongs to.
        role_name: Name of the client role to remove.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_realm_role(role_name) abstractmethod

Get realm role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_realm_role(self, role_name: str) -> dict[str, Any]:
    """Get realm role.

    Args:
        role_name: Name of the realm role to fetch.

    Returns:
        Dictionary describing the role. Its keys are not defined by this
        declaration.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_realm_roles() abstractmethod

Get all realm roles.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_realm_roles(self) -> list[dict[str, Any]]:
    """Get all realm roles.

    Returns:
        List with one dictionary per realm role. The dictionary keys are
        not defined by this declaration.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.create_realm_role(role_name, description=None) abstractmethod

Create a new realm role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def create_realm_role(self, role_name: str, description: str | None = None) -> dict[str, Any]:
    """Create a new realm role.

    Args:
        role_name: Name of the realm role to create.
        description: Optional description for the role; defaults to ``None``.

    Returns:
        A dictionary (presumably describing the created role; confirm
        against the implementing adapter).

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.delete_realm_role(role_name) abstractmethod

Delete a realm role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def delete_realm_role(self, role_name: str) -> None:
    """Delete a realm role.

    Args:
        role_name: Name of the realm role to delete.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_client_id(client_name) abstractmethod

Get client ID by client name.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_client_id(self, client_name: str) -> str:
    """Get client ID by client name.

    Args:
        client_name: Name of the client to look up.

    Returns:
        The client ID.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_client_secret(client_id) abstractmethod

Get client secret.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_client_secret(self, client_id: str) -> str:
    """Get client secret.

    Args:
        client_id: Identifier of the client whose secret is requested.

    Returns:
        The client secret.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_service_account_id() abstractmethod

Get service account user ID for the current client.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_service_account_id(self) -> str:
    """Get service account user ID for the current client.

    Returns:
        The service account user ID.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_public_key() abstractmethod

Get the public key used to verify tokens.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_public_key(self) -> Any:
    """Get the public key used to verify tokens.

    Returns:
        The public key. The return type is annotated ``Any``, so its
        concrete form is left to the implementing adapter.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_well_known_config() abstractmethod

Get the well-known OpenID configuration.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_well_known_config(self) -> dict[str, Any]:
    """Get the well-known OpenID configuration.

    Returns:
        Dictionary holding the configuration.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_certs() abstractmethod

Get the JWT verification certificates.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_certs(self) -> dict[str, Any]:
    """Get the JWT verification certificates.

    Returns:
        Dictionary holding the certificates.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.get_token_from_code(code, redirect_uri) abstractmethod

Exchange authorization code for token.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def get_token_from_code(self, code: str, redirect_uri: str) -> KeycloakTokenType:
    """Exchange authorization code for token.

    Args:
        code: Authorization code to exchange.
        redirect_uri: Redirect URI passed along with the code.

    Returns:
        The token, as a ``KeycloakTokenType``.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.check_permissions(token, resource, scope) abstractmethod

Check if a user has permission to access a resource with the specified scope.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def check_permissions(self, token: str, resource: str, scope: str) -> bool:
    """Check if a user has permission to access a resource with the specified scope.

    Args:
        token: Token identifying the user to check.
        resource: Resource the access check is for.
        scope: Scope the access check is for.

    Returns:
        Whether the user has the permission.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.KeycloakPort.delete_user(user_id) abstractmethod

Delete a user from Keycloak by their ID.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
def delete_user(self, user_id: str) -> None:
    """Delete a user from Keycloak by their ID.

    Args:
        user_id: ID of the user to delete.

    Raises:
        NotImplementedError: Raised by this base declaration, which has no
            implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort

Asynchronous interface for Keycloak operations providing a standardized access pattern.

This interface defines the contract for async Keycloak adapters, ensuring consistent implementation of Keycloak operations across different adapters. It covers essential functionality including authentication, user management, and role management.

Source code in archipy/adapters/keycloak/ports.py
class AsyncKeycloakPort:
    """Asynchronous interface for Keycloak operations providing a standardized access pattern.

    This interface defines the contract for async Keycloak adapters, ensuring consistent
    implementation of Keycloak operations across different adapters. It covers essential
    functionality including authentication, user management, and role management.

    Note:
        Every method is a coroutine decorated with ``@abstractmethod``, but
        this class declares no ``ABC`` base, so Python does not by itself
        prevent instantiating a subclass that leaves methods unimplemented.
        Such a method raises ``NotImplementedError`` when it is awaited.
    """

    # Token Operations
    @abstractmethod
    async def get_token(self, username: str, password: str) -> KeycloakTokenType:
        """Get a user token by username and password."""
        raise NotImplementedError

    @abstractmethod
    async def refresh_token(self, refresh_token: str) -> KeycloakTokenType:
        """Refresh an existing token using a refresh token."""
        raise NotImplementedError

    @abstractmethod
    async def validate_token(self, token: str) -> bool:
        """Validate if a token is still valid."""
        raise NotImplementedError

    @abstractmethod
    async def get_userinfo(self, token: str) -> KeycloakUserType:
        """Get user information from a token."""
        raise NotImplementedError

    @abstractmethod
    async def get_token_info(self, token: str) -> dict[str, Any]:
        """Decode token to get its claims."""
        raise NotImplementedError

    @abstractmethod
    async def introspect_token(self, token: str) -> dict[str, Any]:
        """Introspect token to get detailed information about it."""
        raise NotImplementedError

    @abstractmethod
    async def get_client_credentials_token(self) -> KeycloakTokenType:
        """Get token using client credentials."""
        raise NotImplementedError

    @abstractmethod
    async def logout(self, refresh_token: str) -> None:
        """Logout user by invalidating their refresh token."""
        raise NotImplementedError

    # User Operations
    @abstractmethod
    async def get_user_by_id(self, user_id: str) -> KeycloakUserType | None:
        """Get user details by user ID."""
        raise NotImplementedError

    @abstractmethod
    async def get_user_by_username(self, username: str) -> KeycloakUserType | None:
        """Get user details by username."""
        raise NotImplementedError

    @abstractmethod
    async def get_user_by_email(self, email: str) -> KeycloakUserType | None:
        """Get user details by email."""
        raise NotImplementedError

    @abstractmethod
    async def create_user(self, user_data: dict[str, Any]) -> str:
        """Create a new user in Keycloak."""
        raise NotImplementedError

    @abstractmethod
    async def update_user(self, user_id: str, user_data: dict[str, Any]) -> None:
        """Update user details."""
        raise NotImplementedError

    @abstractmethod
    async def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None:
        """Reset a user's password."""
        raise NotImplementedError

    @abstractmethod
    async def search_users(self, query: str, max_results: int = 100) -> list[KeycloakUserType]:
        """Search for users by username, email, or name."""
        raise NotImplementedError

    @abstractmethod
    async def clear_user_sessions(self, user_id: str) -> None:
        """Clear all sessions for a user."""
        raise NotImplementedError

    # Role Operations
    # NOTE(review): the role checks below take a token, while role assignment
    # and removal take a user_id.
    @abstractmethod
    async def get_user_roles(self, user_id: str) -> list[KeycloakRoleType]:
        """Get roles assigned to a user."""
        raise NotImplementedError

    @abstractmethod
    async def get_client_roles_for_user(self, user_id: str, client_id: str) -> list[KeycloakRoleType]:
        """Get client-specific roles assigned to a user."""
        raise NotImplementedError

    @abstractmethod
    async def has_role(self, token: str, role_name: str) -> bool:
        """Check if a user has a specific role."""
        raise NotImplementedError

    @abstractmethod
    async def has_any_of_roles(self, token: str, role_names: list[str]) -> bool:
        """Check if a user has any of the specified roles."""
        raise NotImplementedError

    @abstractmethod
    async def has_all_roles(self, token: str, role_names: set[str]) -> bool:
        """Check if a user has all of the specified roles."""
        raise NotImplementedError

    @abstractmethod
    async def assign_realm_role(self, user_id: str, role_name: str) -> None:
        """Assign a realm role to a user."""
        raise NotImplementedError

    @abstractmethod
    async def remove_realm_role(self, user_id: str, role_name: str) -> None:
        """Remove a realm role from a user."""
        raise NotImplementedError

    @abstractmethod
    async def assign_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
        """Assign a client-specific role to a user."""
        raise NotImplementedError

    @abstractmethod
    async def remove_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
        """Remove a client-specific role from a user."""
        raise NotImplementedError

    @abstractmethod
    async def get_realm_role(self, role_name: str) -> dict[str, Any]:
        """Get realm role."""
        raise NotImplementedError

    @abstractmethod
    async def get_realm_roles(self) -> list[dict[str, Any]]:
        """Get all realm roles."""
        raise NotImplementedError

    @abstractmethod
    async def create_realm_role(self, role_name: str, description: str | None = None) -> dict[str, Any]:
        """Create a new realm role."""
        raise NotImplementedError

    @abstractmethod
    async def delete_realm_role(self, role_name: str) -> None:
        """Delete a realm role."""
        raise NotImplementedError

    # Client Operations
    @abstractmethod
    async def get_client_id(self, client_name: str) -> str:
        """Get client ID by client name."""
        raise NotImplementedError

    @abstractmethod
    async def get_client_secret(self, client_id: str) -> str:
        """Get client secret."""
        raise NotImplementedError

    @abstractmethod
    async def get_service_account_id(self) -> str:
        """Get service account user ID for the current client."""
        raise NotImplementedError

    # System Operations
    @abstractmethod
    async def get_public_key(self) -> Any:
        """Get the public key used to verify tokens."""
        raise NotImplementedError

    @abstractmethod
    async def get_well_known_config(self) -> dict[str, Any]:
        """Get the well-known OpenID configuration."""
        raise NotImplementedError

    @abstractmethod
    async def get_certs(self) -> dict[str, Any]:
        """Get the JWT verification certificates."""
        raise NotImplementedError

    # Authorization
    @abstractmethod
    async def get_token_from_code(self, code: str, redirect_uri: str) -> KeycloakTokenType:
        """Exchange authorization code for token."""
        raise NotImplementedError

    @abstractmethod
    async def check_permissions(self, token: str, resource: str, scope: str) -> bool:
        """Check if a user has permission to access a resource with the specified scope."""
        raise NotImplementedError

    # User Operations (continued): declared here, after the authorization
    # methods, rather than with the other user operations above.
    @abstractmethod
    async def delete_user(self, user_id: str) -> None:
        """Delete a user from Keycloak by their ID."""
        raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_token(username, password) abstractmethod async

Get a user token by username and password.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_token(self, username: str, password: str) -> KeycloakTokenType:
    """Get a user token by username and password.

    Args:
        username: Username to obtain the token for.
        password: Password supplied together with the username.

    Returns:
        The token, as a ``KeycloakTokenType``.

    Raises:
        NotImplementedError: Raised when this base coroutine is awaited; it
            has no implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.refresh_token(refresh_token) abstractmethod async

Refresh an existing token using a refresh token.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def refresh_token(self, refresh_token: str) -> KeycloakTokenType:
    """Refresh an existing token using a refresh token.

    Args:
        refresh_token: Refresh token used to obtain the refreshed token.

    Returns:
        The refreshed token, as a ``KeycloakTokenType``.

    Raises:
        NotImplementedError: Raised when this base coroutine is awaited; it
            has no implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.validate_token(token) abstractmethod async

Validate if a token is still valid.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def validate_token(self, token: str) -> bool:
    """Validate if a token is still valid.

    Args:
        token: Token to validate.

    Returns:
        Whether the token is still valid.

    Raises:
        NotImplementedError: Raised when this base coroutine is awaited; it
            has no implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_userinfo(token) abstractmethod async

Get user information from a token.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_userinfo(self, token: str) -> KeycloakUserType:
    """Get user information from a token.

    Args:
        token: Token to get the user information from.

    Returns:
        The user information, as a ``KeycloakUserType``.

    Raises:
        NotImplementedError: Raised when this base coroutine is awaited; it
            has no implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_token_info(token) abstractmethod async

Decode token to get its claims.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_token_info(self, token: str) -> dict[str, Any]:
    """Decode token to get its claims.

    Args:
        token: Token to decode.

    Returns:
        Dictionary of the token's claims.

    Raises:
        NotImplementedError: Raised when this base coroutine is awaited; it
            has no implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.introspect_token(token) abstractmethod async

Introspect token to get detailed information about it.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def introspect_token(self, token: str) -> dict[str, Any]:
    """Introspect token to get detailed information about it.

    Args:
        token: Token to introspect.

    Returns:
        Dictionary with the introspection details.

    Raises:
        NotImplementedError: Raised when this base coroutine is awaited; it
            has no implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_client_credentials_token() abstractmethod async

Get token using client credentials.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_client_credentials_token(self) -> KeycloakTokenType:
    """Get token using client credentials.

    Takes no arguments; which credentials are used is up to the
    implementing adapter.

    Returns:
        The token, as a ``KeycloakTokenType``.

    Raises:
        NotImplementedError: Raised when this base coroutine is awaited; it
            has no implementation of its own.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.logout(refresh_token) abstractmethod async

Logout user by invalidating their refresh token.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def logout(self, refresh_token: str) -> None:
    """Logout user by invalidating their refresh token.

    Args:
        refresh_token: The refresh token to invalidate.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_user_by_id(user_id) abstractmethod async

Get user details by user ID.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_user_by_id(self, user_id: str) -> KeycloakUserType | None:
    """Get user details by user ID.

    Args:
        user_id: The ID of the user to look up.

    Returns:
        The user details, or None (presumably when no user matches;
        confirm against the implementing adapter).
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_user_by_username(username) abstractmethod async

Get user details by username.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_user_by_username(self, username: str) -> KeycloakUserType | None:
    """Get user details by username.

    Args:
        username: The username to look up.

    Returns:
        The user details, or None (presumably when no user matches;
        confirm against the implementing adapter).
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_user_by_email(email) abstractmethod async

Get user details by email.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_user_by_email(self, email: str) -> KeycloakUserType | None:
    """Get user details by email.

    Args:
        email: The email address to look up.

    Returns:
        The user details, or None (presumably when no user matches;
        confirm against the implementing adapter).
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.create_user(user_data) abstractmethod async

Create a new user in Keycloak.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def create_user(self, user_data: dict[str, Any]) -> str:
    """Create a new user in Keycloak.

    Args:
        user_data: Attributes of the user to create.

    Returns:
        A string identifying the created user (presumably the new user's
        ID; confirm against the implementing adapter).
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.update_user(user_id, user_data) abstractmethod async

Update user details.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def update_user(self, user_id: str, user_data: dict[str, Any]) -> None:
    """Update user details.

    Args:
        user_id: The ID of the user to update.
        user_data: The user attributes to apply.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.reset_password(user_id, password, temporary=False) abstractmethod async

Reset a user's password.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def reset_password(self, user_id: str, password: str, temporary: bool = False) -> None:
    """Reset a user's password.

    Args:
        user_id: The ID of the user whose password is reset.
        password: The new password.
        temporary: Whether the new password is temporary. Defaults to False.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.search_users(query, max_results=100) abstractmethod async

Search for users by username, email, or name.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def search_users(self, query: str, max_results: int = 100) -> list[KeycloakUserType]:
    """Search for users by username, email, or name.

    Args:
        query: The search text.
        max_results: Maximum number of users to return. Defaults to 100.

    Returns:
        The users matching the query.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.clear_user_sessions(user_id) abstractmethod async

Clear all sessions for a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def clear_user_sessions(self, user_id: str) -> None:
    """Clear all sessions for a user.

    Args:
        user_id: The ID of the user whose sessions are cleared.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_user_roles(user_id) abstractmethod async

Get roles assigned to a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_user_roles(self, user_id: str) -> list[KeycloakRoleType]:
    """Get roles assigned to a user.

    Args:
        user_id: The ID of the user.

    Returns:
        The roles assigned to the user.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_client_roles_for_user(user_id, client_id) abstractmethod async

Get client-specific roles assigned to a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_client_roles_for_user(self, user_id: str, client_id: str) -> list[KeycloakRoleType]:
    """Get client-specific roles assigned to a user.

    Args:
        user_id: The ID of the user.
        client_id: The ID of the client whose roles are queried.

    Returns:
        The client-specific roles assigned to the user.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.has_role(token, role_name) abstractmethod async

Check if a user has a specific role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def has_role(self, token: str, role_name: str) -> bool:
    """Check if a user has a specific role.

    Args:
        token: The token identifying the user.
        role_name: The name of the role to check for.

    Returns:
        Whether the user has the role.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.has_any_of_roles(token, role_names) abstractmethod async

Check if a user has any of the specified roles.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def has_any_of_roles(self, token: str, role_names: list[str]) -> bool:
    """Check if a user has any of the specified roles.

    Args:
        token: The token identifying the user.
        role_names: The names of the roles to check for.

    Returns:
        Whether the user has at least one of the roles.
    """
    # NOTE(review): role_names is list[str] here while has_all_roles takes
    # set[str]; confirm whether the difference is intentional.
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.has_all_roles(token, role_names) abstractmethod async

Check if a user has all of the specified roles.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def has_all_roles(self, token: str, role_names: set[str]) -> bool:
    """Check if a user has all of the specified roles.

    Args:
        token: The token identifying the user.
        role_names: The names of the roles that must all be present.

    Returns:
        Whether the user has every one of the roles.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.assign_realm_role(user_id, role_name) abstractmethod async

Assign a realm role to a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def assign_realm_role(self, user_id: str, role_name: str) -> None:
    """Assign a realm role to a user.

    Args:
        user_id: The ID of the user receiving the role.
        role_name: The name of the realm role to assign.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.remove_realm_role(user_id, role_name) abstractmethod async

Remove a realm role from a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def remove_realm_role(self, user_id: str, role_name: str) -> None:
    """Remove a realm role from a user.

    Args:
        user_id: The ID of the user losing the role.
        role_name: The name of the realm role to remove.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.assign_client_role(user_id, client_id, role_name) abstractmethod async

Assign a client-specific role to a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def assign_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
    """Assign a client-specific role to a user.

    Args:
        user_id: The ID of the user receiving the role.
        client_id: The ID of the client that owns the role.
        role_name: The name of the client role to assign.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.remove_client_role(user_id, client_id, role_name) abstractmethod async

Remove a client-specific role from a user.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def remove_client_role(self, user_id: str, client_id: str, role_name: str) -> None:
    """Remove a client-specific role from a user.

    Args:
        user_id: The ID of the user losing the role.
        client_id: The ID of the client that owns the role.
        role_name: The name of the client role to remove.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_realm_role(role_name) abstractmethod async

Get realm role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_realm_role(self, role_name: str) -> dict[str, Any]:
    """Get realm role.

    Args:
        role_name: The name of the realm role to fetch.

    Returns:
        The realm role as a dictionary.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_realm_roles() abstractmethod async

Get all realm roles.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_realm_roles(self) -> list[dict[str, Any]]:
    """Get all realm roles.

    Returns:
        One dictionary per realm role.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.create_realm_role(role_name, description=None) abstractmethod async

Create a new realm role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def create_realm_role(self, role_name: str, description: str | None = None) -> dict[str, Any]:
    """Create a new realm role.

    Args:
        role_name: The name of the realm role to create.
        description: Optional description of the role. Defaults to None.

    Returns:
        A dictionary describing the role (presumably the newly created
        one; confirm against the implementing adapter).
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.delete_realm_role(role_name) abstractmethod async

Delete a realm role.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def delete_realm_role(self, role_name: str) -> None:
    """Delete a realm role.

    Args:
        role_name: The name of the realm role to delete.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_client_id(client_name) abstractmethod async

Get client ID by client name.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_client_id(self, client_name: str) -> str:
    """Get client ID by client name.

    Args:
        client_name: The name of the client to look up.

    Returns:
        The client's ID.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_client_secret(client_id) abstractmethod async

Get client secret.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_client_secret(self, client_id: str) -> str:
    """Get client secret.

    Args:
        client_id: The ID of the client whose secret is requested.

    Returns:
        The client secret.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_service_account_id() abstractmethod async

Get service account user ID for the current client.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_service_account_id(self) -> str:
    """Get service account user ID for the current client.

    Returns:
        The user ID of the current client's service account.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_public_key() abstractmethod async

Get the public key used to verify tokens.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_public_key(self) -> Any:
    """Get the public key used to verify tokens.

    Returns:
        The public key. The concrete type is not fixed by this port (the
        return is annotated as Any).
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_well_known_config() abstractmethod async

Get the well-known OpenID configuration.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_well_known_config(self) -> dict[str, Any]:
    """Get the well-known OpenID configuration.

    Returns:
        The well-known OpenID configuration as a dictionary.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_certs() abstractmethod async

Get the JWT verification certificates.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_certs(self) -> dict[str, Any]:
    """Get the JWT verification certificates.

    Returns:
        The JWT verification certificates as a dictionary.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.get_token_from_code(code, redirect_uri) abstractmethod async

Exchange authorization code for token.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def get_token_from_code(self, code: str, redirect_uri: str) -> KeycloakTokenType:
    """Exchange authorization code for token.

    Args:
        code: The authorization code to exchange.
        redirect_uri: The redirect URI associated with the code.

    Returns:
        The token obtained in exchange for the code.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.check_permissions(token, resource, scope) abstractmethod async

Check if a user has permission to access a resource with the specified scope.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def check_permissions(self, token: str, resource: str, scope: str) -> bool:
    """Check if a user has permission to access a resource with the specified scope.

    Args:
        token: The token identifying the user.
        resource: The resource being accessed.
        scope: The scope requested on the resource.

    Returns:
        Whether the user has the permission.
    """
    raise NotImplementedError

archipy.adapters.keycloak.ports.AsyncKeycloakPort.delete_user(user_id) abstractmethod async

Delete a user from Keycloak by their ID.

Source code in archipy/adapters/keycloak/ports.py
@abstractmethod
async def delete_user(self, user_id: str) -> None:
    """Delete a user from Keycloak by their ID.

    Args:
        user_id: The ID of the user to delete.
    """
    raise NotImplementedError

ORM

Object-Relational Mapping adapters, primarily for SQLAlchemy integration.

from sqlalchemy import select

from archipy.adapters.orm.sqlalchemy.adapters import SqlAlchemyAdapter

# Create an ORM adapter. With no orm_config argument the adapter reads the
# SQLALCHEMY section of the global config; pass orm_config=... to override it.
orm_adapter = SqlAlchemyAdapter()

# Use the adapter: run a SELECT and collect the matching entities
users = orm_adapter.scalars(select(User).where(User.active.is_(True))).all()

SQLAlchemy Components

archipy.adapters.orm.sqlalchemy.adapters

archipy.adapters.orm.sqlalchemy.adapters.SqlAlchemyFilterMixin

Mixin providing filtering capabilities for SQLAlchemy queries.

This mixin provides methods to apply various filters to SQLAlchemy queries, supporting a wide range of comparison operators for different data types.

The filtering functionality supports:

- Equality/inequality comparisons
- Greater than/less than operations
- String operations (LIKE, ILIKE, startswith, endswith)
- List operations (IN, NOT IN)
- NULL checks

Examples:

>>> from sqlalchemy import select
>>> from archipy.adapters.orm.sqlalchemy.adapters import SqlAlchemyFilterMixin
>>> from archipy.models.types.base_types import FilterOperationType
>>>
>>> class UserRepository(SqlAlchemyFilterMixin):
...     def find_active_users_by_name(self, name_fragment):
...         query = select(User)
...         query = self._apply_filter(
...             query,
...             User.name,
...             name_fragment,
...             FilterOperationType.ILIKE
...         )
...         query = self._apply_filter(
...             query,
...             User.is_active,
...             True,
...             FilterOperationType.EQUAL
...         )
...         return query
Source code in archipy/adapters/orm/sqlalchemy/adapters.py
class SqlAlchemyFilterMixin:
    """Mixin providing filtering capabilities for SQLAlchemy queries.

    This mixin provides methods to apply various filters to SQLAlchemy queries,
    supporting a wide range of comparison operators for different data types.

    The filtering functionality supports:
    - Equality/inequality comparisons
    - Greater than/less than operations
    - String operations (LIKE, ILIKE, startswith, endswith)
    - List operations (IN, NOT IN)
    - NULL checks

    Examples:
        >>> from sqlalchemy import select
        >>> from archipy.adapters.orm.sqlalchemy.adapters import SqlAlchemyFilterMixin
        >>> from archipy.models.types.base_types import FilterOperationType
        >>>
        >>> class UserRepository(SqlAlchemyFilterMixin):
        ...     def find_active_users_by_name(self, name_fragment):
        ...         query = select(User)
        ...         query = self._apply_filter(
        ...             query,
        ...             User.name,
        ...             name_fragment,
        ...             FilterOperationType.ILIKE
        ...         )
        ...         query = self._apply_filter(
        ...             query,
        ...             User.is_active,
        ...             True,
        ...             FilterOperationType.EQUAL
        ...         )
        ...         return query
    """

    @staticmethod
    def _apply_filter(
        query: Select | Update | Delete,
        field: InstrumentedAttribute,
        value: Any,
        operation: FilterOperationType,
    ) -> Select | Update | Delete:
        """Apply a filter to a SQLAlchemy query.

        This method applies different types of filters based on the specified
        operation type, allowing for flexible query building.

        Args:
            query: The SQLAlchemy query to apply the filter to
            field: The model attribute/column to filter on
            value: The value to compare against
            operation: The type of filter operation to apply

        Returns:
            The updated query with the filter applied

        Examples:
            >>> # Filter users with specific email domain
            >>> query = select(User)
            >>> query = SqlAlchemyFilterMixin._apply_filter(
            ...     query,
            ...     User.email,
            ...     "%@example.com",
            ...     FilterOperationType.LIKE
            ... )
            >>>
            >>> # Filter active users
            >>> query = SqlAlchemyFilterMixin._apply_filter(
            ...     query,
            ...     User.is_active,
            ...     True,
            ...     FilterOperationType.EQUAL
            ... )
            >>>
            >>> # Filter users created after a certain date
            >>> from datetime import datetime
            >>> cutoff_date = datetime(2023, 1, 1)
            >>> query = SqlAlchemyFilterMixin._apply_filter(
            ...     query,
            ...     User.created_at,
            ...     cutoff_date,
            ...     FilterOperationType.GREATER_THAN
            ... )
        """
        if value is not None or operation in [FilterOperationType.IS_NULL, FilterOperationType.IS_NOT_NULL]:
            if operation == FilterOperationType.EQUAL:
                return query.where(field == value)
            if operation == FilterOperationType.NOT_EQUAL:
                return query.where(field != value)
            if operation == FilterOperationType.LESS_THAN:
                return query.where(field < value)
            if operation == FilterOperationType.LESS_THAN_OR_EQUAL:
                return query.where(field <= value)
            if operation == FilterOperationType.GREATER_THAN:
                return query.where(field > value)
            if operation == FilterOperationType.GREATER_THAN_OR_EQUAL:
                return query.where(field >= value)
            if operation == FilterOperationType.IN_LIST:
                return query.where(field.in_(value))
            if operation == FilterOperationType.NOT_IN_LIST:
                return query.where(~field.in_(value))
            if operation == FilterOperationType.LIKE:
                return query.where(field.like(f"%{value}%"))
            if operation == FilterOperationType.ILIKE:
                return query.where(field.ilike(f"%{value}%"))
            if operation == FilterOperationType.STARTS_WITH:
                return query.where(field.startswith(value))
            if operation == FilterOperationType.ENDS_WITH:
                return query.where(field.endswith(value))
            if operation == FilterOperationType.CONTAINS:
                return query.where(field.contains(value))
            if operation == FilterOperationType.IS_NULL:
                return query.where(field.is_(None))
            if operation == FilterOperationType.IS_NOT_NULL:
                return query.where(field.isnot(None))
        return query

archipy.adapters.orm.sqlalchemy.adapters.SqlAlchemyAdapter

Bases: SqlAlchemyPort, SqlAlchemyPaginationMixin, SqlAlchemySortMixin

Database adapter for SQLAlchemy ORM operations.

This adapter provides a standardized interface for performing database operations using SQLAlchemy ORM. It implements common operations like create, read, update, delete (CRUD), along with advanced features for pagination, sorting, and filtering.

Parameters:

Name Type Description Default
orm_config SqlAlchemyConfig

Configuration for SQLAlchemy. If None, retrieves from global config. Defaults to None.

None

Examples:

>>> from archipy.adapters.orm.sqlalchemy.adapters import SqlAlchemyAdapter
>>> from archipy.models.entities import BaseEntity
>>>
>>> # Create adapter with default configuration
>>> db = SqlAlchemyAdapter()
>>>
>>> # Create a new entity
>>> user = User(name="John Doe", email="john@example.com")
>>> db.create(user)
>>>
>>> # Query with sorting and pagination
>>> from sqlalchemy import select
>>> from archipy.models.dtos.pagination_dto import PaginationDTO
>>> from archipy.models.dtos.sort_dto import SortDTO
>>>
>>> query = select(User)
>>> pagination = PaginationDTO(page=1, page_size=10)
>>> sort_info = SortDTO(column="created_at", order="DESC")
>>> results, total = db.execute_search_query(User, query, pagination, sort_info)
Source code in archipy/adapters/orm/sqlalchemy/adapters.py
class SqlAlchemyAdapter(SqlAlchemyPort, SqlAlchemyPaginationMixin, SqlAlchemySortMixin):
    """Database adapter for SQLAlchemy ORM operations.

    This adapter provides a standardized interface for performing database operations
    using SQLAlchemy ORM. It implements common operations like create, read, update,
    delete (CRUD), along with advanced features for pagination, sorting, and filtering.

    Args:
        orm_config (SqlAlchemyConfig, optional): Configuration for SQLAlchemy.
            If None, retrieves from global config. Defaults to None.

    Examples:
        >>> from archipy.adapters.orm.sqlalchemy.adapters import SqlAlchemyAdapter
        >>> from archipy.models.entities import BaseEntity
        >>>
        >>> # Create adapter with default configuration
        >>> db = SqlAlchemyAdapter()
        >>>
        >>> # Create a new entity
        >>> user = User(name="John Doe", email="john@example.com")
        >>> db.create(user)
        >>>
        >>> # Query with sorting and pagination
        >>> from sqlalchemy import select
        >>> from archipy.models.dtos.pagination_dto import PaginationDTO
        >>> from archipy.models.dtos.sort_dto import SortDTO
        >>>
        >>> query = select(User)
        >>> pagination = PaginationDTO(page=1, page_size=10)
        >>> sort_info = SortDTO(column="created_at", order="DESC")
        >>> results, total = db.execute_search_query(User, query, pagination, sort_info)
    """

    def __init__(self, orm_config: SqlAlchemyConfig | None = None) -> None:
        """Initialize the adapter and its session manager.

        Args:
            orm_config (SqlAlchemyConfig, optional): Configuration for SQLAlchemy.
                If None, the ``SQLALCHEMY`` section of the global config is used.
        """
        configs: SqlAlchemyConfig = BaseConfig.global_config().SQLALCHEMY if orm_config is None else orm_config
        self.session_manager = SessionManagerAdapter(configs)

    @override
    def execute_search_query(
        self,
        entity: type[BaseEntity],
        query: Select,
        pagination: PaginationDTO | None = None,
        sort_info: SortDTO | None = SortDTO.default(),
    ) -> tuple[list[BaseEntity], int]:
        """Execute a search query with sorting and pagination.

        Sorting and pagination are applied to ``query`` for the returned rows,
        while the total is counted over the original, unpaginated ``query``.

        Args:
            entity (type[BaseEntity]): The entity class the query selects.
            query (Select): The base SELECT statement.
            pagination (PaginationDTO | None): Pagination settings passed to the
                pagination helper. Defaults to None.
            sort_info (SortDTO | None): Sort settings passed to the sorting
                helper. Defaults to ``SortDTO.default()``.

        Returns:
            tuple[list[BaseEntity], int]: The entities of the requested page and
                the total number of rows matched by ``query``.

        Raises:
            RuntimeError: If anything fails while building or running the
                queries; the original exception is chained as the cause.
        """
        try:
            session = self.get_session()
            sorted_query = self._apply_sorting(entity, query, sort_info)
            paginated_query = self._apply_pagination(sorted_query, pagination)

            # Materialise as a list so the result matches the declared return type.
            entities = list(session.execute(paginated_query).scalars().all())

            # Count over the original query so the total ignores pagination.
            count_query = select(func.count()).select_from(query.subquery())
            total_count = session.execute(count_query).scalar_one()
            return entities, total_count
        except Exception as e:
            raise RuntimeError(f"Database query failed: {e!s}") from e

    @override
    def get_session(self) -> Session:
        """Return the session provided by the adapter's session manager."""
        return self.session_manager.get_session()

    @override
    def create(self, entity: BaseEntity) -> BaseEntity | None:
        """Creates a new entity in the database.

        The entity is added to the session and flushed; no commit is issued here.

        Args:
            entity (BaseEntity): The entity to be created.

        Returns:
            BaseEntity | None: The same entity instance after the flush (e.g.,
                with database-generated attributes populated). This
                implementation never returns None.

        Raises:
            InvalidEntityTypeError: If the provided entity is not a BaseEntity.

        Examples:
            >>> user = User(name="John Doe", email="john@example.com")
            >>> created_user = db.create(user)
            >>> print(created_user.id)  # UUID is now populated
        """
        if not isinstance(entity, BaseEntity):
            raise InvalidEntityTypeError(entity, BaseEntity)
        session = self.get_session()
        session.add(entity)
        session.flush()
        return entity

    @override
    def bulk_create(self, entities: list[BaseEntity]) -> list[BaseEntity] | None:
        """Add several entities to the session and flush them in one go.

        Unlike ``create``, the entities are not type-checked here.

        Args:
            entities (list[BaseEntity]): The entities to be created.

        Returns:
            list[BaseEntity] | None: The same list that was passed in.
        """
        session = self.get_session()
        session.add_all(entities)
        session.flush()
        return entities

    @override
    def get_by_uuid(self, entity_type: type, entity_uuid: UUID) -> BaseEntity | None:
        """Retrieves an entity by its UUID.

        Args:
            entity_type (type): The entity class to query.
            entity_uuid (UUID): The UUID of the entity to retrieve.

        Returns:
            BaseEntity | None: The retrieved entity or None if not found.

        Raises:
            InvalidEntityTypeError: If entity_type is not a subclass of BaseEntity
                or if entity_uuid is not a UUID.

        Examples:
            >>> from uuid import UUID
            >>> user_id = UUID("550e8400-e29b-41d4-a716-446655440000")
            >>> user = db.get_by_uuid(User, user_id)
            >>> if user:
            ...     print(user.name)
        """
        # issubclass() raises TypeError for non-class arguments, so check that
        # first and report the documented InvalidEntityTypeError instead.
        if not (isinstance(entity_type, type) and issubclass(entity_type, BaseEntity)):
            raise InvalidEntityTypeError(entity_type, BaseEntity)
        if not isinstance(entity_uuid, UUID):
            raise InvalidEntityTypeError(entity_uuid, UUID)
        session = self.get_session()
        return session.get(entity_type, entity_uuid)

    @override
    def delete(self, entity: BaseEntity) -> None:
        """Mark an entity for deletion in the current session.

        No flush or commit is issued here.

        Args:
            entity (BaseEntity): The entity to delete.

        Raises:
            InvalidEntityTypeError: If the provided entity is not a BaseEntity.
        """
        if not isinstance(entity, BaseEntity):
            raise InvalidEntityTypeError(entity, BaseEntity)
        session = self.get_session()
        session.delete(entity)

    @override
    def bulk_delete(self, entities: list[BaseEntity]) -> None:
        """Delete several entities by calling ``delete`` on each in order.

        Args:
            entities (list[BaseEntity]): The entities to delete.

        Raises:
            InvalidEntityTypeError: If any item is not a BaseEntity; items
                before it have already been passed to the session.
        """
        for entity in entities:
            self.delete(entity)

    @override
    def execute(self, statement: Executable, params: AnyExecuteParams | None = None):
        """Execute a statement on the current session.

        Args:
            statement (Executable): The statement to execute.
            params (AnyExecuteParams | None): Optional bind parameters.

        Returns:
            Whatever ``Session.execute`` returns for the statement.
        """
        session = self.get_session()
        return session.execute(statement, params)

    @override
    def scalars(self, statement: Executable, params: AnyExecuteParams | None = None):
        """Execute a statement on the current session and return scalar results.

        Args:
            statement (Executable): The statement to execute.
            params (AnyExecuteParams | None): Optional bind parameters.

        Returns:
            Whatever ``Session.scalars`` returns for the statement.
        """
        session = self.get_session()
        return session.scalars(statement, params)

archipy.adapters.orm.sqlalchemy.adapters.SqlAlchemyAdapter.create(entity)

Creates a new entity in the database.

Parameters:

Name Type Description Default
entity BaseEntity

The entity to be created.

required

Returns:

Type Description
BaseEntity | None

The created entity with updated attributes (e.g., generated ID), or None if creation failed.

Raises:

Type Description
InvalidEntityTypeError

If the provided entity is not a BaseEntity.

Examples:

>>> user = User(name="John Doe", email="john@example.com")
>>> created_user = db.create(user)
>>> print(created_user.id)  # UUID is now populated
Source code in archipy/adapters/orm/sqlalchemy/adapters.py
@override
def create(self, entity: BaseEntity) -> BaseEntity | None:
    """Persist a new entity through the current session.

    The entity is added to the session and flushed; no commit is issued here.

    Args:
        entity (BaseEntity): The entity to store.

    Returns:
        BaseEntity | None: The entity that was passed in, after the flush.

    Raises:
        InvalidEntityTypeError: If ``entity`` is not a BaseEntity instance.

    Examples:
        >>> user = User(name="John Doe", email="john@example.com")
        >>> created_user = db.create(user)
        >>> print(created_user.id)  # UUID is now populated
    """
    if isinstance(entity, BaseEntity):
        db_session = self.get_session()
        db_session.add(entity)
        db_session.flush()
        return entity
    raise InvalidEntityTypeError(entity, BaseEntity)

archipy.adapters.orm.sqlalchemy.adapters.SqlAlchemyAdapter.get_by_uuid(entity_type, entity_uuid)

Retrieves an entity by its UUID.

Parameters:

Name Type Description Default
entity_type type

The entity class to query.

required
entity_uuid UUID

The UUID of the entity to retrieve.

required

Returns:

Name Type Description
Any

The retrieved entity or None if not found.

Raises:

Type Description
InvalidEntityTypeError

If entity_type is not a subclass of BaseEntity or if entity_uuid is not a UUID.

Examples:

>>> from uuid import UUID
>>> user_id = UUID("550e8400-e29b-41d4-a716-446655440000")
>>> user = db.get_by_uuid(User, user_id)
>>> if user:
...     print(user.name)
Source code in archipy/adapters/orm/sqlalchemy/adapters.py
@override
def get_by_uuid(self, entity_type: type, entity_uuid: UUID) -> BaseEntity | None:
    """Retrieves an entity by its UUID.

    Args:
        entity_type (type): The entity class to query.
        entity_uuid (UUID): The UUID of the entity to retrieve.

    Returns:
        BaseEntity | None: The retrieved entity or None if not found.

    Raises:
        InvalidEntityTypeError: If entity_type is not a subclass of BaseEntity
            or if entity_uuid is not a UUID.

    Examples:
        >>> from uuid import UUID
        >>> user_id = UUID("550e8400-e29b-41d4-a716-446655440000")
        >>> user = db.get_by_uuid(User, user_id)
        >>> if user:
        ...     print(user.name)
    """
    # issubclass() raises TypeError for non-class arguments, so check that
    # first and report the documented InvalidEntityTypeError instead.
    if not (isinstance(entity_type, type) and issubclass(entity_type, BaseEntity)):
        raise InvalidEntityTypeError(entity_type, BaseEntity)
    if not isinstance(entity_uuid, UUID):
        raise InvalidEntityTypeError(entity_uuid, UUID)
    session = self.get_session()
    return session.get(entity_type, entity_uuid)

archipy.adapters.orm.sqlalchemy.adapters.AsyncSqlAlchemyAdapter

Bases: AsyncSqlAlchemyPort, SqlAlchemyPaginationMixin, SqlAlchemySortMixin

Asynchronous database adapter for SQLAlchemy ORM operations.

This adapter provides an asynchronous interface for performing database operations using SQLAlchemy's async capabilities. It implements common operations like create, read, update, delete (CRUD), along with advanced features for pagination, sorting, and filtering.

Parameters:

Name Type Description Default
orm_config SqlAlchemyConfig

Configuration for SQLAlchemy. If None, retrieves from global config. Defaults to None.

None

Examples:

>>> from archipy.adapters.orm.sqlalchemy.adapters import AsyncSqlAlchemyAdapter
>>> from sqlalchemy import select
>>> from archipy.models.dtos.pagination_dto import PaginationDTO
>>>
>>> # Create adapter with default configuration
>>> db = AsyncSqlAlchemyAdapter()
>>>
>>> # Example async function using the adapter
>>> async def get_users():
...     query = select(User)
...     pagination = PaginationDTO(page=1, page_size=10)
...     results, total = await db.execute_search_query(User, query, pagination)
...     return results
Source code in archipy/adapters/orm/sqlalchemy/adapters.py
class AsyncSqlAlchemyAdapter(AsyncSqlAlchemyPort, SqlAlchemyPaginationMixin, SqlAlchemySortMixin):
    """Async SQLAlchemy adapter exposing CRUD, raw execution and paginated search.

    Every operation obtains its ``AsyncSession`` from the
    ``AsyncSessionManagerAdapter`` created in ``__init__``. Search queries are
    sorted and paginated through ``_apply_sorting`` and ``_apply_pagination``,
    which are not defined in this class (presumably inherited from the mixin
    bases).

    Args:
        orm_config (SqlAlchemyConfig, optional): SQLAlchemy settings. When None,
            ``BaseConfig.global_config().SQLALCHEMY`` is used. Defaults to None.

    Examples:
        >>> from archipy.adapters.orm.sqlalchemy.adapters import AsyncSqlAlchemyAdapter
        >>> from sqlalchemy import select
        >>> from archipy.models.dtos.pagination_dto import PaginationDTO
        >>>
        >>> # Create adapter with default configuration
        >>> db = AsyncSqlAlchemyAdapter()
        >>>
        >>> # Example async function using the adapter
        >>> async def get_users():
        ...     query = select(User)
        ...     pagination = PaginationDTO(page=1, page_size=10)
        ...     results, total = await db.execute_search_query(User, query, pagination)
        ...     return results
    """

    def __init__(self, orm_config: SqlAlchemyConfig | None = None) -> None:
        """Create the session manager from ``orm_config`` or the global settings."""
        if orm_config is None:
            settings: SqlAlchemyConfig = BaseConfig.global_config().SQLALCHEMY
        else:
            settings = orm_config
        self.session_manager = AsyncSessionManagerAdapter(settings)

    @override
    async def execute_search_query(
        self,
        entity: type[BaseEntity],
        query: Select,
        pagination: PaginationDTO | None,
        sort_info: SortDTO | None = SortDTO.default(),
    ) -> tuple[list[BaseEntity], int]:
        """Run a sorted, paginated SELECT and also count every matching row.

        Two statements are executed on the same session: the sorted and
        paginated query, then a ``COUNT`` over the original (unpaginated)
        query wrapped as a subquery.

        Args:
            entity: The entity class to query
            query: The SQLAlchemy SELECT query
            pagination: Pagination settings (page number and page size)
            sort_info: Sorting information (column and direction)

        Returns:
            A tuple ``(rows, total)`` where ``rows`` are the entities of the
            requested page and ``total`` is the number of rows matching
            ``query`` regardless of pagination.

        Raises:
            RuntimeError: If any step fails; the original exception is chained
                as the cause.

        Examples:
            >>> async def get_active_users(page: int = 1):
            ...     query = select(User).where(User.is_active == True)
            ...     pagination = PaginationDTO(page=page, page_size=20)
            ...     sort_info = SortDTO(column="created_at", order="DESC")
            ...     users, total = await db.execute_search_query(
            ...         User, query, pagination, sort_info
            ...     )
            ...     return users, total
        """
        try:
            db_session = self.get_session()
            page_stmt = self._apply_pagination(
                self._apply_sorting(entity, query, sort_info),
                pagination,
            )
            rows = (await db_session.execute(page_stmt)).scalars().all()

            # The count is taken over the caller's query, not the paginated one.
            count_stmt = select(func.count()).select_from(query.subquery())
            total = (await db_session.execute(count_stmt)).scalar_one()
            return rows, total
        except Exception as e:
            raise RuntimeError(f"Database query failed: {e!s}") from e

    @override
    def get_session(self) -> AsyncSession:
        """Return the session supplied by the session manager."""
        manager = self.session_manager
        return manager.get_session()

    @override
    async def create(self, entity: BaseEntity) -> BaseEntity | None:
        """Add ``entity`` to the session, flush, and return the same instance.

        Raises:
            InvalidEntityTypeError: If ``entity`` is not a ``BaseEntity``.
        """
        if not isinstance(entity, BaseEntity):
            raise InvalidEntityTypeError(entity, BaseEntity)
        db_session: AsyncSession = self.get_session()
        db_session.add(entity)
        await db_session.flush()
        return entity

    @override
    async def bulk_create(self, entities: list[BaseEntity]) -> list[BaseEntity] | None:
        """Add all ``entities`` to the session, flush, and return the same list."""
        db_session = self.get_session()
        db_session.add_all(entities)
        await db_session.flush()
        return entities

    @override
    async def get_by_uuid(self, entity_type: type, entity_uuid: UUID) -> Any | None:
        """Look up one entity of ``entity_type`` by ``entity_uuid`` via ``session.get``.

        Raises:
            InvalidEntityTypeError: If ``entity_type`` is not a subclass of
                ``BaseEntity`` or ``entity_uuid`` is not a ``UUID``.
        """
        if not issubclass(entity_type, BaseEntity):
            raise InvalidEntityTypeError(entity_type, BaseEntity)
        if not isinstance(entity_uuid, UUID):
            raise InvalidEntityTypeError(entity_uuid, UUID)
        return await self.get_session().get(entity_type, entity_uuid)

    @override
    async def delete(self, entity: BaseEntity) -> None:
        """Delete ``entity`` through the session.

        Raises:
            InvalidEntityTypeError: If ``entity`` is not a ``BaseEntity``.
        """
        if not isinstance(entity, BaseEntity):
            raise InvalidEntityTypeError(entity, BaseEntity)
        await self.get_session().delete(entity)

    @override
    async def bulk_delete(self, entities: list[BaseEntity]) -> None:
        """Delete each entity in turn by awaiting ``delete`` on it."""
        for item in entities:
            await self.delete(item)

    @override
    async def execute(self, statement: Executable, params: AnyExecuteParams | None = None) -> Result[Any]:
        """Execute ``statement`` with ``params`` on the session and return the result."""
        return await self.get_session().execute(statement, params)

    @override
    async def scalars(self, statement: Executable, params: AnyExecuteParams | None = None) -> ScalarResult[Any]:
        """Execute ``statement`` with ``params`` and return the session's scalar result."""
        return await self.get_session().scalars(statement, params)

archipy.adapters.orm.sqlalchemy.adapters.AsyncSqlAlchemyAdapter.execute_search_query(entity, query, pagination, sort_info=SortDTO.default()) async

Execute a search query with pagination and sorting.

This method executes a SELECT query with pagination and sorting applied, and returns both the results and the total count of matching records.

Parameters:

Name Type Description Default
entity type[BaseEntity]

The entity class to query

required
query Select

The SQLAlchemy SELECT query

required
pagination PaginationDTO | None

Pagination settings (page number and page size)

required
sort_info SortDTO | None

Sorting information (column and direction)

default()

Returns:

Type Description
tuple[list[BaseEntity], int]

A tuple containing the list of entities matching the query and the total count of matching records (ignoring pagination).

Raises:

Type Description
RuntimeError

If the database query fails

Examples:

>>> async def get_active_users(page: int = 1):
...     query = select(User).where(User.is_active == True)
...     pagination = PaginationDTO(page=page, page_size=20)
...     sort_info = SortDTO(column="created_at", order="DESC")
...     users, total = await db.execute_search_query(
...         User, query, pagination, sort_info
...     )
...     return users, total
Source code in archipy/adapters/orm/sqlalchemy/adapters.py
@override
async def execute_search_query(
    self,
    entity: type[BaseEntity],
    query: Select,
    pagination: PaginationDTO | None,
    sort_info: SortDTO | None = SortDTO.default(),
) -> tuple[list[BaseEntity], int]:
    """Run a sorted, paginated SELECT and also count every matching row.

    Two statements are executed on the same session: the sorted and
    paginated query, then a ``COUNT`` over the original (unpaginated)
    query wrapped as a subquery.

    Args:
        entity: The entity class to query
        query: The SQLAlchemy SELECT query
        pagination: Pagination settings (page number and page size)
        sort_info: Sorting information (column and direction)

    Returns:
        A tuple ``(rows, total)`` where ``rows`` are the entities of the
        requested page and ``total`` is the number of rows matching
        ``query`` regardless of pagination.

    Raises:
        RuntimeError: If any step fails; the original exception is chained
            as the cause.

    Examples:
        >>> async def get_active_users(page: int = 1):
        ...     query = select(User).where(User.is_active == True)
        ...     pagination = PaginationDTO(page=page, page_size=20)
        ...     sort_info = SortDTO(column="created_at", order="DESC")
        ...     users, total = await db.execute_search_query(
        ...         User, query, pagination, sort_info
        ...     )
        ...     return users, total
    """
    try:
        db_session = self.get_session()
        page_stmt = self._apply_pagination(
            self._apply_sorting(entity, query, sort_info),
            pagination,
        )
        rows = (await db_session.execute(page_stmt)).scalars().all()

        # The count is taken over the caller's query, not the paginated one.
        count_stmt = select(func.count()).select_from(query.subquery())
        total = (await db_session.execute(count_stmt)).scalar_one()
        return rows, total
    except Exception as e:
        raise RuntimeError(f"Database query failed: {e!s}") from e

archipy.adapters.orm.sqlalchemy.session_manager_adapters

archipy.adapters.orm.sqlalchemy.session_manager_adapters.SessionManagerAdapter

Bases: SessionManagerPort

Manages SQLAlchemy database sessions for synchronous operations.

This adapter creates and manages database sessions using SQLAlchemy's session management system. It implements the Singleton pattern to ensure a single instance exists throughout the application lifecycle.

Parameters:

Name Type Description Default
orm_config SqlAlchemyConfig

Configuration for the ORM. If None, retrieves from global config. Defaults to None.

None

Examples:

>>> from archipy.adapters.orm.sqlalchemy.session_manager_adapters import SessionManagerAdapter
>>> from archipy.configs.config_template import SqlAlchemyConfig
>>>
>>> # Using default global configuration
>>> manager = SessionManagerAdapter()
>>> session = manager.get_session()
>>>
>>> # Using custom configuration
>>> custom_config = SqlAlchemyConfig(DATABASE="custom_db", HOST="localhost")
>>> custom_manager = SessionManagerAdapter(custom_config)
Source code in archipy/adapters/orm/sqlalchemy/session_manager_adapters.py
class SessionManagerAdapter(SessionManagerPort, metaclass=Singleton):
    """Manages SQLAlchemy database sessions for synchronous operations.

    This adapter builds one engine from the supplied configuration and hands
    out sessions through a ``scoped_session`` registry bound to that engine.
    The class is created with the ``Singleton`` metaclass.

    Args:
        orm_config (SqlAlchemyConfig, optional): Configuration for the ORM.
            If None, retrieves from global config. Defaults to None.

    Examples:
        >>> from archipy.adapters.orm.sqlalchemy.session_manager_adapters import SessionManagerAdapter
        >>> from archipy.configs.config_template import SqlAlchemyConfig
        >>>
        >>> # Using default global configuration
        >>> manager = SessionManagerAdapter()
        >>> session = manager.get_session()
        >>>
        >>> # Using custom configuration
        >>> custom_config = SqlAlchemyConfig(DATABASE="custom_db", HOST="localhost")
        >>> custom_manager = SessionManagerAdapter(custom_config)
    """

    def __init__(self, orm_config: SqlAlchemyConfig | None = None) -> None:
        """Create the engine and the scoped session registry.

        Args:
            orm_config: SQLAlchemy settings; when None the global
                ``BaseConfig`` SQLAlchemy section is used.
        """
        # global_config() is reached through the class, exactly as the other
        # adapters do, so no throwaway BaseConfig instance is constructed.
        configs: SqlAlchemyConfig = BaseConfig.global_config().SQLALCHEMY if orm_config is None else orm_config
        self.engine = self._create_engine(configs)
        self._session_generator = self._get_session_generator(configs)

    @override
    def get_session(self) -> Session:
        """Retrieves a SQLAlchemy session from the session factory.

        The session comes from the ``scoped_session`` registry created in
        ``__init__``.

        Returns:
            Session: A SQLAlchemy session instance that can be used for
                database operations.

        Examples:
            >>> session = session_manager.get_session()
            >>> user = session.query(User).filter_by(id=1).first()
        """
        return self._session_generator()

    @override
    def remove_session(self) -> None:
        """Removes the current session from the registry.

        This should be called when you're done with a session to prevent
        resource leaks, particularly at the end of web requests.

        Examples:
            >>> session = session_manager.get_session()
            >>> # Use session for operations
            >>> session_manager.remove_session()
        """
        self._session_generator.remove()

    def _get_session_generator(self, configs: SqlAlchemyConfig) -> scoped_session:
        """Build the ``scoped_session`` registry bound to ``self.engine``.

        Args:
            configs: Currently unused; kept so the signature stays stable.
        """
        session_maker = sessionmaker(self.engine)
        return scoped_session(session_maker)

    @staticmethod
    def _create_engine(configs: SqlAlchemyConfig) -> Engine:
        """Create the SQLAlchemy engine described by ``configs``.

        The connection URL is assembled from the driver, credentials, host,
        port and database fields; the remaining fields are passed straight to
        ``create_engine`` as engine and pool options.
        """
        url = URL.create(
            drivername=configs.DRIVER_NAME,
            username=configs.USERNAME,
            password=configs.PASSWORD,
            host=configs.HOST,
            port=configs.PORT,
            database=configs.DATABASE,
        )
        return create_engine(
            url,
            isolation_level=configs.ISOLATION_LEVEL,
            echo=configs.ECHO,
            echo_pool=configs.ECHO_POOL,
            enable_from_linting=configs.ENABLE_FROM_LINTING,
            hide_parameters=configs.HIDE_PARAMETERS,
            pool_pre_ping=configs.POOL_PRE_PING,
            pool_size=configs.POOL_SIZE,
            pool_recycle=configs.POOL_RECYCLE_SECONDS,
            pool_reset_on_return=configs.POOL_RESET_ON_RETURN,
            pool_timeout=configs.POOL_TIMEOUT,
            pool_use_lifo=configs.POOL_USE_LIFO,
            query_cache_size=configs.QUERY_CACHE_SIZE,
            max_overflow=configs.POOL_MAX_OVERFLOW,
        )

archipy.adapters.orm.sqlalchemy.session_manager_adapters.SessionManagerAdapter.get_session()

Retrieves a SQLAlchemy session from the session factory.

The session is scoped to the current context to ensure thread safety.

Returns:

Name Type Description
Session Session

A SQLAlchemy session instance that can be used for database operations.

Examples:

>>> session = session_manager.get_session()
>>> user = session.query(User).filter_by(id=1).first()
Source code in archipy/adapters/orm/sqlalchemy/session_manager_adapters.py
@override
def get_session(self) -> Session:
    """Retrieves a SQLAlchemy session from the session factory.

    The session is scoped to the current context to ensure thread safety.

    Returns:
        Session: A SQLAlchemy session instance that can be used for
            database operations.

    Examples:
        >>> session = session_manager.get_session()
        >>> user = session.query(User).filter_by(id=1).first()
    """
    return self._session_generator()

archipy.adapters.orm.sqlalchemy.session_manager_adapters.SessionManagerAdapter.remove_session()

Removes the current session from the registry.

This should be called when you're done with a session to prevent resource leaks, particularly at the end of web requests.

Examples:

>>> session = session_manager.get_session()
>>> # Use session for operations
>>> session_manager.remove_session()
Source code in archipy/adapters/orm/sqlalchemy/session_manager_adapters.py
@override
def remove_session(self) -> None:
    """Removes the current session from the registry.

    This should be called when you're done with a session to prevent
    resource leaks, particularly at the end of web requests.

    Examples:
        >>> session = session_manager.get_session()
        >>> # Use session for operations
        >>> session_manager.remove_session()
    """
    self._session_generator.remove()

archipy.adapters.orm.sqlalchemy.ports

Redis

Redis integration for caching and key-value storage.

from archipy.adapters.redis import RedisAdapter, AsyncRedisAdapter
from archipy.configs.config_template import RedisConfig

# Create a Redis adapter. The constructor takes a RedisConfig (or nothing, to
# use the global configuration); it does not accept host/port/db keywords.
redis = RedisAdapter(RedisConfig(MASTER_HOST="localhost", PORT=6379, DATABASE=0))

# Set value
redis.set("key", "value", ex=3600)  # expires in 1 hour

# Get value
value = redis.get("key")

archipy.adapters.redis.adapters

archipy.adapters.redis.adapters.RedisAdapter

Bases: RedisPort

Adapter for Redis operations providing a standardized interface.

This adapter implements the RedisPort interface to provide a consistent way to interact with Redis, abstracting the underlying Redis client implementation. It supports all common Redis operations including key-value operations, lists, sets, sorted sets, hashes, and pub/sub functionality.

The adapter maintains separate connections for read and write operations, which can be used to implement read replicas for better performance.

Parameters:

Name Type Description Default
redis_config RedisConfig

Configuration settings for Redis. If None, retrieves from global config. Defaults to None.

None

Examples:

>>> from archipy.adapters.redis.adapters import RedisAdapter
>>> from archipy.configs.config_template import RedisConfig
>>>
>>> # Using global configuration
>>> redis = RedisAdapter()
>>> redis.set("key", "value", ex=60)  # Set with 60 second expiry
>>> value = redis.get("key")
>>>
>>> # Using custom configuration
>>> config = RedisConfig(MASTER_HOST="redis.example.com", PORT=6380)
>>> custom_redis = RedisAdapter(config)
Source code in archipy/adapters/redis/adapters.py
class RedisAdapter(RedisPort):
    """Adapter for Redis operations providing a standardized interface.

    This adapter implements the RedisPort interface by forwarding every call
    to an underlying ``Redis`` client. Two client attributes are kept:
    ``self.client`` (built from ``MASTER_HOST``) and ``self.read_only_client``
    (built from ``SLAVE_HOST``, or the same object as ``self.client`` when no
    slave host is configured). Each method's docstring states which of the
    two it uses.

    Args:
        redis_config (RedisConfig, optional): Configuration settings for Redis.
            If None, retrieves from global config. Defaults to None.

    Raises:
        ValueError: If the configuration sets neither ``MASTER_HOST`` nor
            ``SLAVE_HOST``.

    Examples:
        >>> from archipy.adapters.redis.adapters import RedisAdapter
        >>> from archipy.configs.config_template import RedisConfig
        >>>
        >>> # Using global configuration
        >>> redis = RedisAdapter()
        >>> redis.set("key", "value", ex=60)  # Set with 60 second expiry
        >>> value = redis.get("key")
        >>>
        >>> # Using custom configuration
        >>> config = RedisConfig(MASTER_HOST="redis.example.com", PORT=6380)
        >>> custom_redis = RedisAdapter(config)
    """

    def __init__(self, redis_config: RedisConfig | None = None) -> None:
        """Resolve the configuration and create the Redis clients."""
        configs: RedisConfig = BaseConfig.global_config().REDIS if redis_config is None else redis_config
        self._set_clients(configs)

    def _set_clients(self, configs: RedisConfig) -> None:
        """Create ``self.client`` and ``self.read_only_client`` from ``configs``.

        ``MASTER_HOST`` backs ``self.client``. ``SLAVE_HOST`` backs
        ``self.read_only_client``; without a slave host the read-only client
        is the primary client itself. When only ``SLAVE_HOST`` is set, no
        ``self.client`` attribute is created.

        Raises:
            ValueError: If neither ``MASTER_HOST`` nor ``SLAVE_HOST`` is set.
        """
        master_host = configs.MASTER_HOST
        slave_host = configs.SLAVE_HOST
        if not (master_host or slave_host):
            # Without this guard the fallback below read ``self.client`` before
            # it existed and failed with an opaque AttributeError.
            raise ValueError("RedisConfig must set MASTER_HOST or SLAVE_HOST to create a Redis client")
        if master_host:
            self.client: Redis = self._get_client(master_host, configs)
        if slave_host:
            self.read_only_client: Redis = self._get_client(slave_host, configs)
        else:
            self.read_only_client = self.client

    @staticmethod
    def _get_client(host: str, configs: RedisConfig) -> Redis:
        """Build a ``Redis`` client for ``host`` using the shared settings in ``configs``."""
        return Redis(
            host=host,
            port=configs.PORT,
            db=configs.DATABASE,
            password=configs.PASSWORD,
            decode_responses=configs.DECODE_RESPONSES,
            health_check_interval=configs.HEALTH_CHECK_INTERVAL,
        )

    @override
    def pttl(self, name: bytes | str) -> RedisResponseType:
        """Forward ``pttl`` to the read-only client."""
        return self.read_only_client.pttl(name)

    @override
    def incrby(self, name: RedisKeyType, amount: int = 1) -> RedisResponseType:
        """Forward ``incrby`` to the primary client."""
        return self.client.incrby(name, amount)

    @override
    def set(
        self,
        name: RedisKeyType,
        value: RedisSetType,
        ex: RedisExpiryType | None = None,
        px: RedisExpiryType | None = None,
        nx: bool = False,
        xx: bool = False,
        keepttl: bool = False,
        get: bool = False,
        exat: RedisAbsExpiryType | None = None,
        pxat: RedisAbsExpiryType | None = None,
    ) -> RedisResponseType:
        """Forward ``set`` to the primary client; all options are passed positionally."""
        return self.client.set(name, value, ex, px, nx, xx, keepttl, get, exat, pxat)

    @override
    def get(self, key: str) -> RedisResponseType:
        """Forward ``get`` to the read-only client."""
        return self.read_only_client.get(key)

    @override
    def mget(
        self,
        keys: RedisKeyType | Iterable[RedisKeyType],
        *args: bytes | str,
    ) -> RedisResponseType:
        """Forward ``mget`` to the read-only client."""
        return self.read_only_client.mget(keys, *args)

    @override
    def mset(self, mapping: Mapping[RedisKeyType, bytes | str | float]) -> RedisResponseType:
        """Forward ``mset`` to the primary client."""
        return self.client.mset(mapping)

    @override
    def keys(self, pattern: RedisPatternType = "*", **kwargs: Any) -> RedisResponseType:
        """Forward ``keys`` to the read-only client."""
        return self.read_only_client.keys(pattern, **kwargs)

    @override
    def getset(self, key: RedisKeyType, value: bytes | str | float) -> RedisResponseType:
        """Forward ``getset`` to the primary client."""
        return self.client.getset(key, value)

    @override
    def getdel(self, key: bytes | str) -> RedisResponseType:
        """Forward ``getdel`` to the primary client."""
        return self.client.getdel(key)

    @override
    def exists(self, *names: bytes | str) -> RedisResponseType:
        """Forward ``exists`` to the read-only client."""
        return self.read_only_client.exists(*names)

    @override
    def delete(self, *names: bytes | str) -> RedisResponseType:
        """Forward ``delete`` to the primary client."""
        return self.client.delete(*names)

    @override
    def append(self, key: RedisKeyType, value: bytes | str | float) -> RedisResponseType:
        """Forward ``append`` to the primary client."""
        return self.client.append(key, value)

    @override
    def ttl(self, name: bytes | str) -> RedisResponseType:
        """Forward ``ttl`` to the read-only client."""
        return self.read_only_client.ttl(name)

    @override
    def type(self, name: bytes | str) -> RedisResponseType:
        """Forward ``type`` to the read-only client."""
        return self.read_only_client.type(name)

    @override
    def llen(self, name: str) -> RedisIntegerResponseType:
        """Forward ``llen`` to the read-only client."""
        return self.read_only_client.llen(name)

    @override
    def lpop(self, name: str, count: int | None = None) -> Any:
        """Forward ``lpop`` to the primary client."""
        return self.client.lpop(name, count)

    @override
    def lpush(self, name: str, *values: bytes | str | float) -> RedisIntegerResponseType:
        """Forward ``lpush`` to the primary client."""
        return self.client.lpush(name, *values)

    @override
    def lrange(self, name: str, start: int, end: int) -> RedisListResponseType:
        """Forward ``lrange`` to the read-only client."""
        return self.read_only_client.lrange(name, start, end)

    @override
    def lrem(self, name: str, count: int, value: str) -> RedisIntegerResponseType:
        """Forward ``lrem`` to the primary client."""
        return self.client.lrem(name, count, value)

    @override
    def lset(self, name: str, index: int, value: str) -> bool:
        """Forward ``lset`` to the primary client."""
        return self.client.lset(name, index, value)

    @override
    def rpop(self, name: str, count: int | None = None) -> Any:
        """Forward ``rpop`` to the primary client."""
        return self.client.rpop(name, count)

    @override
    def rpush(self, name: str, *values: bytes | str | float) -> RedisIntegerResponseType:
        """Forward ``rpush`` to the primary client."""
        return self.client.rpush(name, *values)

    @override
    def scan(
        self,
        cursor: int = 0,
        match: bytes | str | None = None,
        count: int | None = None,
        _type: str | None = None,
        **kwargs: Any,
    ) -> RedisResponseType:
        """Forward ``scan`` to the read-only client."""
        return self.read_only_client.scan(cursor, match, count, _type, **kwargs)

    @override
    def scan_iter(
        self,
        match: bytes | str | None = None,
        count: int | None = None,
        _type: str | None = None,
        **kwargs: Any,
    ) -> Iterator:
        """Forward ``scan_iter`` to the read-only client."""
        return self.read_only_client.scan_iter(match, count, _type, **kwargs)

    @override
    def sscan(
        self,
        name: RedisKeyType,
        cursor: int = 0,
        match: bytes | str | None = None,
        count: int | None = None,
    ) -> RedisResponseType:
        """Forward ``sscan`` to the read-only client."""
        return self.read_only_client.sscan(name, cursor, match, count)

    @override
    def sscan_iter(
        self,
        name: RedisKeyType,
        match: bytes | str | None = None,
        count: int | None = None,
    ) -> Iterator:
        """Forward ``sscan_iter`` to the read-only client."""
        return self.read_only_client.sscan_iter(name, match, count)

    @override
    def sadd(self, name: str, *values: bytes | str | float) -> RedisIntegerResponseType:
        """Forward ``sadd`` to the primary client."""
        return self.client.sadd(name, *values)

    @override
    def scard(self, name: str) -> RedisIntegerResponseType:
        """Forward ``scard`` to the primary client."""
        return self.client.scard(name)

    @override
    def sismember(self, name: str, value: str) -> Awaitable[bool] | bool:
        """Forward ``sismember`` to the read-only client."""
        return self.read_only_client.sismember(name, value)

    @override
    def smembers(self, name: str) -> RedisSetResponseType:
        """Forward ``smembers`` to the read-only client."""
        return self.read_only_client.smembers(name)

    @override
    def spop(self, name: str, count: int | None = None) -> bytes | float | int | str | list | None:
        """Forward ``spop`` to the primary client."""
        return self.client.spop(name, count)

    @override
    def srem(self, name: str, *values: bytes | str | float) -> RedisIntegerResponseType:
        """Forward ``srem`` to the primary client."""
        return self.client.srem(name, *values)

    @override
    def sunion(self, keys: RedisKeyType, *args: bytes | str) -> set:
        """Forward ``sunion`` to the primary client."""
        return self.client.sunion(keys, *args)

    @override
    def zadd(
        self,
        name: RedisKeyType,
        mapping: Mapping[RedisKeyType, bytes | str | float],
        nx: bool = False,
        xx: bool = False,
        ch: bool = False,
        incr: bool = False,
        gt: bool = False,
        lt: bool = False,
    ) -> RedisResponseType:
        """Forward ``zadd`` to the primary client; all flags are passed positionally."""
        return self.client.zadd(name, mapping, nx, xx, ch, incr, gt, lt)

    @override
    def zcard(self, name: bytes | str) -> RedisResponseType:
        """Forward ``zcard`` to the primary client."""
        return self.client.zcard(name)

    @override
    def zcount(self, name: RedisKeyType, min: float | str, max: float | str) -> RedisResponseType:
        """Forward ``zcount`` to the primary client."""
        return self.client.zcount(name, min, max)

    @override
    def zpopmax(self, name: RedisKeyType, count: int | None = None) -> RedisResponseType:
        """Forward ``zpopmax`` to the primary client."""
        return self.client.zpopmax(name, count)

    @override
    def zpopmin(self, name: RedisKeyType, count: int | None = None) -> RedisResponseType:
        """Forward ``zpopmin`` to the primary client."""
        return self.client.zpopmin(name, count)

    @override
    def zrange(
        self,
        name: RedisKeyType,
        start: int,
        end: int,
        desc: bool = False,
        withscores: bool = False,
        score_cast_func: RedisScoreCastType = float,
        byscore: bool = False,
        bylex: bool = False,
        offset: int | None = None,
        num: int | None = None,
    ) -> RedisResponseType:
        """Forward ``zrange`` to the read-only client; all options are passed positionally."""
        return self.read_only_client.zrange(
            name,
            start,
            end,
            desc,
            withscores,
            score_cast_func,
            byscore,
            bylex,
            offset,
            num,
        )

    @override
    def zrevrange(
        self,
        name: RedisKeyType,
        start: int,
        end: int,
        withscores: bool = False,
        score_cast_func: RedisScoreCastType = float,
    ) -> RedisResponseType:
        """Forward ``zrevrange`` to the read-only client."""
        return self.read_only_client.zrevrange(name, start, end, withscores, score_cast_func)

    @override
    def zrangebyscore(
        self,
        name: RedisKeyType,
        min: float | str,
        max: float | str,
        start: int | None = None,
        num: int | None = None,
        withscores: bool = False,
        score_cast_func: RedisScoreCastType = float,
    ) -> RedisResponseType:
        """Forward ``zrangebyscore`` to the read-only client."""
        return self.read_only_client.zrangebyscore(name, min, max, start, num, withscores, score_cast_func)

    @override
    def zrank(self, name: RedisKeyType, value: bytes | str | float) -> RedisResponseType:
        """Forward ``zrank`` to the read-only client."""
        return self.read_only_client.zrank(name, value)

    @override
    def zrem(self, name: RedisKeyType, *values: bytes | str | float) -> RedisResponseType:
        """Forward ``zrem`` to the primary client."""
        return self.client.zrem(name, *values)

    @override
    def zscore(self, name: RedisKeyType, value: bytes | str | float) -> RedisResponseType:
        """Forward ``zscore`` to the read-only client."""
        return self.read_only_client.zscore(name, value)

    @override
    def hdel(self, name: str, *keys: str | bytes) -> RedisIntegerResponseType:
        """Forward ``hdel`` to the primary client."""
        return self.client.hdel(name, *keys)

    @override
    def hexists(self, name: str, key: str) -> Awaitable[bool] | bool:
        """Forward ``hexists`` to the read-only client."""
        return self.read_only_client.hexists(name, key)

    @override
    def hget(self, name: str, key: str) -> Awaitable[str | None] | str | None:
        """Forward ``hget`` to the read-only client."""
        return self.read_only_client.hget(name, key)

    @override
    def hgetall(self, name: str) -> Awaitable[dict] | dict:
        """Forward ``hgetall`` to the read-only client."""
        return self.read_only_client.hgetall(name)

    @override
    def hkeys(self, name: str) -> RedisListResponseType:
        """Forward ``hkeys`` to the read-only client."""
        return self.read_only_client.hkeys(name)

    @override
    def hlen(self, name: str) -> RedisIntegerResponseType:
        """Forward ``hlen`` to the read-only client."""
        return self.read_only_client.hlen(name)

    @override
    def hset(
        self,
        name: str,
        key: str | bytes | None = None,
        value: str | bytes | None = None,
        mapping: dict | None = None,
        items: list | None = None,
    ) -> RedisIntegerResponseType:
        """Forward ``hset`` to the primary client; all options are passed positionally."""
        return self.client.hset(name, key, value, mapping, items)

    @override
    def hmget(self, name: str, keys: list, *args: str | bytes) -> RedisListResponseType:
        """Forward ``hmget`` to the read-only client."""
        return self.read_only_client.hmget(name, keys, *args)

    @override
    def hvals(self, name: str) -> RedisListResponseType:
        """Forward ``hvals`` to the read-only client."""
        return self.read_only_client.hvals(name)

    @override
    def publish(self, channel: RedisKeyType, message: bytes | str, **kwargs: Any) -> RedisResponseType:
        """Forward ``publish`` to the primary client."""
        return self.client.publish(channel, message, **kwargs)

    @override
    def pubsub_channels(self, pattern: RedisPatternType = "*", **kwargs: Any) -> RedisResponseType:
        """Forward ``pubsub_channels`` to the primary client."""
        return self.client.pubsub_channels(pattern, **kwargs)

    @override
    def zincrby(self, name: RedisKeyType, amount: float, value: bytes | str | float) -> RedisResponseType:
        """Forward ``zincrby`` to the primary client."""
        return self.client.zincrby(name, amount, value)

    @override
    def pubsub(self, **kwargs: Any) -> PubSub:
        """Return the object produced by the primary client's ``pubsub``."""
        return self.client.pubsub(**kwargs)

    @override
    def get_pipeline(self, transaction: Any = True, shard_hint: Any = None) -> Pipeline:
        """Return the object produced by the primary client's ``pipeline``."""
        return self.client.pipeline(transaction, shard_hint)

    @override
    def ping(self) -> RedisResponseType:
        """Forward ``ping`` to the primary client."""
        return self.client.ping()

archipy.adapters.redis.ports

archipy.adapters.redis.ports.RedisPort

Interface for Redis operations providing a standardized access pattern.

This interface defines the contract for Redis adapters, ensuring consistent implementation of Redis operations across different adapters. It covers all essential Redis functionality including key-value operations, collections (lists, sets, sorted sets, hashes), and pub/sub capabilities.

Implementing classes should provide concrete implementations for all methods, typically by wrapping a Redis client library.

Examples:

>>> from archipy.adapters.redis.ports import RedisPort
>>>
>>> class CustomRedisAdapter(RedisPort):
...     def __init__(self, connection_params):
...         self.client = redis.Redis(**connection_params)
...
...     def get(self, key: str) -> Any:
...         return self.client.get(key)
...
...     def set(self, name, value, ex=None, px=None, nx=False, xx=False, ...):
...         return self.client.set(name, value, ex, px, nx, xx, ...)
...
...     # Implement other required methods...
Source code in archipy/adapters/redis/ports.py
class RedisPort:
    """Interface for Redis operations providing a standardized access pattern.

    This interface defines the contract for Redis adapters, ensuring consistent
    implementation of Redis operations across different adapters. It covers all
    essential Redis functionality including key-value operations, collections
    (lists, sets, sorted sets, hashes), and pub/sub capabilities.

    Implementing classes should provide concrete implementations for all
    methods, typically by wrapping a Redis client library.

    Note:
        Every method below is decorated with ``@abstractmethod`` and its body
        only raises ``NotImplementedError``. The class as written declares no
        ``abc.ABC`` base, so the decorator by itself does not prevent
        instantiation; a method that a subclass leaves unimplemented fails
        only when it is actually called.

    Examples:
        The snippet is schematic (``...`` elides parameters) and is not meant
        to be executed as a doctest.

        >>> from archipy.adapters.redis.ports import RedisPort
        >>>
        >>> class CustomRedisAdapter(RedisPort):
        ...     def __init__(self, connection_params):
        ...         self.client = redis.Redis(**connection_params)
        ...
        ...     def get(self, key: str) -> Any:
        ...         return self.client.get(key)
        ...
        ...     def set(self, name, value, ex=None, px=None, nx=False, xx=False, ...):
        ...         return self.client.set(name, value, ex, px, nx, xx, ...)
        ...
        ...     # Implement other required methods...
    """

    # ------------------------------------------------------------------
    # Connection check and key-value operations
    # ------------------------------------------------------------------

    @abstractmethod
    def ping(self) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def pttl(self, name: bytes | str) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def incrby(self, name: RedisKeyType, amount: int = 1) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def set(
        self,
        name: RedisKeyType,
        value: RedisSetType,
        ex: RedisExpiryType | None = None,
        px: RedisExpiryType | None = None,
        nx: bool = False,
        xx: bool = False,
        keepttl: bool = False,
        get: bool = False,
        exat: RedisAbsExpiryType | None = None,
        pxat: RedisAbsExpiryType | None = None,
    ) -> RedisResponseType:
        raise NotImplementedError

    # NOTE(review): ``key`` is typed ``str`` here while most sibling methods
    # accept ``RedisKeyType`` or ``bytes | str`` -- confirm whether intended.
    @abstractmethod
    def get(self, key: str) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def mget(
        self,
        keys: RedisKeyType | Iterable[RedisKeyType],
        *args: bytes | str,
    ) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def mset(self, mapping: Mapping[RedisKeyType, bytes | str | float]) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def keys(self, pattern: RedisPatternType = "*", **kwargs: Any) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def getset(self, key: RedisKeyType, value: bytes | str | float) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def getdel(self, key: bytes | str) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def exists(self, *names: bytes | str) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def delete(self, *names: bytes | str) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def append(self, key: RedisKeyType, value: bytes | str | float) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def ttl(self, name: bytes | str) -> RedisResponseType:
        raise NotImplementedError

    # The method name shadows the ``type`` builtin inside the class body only;
    # it is part of the public interface and must keep this name.
    @abstractmethod
    def type(self, name: bytes | str) -> RedisResponseType:
        raise NotImplementedError

    # ------------------------------------------------------------------
    # List operations
    # ------------------------------------------------------------------

    @abstractmethod
    def llen(self, name: str) -> RedisIntegerResponseType:
        raise NotImplementedError

    @abstractmethod
    def lpop(self, name: str, count: int | None = None) -> Any:
        raise NotImplementedError

    @abstractmethod
    def lpush(self, name: str, *values: bytes | str | float) -> RedisIntegerResponseType:
        raise NotImplementedError

    @abstractmethod
    def lrange(self, name: str, start: int, end: int) -> RedisListResponseType:
        raise NotImplementedError

    @abstractmethod
    def lrem(self, name: str, count: int, value: str) -> RedisIntegerResponseType:
        raise NotImplementedError

    @abstractmethod
    def lset(self, name: str, index: int, value: str) -> bool:
        raise NotImplementedError

    @abstractmethod
    def rpop(self, name: str, count: int | None = None) -> Any:
        raise NotImplementedError

    @abstractmethod
    def rpush(self, name: str, *values: bytes | str | float) -> RedisIntegerResponseType:
        raise NotImplementedError

    # ------------------------------------------------------------------
    # Scanning
    # ------------------------------------------------------------------

    @abstractmethod
    def scan(
        self,
        cursor: int = 0,
        match: bytes | str | None = None,
        count: int | None = None,
        _type: str | None = None,
        **kwargs: Any,
    ) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def scan_iter(
        self,
        match: bytes | str | None = None,
        count: int | None = None,
        _type: str | None = None,
        **kwargs: Any,
    ) -> Iterator:
        raise NotImplementedError

    # ------------------------------------------------------------------
    # Set operations
    # ------------------------------------------------------------------

    @abstractmethod
    def sscan(
        self,
        name: RedisKeyType,
        cursor: int = 0,
        match: bytes | str | None = None,
        count: int | None = None,
    ) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def sscan_iter(
        self,
        name: RedisKeyType,
        match: bytes | str | None = None,
        count: int | None = None,
    ) -> Iterator:
        raise NotImplementedError

    @abstractmethod
    def sadd(self, name: str, *values: bytes | str | float) -> RedisIntegerResponseType:
        raise NotImplementedError

    @abstractmethod
    def scard(self, name: str) -> RedisIntegerResponseType:
        raise NotImplementedError

    # NOTE(review): the return annotation admits an awaitable as well as a
    # plain ``bool`` -- presumably mirroring the client library's typing.
    @abstractmethod
    def sismember(self, name: str, value: str) -> Awaitable[bool] | bool:
        raise NotImplementedError

    @abstractmethod
    def smembers(self, name: str) -> RedisSetResponseType:
        raise NotImplementedError

    @abstractmethod
    def spop(self, name: str, count: int | None = None) -> bytes | float | int | str | list | None:
        raise NotImplementedError

    @abstractmethod
    def srem(self, name: str, *values: bytes | str | float) -> RedisIntegerResponseType:
        raise NotImplementedError

    @abstractmethod
    def sunion(self, keys: RedisKeyType, *args: bytes | str) -> set:
        raise NotImplementedError

    # ------------------------------------------------------------------
    # Sorted-set operations (``zincrby`` is declared further below)
    # ------------------------------------------------------------------

    @abstractmethod
    def zadd(
        self,
        name: RedisKeyType,
        mapping: Mapping[RedisKeyType, bytes | str | float],
        nx: bool = False,
        xx: bool = False,
        ch: bool = False,
        incr: bool = False,
        gt: bool = False,
        lt: bool = False,
    ) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def zcard(self, name: bytes | str) -> RedisResponseType:
        raise NotImplementedError

    # ``min`` / ``max`` shadow the builtins within the signature; they are
    # caller-visible parameter names and therefore part of the contract.
    @abstractmethod
    def zcount(self, name: RedisKeyType, min: float | str, max: float | str) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def zpopmax(self, name: RedisKeyType, count: int | None = None) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def zpopmin(self, name: RedisKeyType, count: int | None = None) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def zrange(
        self,
        name: RedisKeyType,
        start: int,
        end: int,
        desc: bool = False,
        withscores: bool = False,
        score_cast_func: RedisScoreCastType = float,
        byscore: bool = False,
        bylex: bool = False,
        offset: int | None = None,
        num: int | None = None,
    ) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def zrevrange(
        self,
        name: RedisKeyType,
        start: int,
        end: int,
        withscores: bool = False,
        score_cast_func: RedisScoreCastType = float,
    ) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def zrangebyscore(
        self,
        name: RedisKeyType,
        min: float | str,
        max: float | str,
        start: int | None = None,
        num: int | None = None,
        withscores: bool = False,
        score_cast_func: RedisScoreCastType = float,
    ) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def zrank(self, name: RedisKeyType, value: bytes | str | float) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def zrem(self, name: RedisKeyType, *values: bytes | str | float) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def zscore(self, name: RedisKeyType, value: bytes | str | float) -> RedisResponseType:
        raise NotImplementedError

    # ------------------------------------------------------------------
    # Hash operations
    # ------------------------------------------------------------------

    @abstractmethod
    def hdel(self, name: str, *keys: str | bytes) -> RedisIntegerResponseType:
        raise NotImplementedError

    @abstractmethod
    def hexists(self, name: str, key: str) -> Awaitable[bool] | bool:
        raise NotImplementedError

    @abstractmethod
    def hget(self, name: str, key: str) -> Awaitable[str | None] | str | None:
        raise NotImplementedError

    @abstractmethod
    def hgetall(self, name: str) -> Awaitable[dict] | dict:
        raise NotImplementedError

    @abstractmethod
    def hkeys(self, name: str) -> RedisListResponseType:
        raise NotImplementedError

    @abstractmethod
    def hlen(self, name: str) -> RedisIntegerResponseType:
        raise NotImplementedError

    @abstractmethod
    def hset(
        self,
        name: str,
        key: str | bytes | None = None,
        value: str | bytes | None = None,
        mapping: dict | None = None,
        items: list | None = None,
    ) -> RedisIntegerResponseType:
        raise NotImplementedError

    @abstractmethod
    def hmget(self, name: str, keys: list, *args: str | bytes) -> RedisListResponseType:
        raise NotImplementedError

    @abstractmethod
    def hvals(self, name: str) -> RedisListResponseType:
        raise NotImplementedError

    # ------------------------------------------------------------------
    # Pub/sub and pipelines
    # ------------------------------------------------------------------

    @abstractmethod
    def publish(self, channel: RedisKeyType, message: bytes | str, **kwargs: Any) -> RedisResponseType:
        raise NotImplementedError

    @abstractmethod
    def pubsub_channels(self, pattern: RedisPatternType = "*", **kwargs: Any) -> RedisResponseType:
        raise NotImplementedError

    # Sorted-set operation declared here, away from the other ``z*`` methods.
    @abstractmethod
    def zincrby(self, name: RedisKeyType, amount: float, value: bytes | str | float) -> RedisResponseType:
        raise NotImplementedError

    # The port leaves the return type as ``Any``; the concrete adapter's
    # override narrows it to ``PubSub``.
    @abstractmethod
    def pubsub(self, **kwargs: Any) -> Any:
        raise NotImplementedError

    # The port leaves the return type as ``Any``; the concrete adapter's
    # override narrows it to ``Pipeline``.
    @abstractmethod
    def get_pipeline(self, transaction: Any = True, shard_hint: Any = None) -> Any:
        raise NotImplementedError

archipy.adapters.redis.mocks

archipy.adapters.redis.mocks.RedisMock

Bases: RedisAdapter

A Redis adapter implementation using fakeredis for testing.

Source code in archipy/adapters/redis/mocks.py
class RedisMock(RedisAdapter):
    """Test double for ``RedisAdapter`` whose clients are ``fakeredis`` instances.

    Both ``client`` and ``read_only_client`` point at the same
    ``fakeredis.FakeRedis`` object created with ``decode_responses=True``.
    The ``redis_config`` argument is accepted for signature compatibility
    but is not read by the initializer.
    """

    def __init__(self, redis_config: RedisConfig | None = None) -> None:
        # The parent initializer is deliberately not called: it would set up
        # real Redis connections, which a mock must avoid.
        fake_client = fakeredis.FakeRedis(decode_responses=True)
        self.client = fake_client
        self.read_only_client = fake_client

    def _set_clients(self, configs: RedisConfig) -> None:
        """Do nothing, so that no actual connection setup takes place."""
        return None

    @staticmethod
    def _get_client(host: str, configs: RedisConfig) -> Redis:
        """Build a ``fakeredis`` client honouring ``configs.DECODE_RESPONSES``.

        ``host`` is accepted to match the parent's signature and is unused.
        """
        should_decode = configs.DECODE_RESPONSES
        return fakeredis.FakeRedis(decode_responses=should_decode)

archipy.adapters.redis.mocks.AsyncRedisMock

Bases: AsyncRedisAdapter

An async Redis adapter implementation using fakeredis for testing.

Source code in archipy/adapters/redis/mocks.py
class AsyncRedisMock(AsyncRedisAdapter):
    """Async Redis test double backed by a synchronous ``fakeredis`` instance.

    ``client`` and ``read_only_client`` are the same ``AsyncMock``. For each
    public name on ``AsyncRedisPort`` (except ``pubsub`` and ``get_pipeline``)
    that ``fakeredis.FakeRedis`` also exposes as a callable, the mock's
    attribute is replaced by a coroutine function that runs the synchronous
    fakeredis method. ``redis_config`` is accepted but not read.
    """

    def __init__(self, redis_config: RedisConfig | None = None) -> None:
        # The parent initializer is deliberately not called: it would set up
        # real Redis connections, which a mock must avoid.
        mocked_client = AsyncMock()
        self.client = mocked_client
        self.read_only_client = mocked_client
        self._setup_async_methods()

    def _set_clients(self, configs: RedisConfig) -> None:
        """Do nothing, so that no actual connection setup takes place."""
        return None

    @staticmethod
    def _get_client(host: str, configs: RedisConfig) -> AsyncRedis:
        """Return a fresh ``AsyncMock`` standing in for an async client."""
        mocked = AsyncMock()
        return mocked

    def _setup_async_methods(self):
        """Route the port's public methods through a synchronous fakeredis."""
        # The synchronous fakeredis instance performs the actual operations.
        self._fake_redis = fakeredis.FakeRedis(decode_responses=True)

        # These two stay as plain AsyncMock attributes.
        excluded = ("pubsub", "get_pipeline")

        for method_name in dir(AsyncRedisPort):
            if method_name.startswith("_") or method_name in excluded:
                continue

            sync_method = getattr(self._fake_redis, method_name, None)
            if not sync_method or not callable(sync_method):
                # fakeredis has no usable counterpart for this port method.
                continue

            async_method = self._create_async_wrapper(method_name, sync_method)
            for target in (self.client, self.read_only_client):
                setattr(target, method_name, async_method)

    def _create_async_wrapper(self, method_name, sync_method):
        """Return a coroutine function that calls ``sync_method`` synchronously.

        ``method_name`` is accepted but not used by the wrapper.
        """

        async def wrapper(*args, **kwargs):
            # Drop a leading adapter instance if one was passed positionally.
            forwarded = args[1:] if args and args[0] is self else args
            return sync_method(*forwarded, **kwargs)

        return wrapper