Быстрый старт
В качестве примера использования библиотек будет реализован сервис осуществляющий получение согласий пользователей на сбор, хранение и обработку персональных данных посредством OTP кода
Создание модели предметной области
Для начала создадим пакет domain
и добавим в нее переменную с именем домена, для повторного использования во всех
модулях домена:
Следующим шагом необходимо реализовать корневую сущность домена, для этого создадим модуль model.py
.
Для реализации корневой сущности в пакете d3m.domain
представлен класс RootEntity
.
from datetime import datetime
from pydantic import Field
from pydantic_extra_types.phone_numbers import PhoneNumber
from d3m.domain import RootEntity
from . import __domain_name__
class Agreement(RootEntity, domain=__domain_name__):
id_hash: str = Field(title='Хэш идентификатора клиента')
phone: PhoneNumber = Field(title='Телефон клиента для отправки OTP')
otp: str | None = Field(None, title='Хэш OTP-кода')
attempts: int = Field(default=0, title='Количество попыток сверки кода')
last_send_time: datetime| None = Field(None, title='Время последней отправки OTP кода')
confirm_code: str | None = Field(None, title='Код подтверждения полученный от пользователя')
Далее выделим атрибуты otp
, attempts
, last_send_time
в отдельную сущность.
Для реализации сущностей в пакете d3m.domain
представлен класс Entity
...
class OTP(Entity):
code_hash: str = Field(title='Хэш OTP-кода')
attempts: int = Field(default=0, title='Количество попыток сверки кода')
create_time: datetime | None = Field(title='Время последней отправки OTP кода',
default_factory=lambda: datetime.now(timezone.utc))
class Agreement(RootEntity, domain=__domain_name__):
id_hash: str = Field(title='Хэш идентификатора клиента')
phone: PhoneNumber = Field(title='Телефон клиента для отправки OTP')
otp: OTP | None = Field(None, title='OTP-код')
confirm_code: str | None = Field(None, title='Код подтверждения полученный от пользователя')
Реализуем метод создания OTP в корневой сущности:
...
from string import digits
import random
import hashlib
class OTP(Entity):
...
def check_code(self, code: str):
...
@classmethod
def build_otp_hash(cls, code: str) -> str:
return hashlib.md5(code.encode()).hexdigest()
class Agreement(RootEntity, domain=__domain_name__):
...
def create_otp(self) -> str:
code = self._generate_otp_code()
self.otp = OTP(code_hash=OTP.build_otp_hash(code))
return code
@staticmethod
def _generate_otp_code() -> str:
return ''.join(random.choice(digits) for _ in range(6))
Следующим шагом необходимо реализовать метод проверки кода. Но для начала давайте определим класс исключения которое мы будем поднимать в случае если OTP код не верный.
Для этого создадим модуль exceptions.py
в нашем пакете domain
. Ошибки предметной области представлены базовым классом
DomainError
в пакете d3m.domain
.
from d3m.domain import DomainError
from . import __domain_name__
class BaseAgreementException(DomainError, domain=__domain_name__):
pass
class MaxAttemptsError(BaseAgreementException):
__template__ = 'Исчерпано максимально количество попыток проверки кода'
class InvalidCode(BaseAgreementException):
__template__ = 'Не верный код осталось попыток {attempts}'
class OTPAlreadySendError(BaseAgreementException):
__template__ = 'OTP код уже отправлен. Повторите попытку позже.'
class OTPExpiredError(BaseAgreementException):
__template__ = 'Срок действия OTP истек.'
...
from .exceptions import MaxAttemptsError, InvalidCode
class OTP(Entity):
...
def check_code(self, code: str):
if self.attempts >= 3:
raise MaxAttemptsError()
if self.code_hash != self.build_otp_hash(code):
self.attempts += 1
raise InvalidCode(attempts=self.attempts)
...
class Agreement(RootEntity, domain=__domain_name__):
...
def check_otp(self, code: str) -> bool:
self.otp.check_code(code)
self.confirm_code = code
...
Далее добавим класс агрегата
...
import abc
...
class IAgreementAggr(abc.ABC):
@abc.abstractmethod
@property
def reference(self) -> UUID:
...
@abc.abstractmethod
async def send_otp(self):
...
@abc.abstractmethod
def confirm_otp(self, code: str):
...
class AgreementAggr(IAgreementAggr):
def __init__(self, agreement: Agreement) -> None:
self._agreement = agreement
@property
def reference(self) -> UUID:
return self._agreement.__reference__
async def send_otp(self):
pass
def confirm_otp(self, code: str):
pass
Далее добавим реализацию метода send_otp
, для этого нам потребуется определить ограничения на частоту повторной отправки SMS сообщений,
определить шаблон сообщения и подключить адаптер к сервису отправки сообщений.
...
class IMessageAdapter(abc.ABC):
@abc.abstractmethod
async def send(self, phone: PhoneNumber, message: str):
...
class AgreementAggr(IAgreementAggr):
_message_adapter: IMessageAdapter
_message_template: str
_resend_interval: timedelta
@classmethod
def bootstrap(cls, message_template: str,
message_adapter: IMessageAdapter,
resend_interval: timedelta = timedelta(seconds=60)):
cls._message_template = message_template
cls._resend_interval = resend_interval
cls._message_adapter = message_adapter
...
async def send_otp(self):
now = datetime.now(timezone.utc)
if (self._agreement.otp is not None
and (now - self._agreement.otp.create_time < self._resend_interval)):
raise OTPAlreadySendError()
code = self._agreement.create_otp()
await self._send_message(code)
async def _send_message(self, code: str):
message = self._message_template.format(code)
await self._message_adapter.send(self._agreement.phone, message)
def confirm_otp(self, code: str):
now = datetime.now(timezone.utc)
if now - self._agreement.otp.create_time > self._expiration_interval:
raise OTPExpiredError()
self._agreement.check_otp(code)
На этом проектирование предметной области завершено.
Полный листинг файлов:
import abc
import random
import hashlib
from uuid import UUID
from string import digits
from datetime import datetime, timezone, timedelta
from pydantic import Field, ConfigDict
from pydantic_extra_types.phone_numbers import PhoneNumber
from d3m.domain import RootEntity, Entity
from . import __domain_name__
from .exceptions import MaxAttemptsError, InvalidCodeError, OTPAlreadySendError, OTPExpiredError
class OTP(Entity):
code_hash: str = Field(title='Хэш OTP-кода')
attempts: int = Field(default=0, title='Количество попыток сверки кода')
create_time: datetime | None = Field(title='Время последней отправки OTP кода',
default_factory=lambda: datetime.now(timezone.utc))
model_config = ConfigDict(frozen=True)
def check_code(self, code: str):
if self.attempts >= 3:
raise MaxAttemptsError()
if self.code_hash != self.build_otp_hash(code):
self.attempts += 1
raise InvalidCodeError(attempts=self.attempts)
@classmethod
def build_otp_hash(cls, code: str) -> str:
return hashlib.md5(code.encode()).hexdigest()
class Agreement(RootEntity, domain=__domain_name__):
id_hash: str = Field(title='Хэш идентификатора клиента')
phone: PhoneNumber = Field(title='Телефон клиента для отправки OTP')
otp: OTP | None = Field(None, title='OTP-код')
confirm_code: str | None = Field(None, title='Код подтверждения полученный от пользователя')
def create_otp(self) -> str:
code = self._generate_otp_code()
self.otp = OTP(code_hash=OTP.build_otp_hash(code))
return code
@staticmethod
def _generate_otp_code() -> str:
return ''.join(random.choice(digits) for _ in range(6))
def check_otp(self, code: str) -> bool:
self.otp.check_code(code)
class IAgreementAggr(abc.ABC):
@abc.abstractmethod
@property
def reference(self) -> UUID:
...
@abc.abstractmethod
async def send_otp(self):
...
@abc.abstractmethod
def confirm_otp(self, code: str) -> None:
...
class IMessageAdapter(abc.ABC):
@abc.abstractmethod
async def send(self, phone: PhoneNumber, message: str):
...
class AgreementAggr(IAgreementAggr):
_message_adapter: IMessageAdapter
_message_template: str
_resend_interval: timedelta
_expiration_interval: timedelta
def __init__(self, agreement: Agreement) -> None:
self._agreement = agreement
@classmethod
def bootstrap(cls, message_template: str,
message_adapter: IMessageAdapter,
resend_interval: timedelta = timedelta(seconds=60),
expiration_interval: timedelta = timedelta(minutes=15),
):
cls._message_template = message_template
cls._resend_interval = resend_interval
cls._message_adapter = message_adapter
cls._expiration_interval = expiration_interval
@property
def reference(self) -> UUID:
return self._agreement.__reference__
async def send_otp(self):
now = datetime.now(timezone.utc)
if (self._agreement.otp is not None
and (now - self._agreement.otp.create_time < self._resend_interval)):
raise OTPAlreadySendError()
code = self._agreement.create_otp()
await self._send_message(code)
async def _send_message(self, code: str):
message = self._message_template.format(code)
await self._message_adapter.send(self._agreement.phone, message)
def confirm_otp(self, code: str):
now = datetime.now(timezone.utc)
if now - self._agreement.otp.create_time > self._expiration_interval:
raise OTPExpiredError()
self._agreement.check_otp(code)
from d3m.domain import DomainError
from . import __domain_name__
class BaseAgreementException(DomainError, domain=__domain_name__):
pass
class MaxAttemptsError(BaseAgreementException):
__template__ = 'Исчерпано максимально количество попыток проверки кода'
class InvalidCodeError(BaseAgreementException):
__template__ = 'Не верный код осталось попыток {attempts}'
class OTPAlreadySendError(BaseAgreementException):
__template__ = 'OTP код уже отправлен. Повторите попытку позже.'
class OTPExpiredError(BaseAgreementException):
__template__ = 'Срок действия OTP истек.'
Реализация сервисного слоя
Для взаимодействия с сервисным слоем нам необходимо определить команды
посредством которых будет осуществлять вызов соответствующих службы сервисного слоя.
Для этого созданим файл domain/commands.py
.
Для создания команд домена в пакете d3m.core
реализован базовый класс DomainCommand
.
from uuid import UUID
from pydantic import Field
from pydantic_extra_types.phone_numbers import PhoneNumber
from d3m.domain import DomainCommand
from . import __domain_name__
class BaseAgreementCommand(DomainCommand, domain=__domain_name__):
pass
class CreateAgreement(BaseAgreementCommand):
"""
Создание объекта соглашения
"""
id_hash: str = Field(title='Хэш идентификатора клиента')
phone: PhoneNumber = Field(title='Телефон клиента')
class ConfirmAgreement(BaseAgreementCommand):
"""
Подтверждение соглашения
"""
reference: UUID = Field(title='Идентификатор согласия')
code: str = Field(title='Код подтверждения')
class SendOTP(BaseAgreementCommand):
"""
Отправка OTP кода
"""
reference: UUID = Field(title='Идентификатор согласия')
from uuid import UUID
from pydantic import Field
from d3m.domain import DomainEvent
from . import __domain_name__
class BaseAgreementEvent(DomainEvent, domain=__domain_name__):
pass
class AgreementCreated(BaseAgreementEvent):
reference: UUID = Field(title='Идентификатор согласия')
Для инициализации события добавим в Агрегат метод create
...
class AgreementAggr(IAgreementAggr):
...
@classmethod
def create(cls, agreement: Agreement) -> IAgreementAggr:
agreement.create_event('AgreementCreated', reference=agreement.__reference__)
return cls(agreement)
...
Следующим шагом опишем службы сервисного слоя. Для этого создадим файл domain/usecases.py
.
from uuid import UUID
from d3m.hc import HandlersCollection
from d3m.uow import UnitOfWorkBuilder
from .commands import CreateAgreement, ConfirmAgreement, SendOTP
from .aggregate import IAgreementAggr
from .exceptions import InvalidCodeError
class IAgreementRepository:
def create(self, id_hash: str, phone: PhoneNumber) -> IAgreementAggr:
...
async def get(self, reference: UUID) -> IAgreementAggr:
...
collection = HandlersCollection()
@collection.register
async def create_agreement(cmd: CreateAgreement,
uow_builder: UnitOfWorkBuilder[IAgreementRepository]):
async with uow_builder() as uow:
agreement = uow.repository.create(cmd.id_hash, cmd.phone)
await uow.apply()
return agreement.reference
@collection.register
async def confirm_agreement(cmd: ConfirmAgreement,
uow_builder: UnitOfWorkBuilder[IAgreementRepository]):
async with uow_builder() as uow:
agreement = await uow.repository.get(cmd.reference)
try:
agreement.confirm_otp(cmd.otp)
await uow.apply()
except InvalidCodeError:
await uow.apply()
raise
@collection.subscribe('clients.agreement.AgreementCreated')
@collection.register
async def send_otp(cmd: SendOTP,
uow_builder: UnitOfWorkBuilder[IAgreementRepository]):
async with uow_builder() as uow:
agreement = await uow.repository.get(cmd.reference)
await agreement.send_otp()
await uow.apply()
Класс d3m.hc.HandlersCollection
осуществляет регистрацию всех обработчиков команд и
в последующем используется для маршрутизации команд к соответствующим службам посредством
d3m.messgebus.Messagebus
.
Обработчик команды SendOTP
дополнительно подписывается на событие поднятое при создании соглашения для инициализации отправки OTP кода.
Реализация постоянного хранилища данных
Пакетом d3m.uow
поставляется реализация классов UnitOfWorkBuilder
, UnitOfWorkCtxMgr
, UnitOfWork
,
а также интерфейсы классов IRepository
, IRepositoryBuilder
, ILocker
.
Для начала необходимо реализовать базовый класс реализующий интерфейс IRepository
и класс RepositoryBuilder
import abc
from typing import Generic, TypeVar, Any
from d3m.core import get_running_messagebus
from d3m.uow import IRepository, IRepositoryBuilder, IUnitOfWorkCtxMgr
from d3m.domain import RootEntity
T = TypeVar('T', bound=RootEntity)
class BaseRepository(IRepository, Generic[T], abc.ABC):
def __init__(self, engine):
self._engine = engine
self._insert_seen: dict[Any, T] = {}
self._update_seen: dict[Any, T] = {}
async def commit(self) -> None:
new_updated_seen = {}
async with self._engine.begin() as connection:
for reference, entity in self._insert_seen.items():
await self._insert_entity(entity, connection)
new_updated_seen[reference] = entity
for reference, entity in self._insert_seen.items():
await self._update_entity(entity, connection)
new_updated_seen[reference] = entity
self._publish_event(*new_updated_seen.values())
self._update_seen = new_updated_seen
self._insert_seen.clear()
@staticmethod
def _publish_event(*entities: T):
messagebus = get_running_messagebus()
for entity in entities:
for event in entity.collecte_events():
_ = messagebus.handle_message(event)
@abc.abstractmethod
async def _insert_entity(self, entity: T, connection):
...
@abc.abstractmethod
async def _update_entity(self, entity: T, connection):
...
class RepositoryBuilder(IRepositoryBuilder):
def __init__(self, repository_class: type[BaseRepository], engine):
self._repository_class = repository_class
self._engine = engine
async def __call__(self, __uow_context_manager: IUnitOfWorkCtxMgr, /) -> IRepository:
return self._repository_class(engine=self._engine)
Следующим шагом необходимо добавить реализацию репозитория класса Agreement
:
...
class AgreementRepository(IAgreementRepository, BaseRepository[Agreement]):
def create(self, id_hash: str, phone: PhoneNumber) -> IAgreementAggr:
agreement = Agreement(id_hash, phone)
self._insert_seen[agreement.__reference__] = agreement
return AgreementAggr.create(agreement)
async def get(self, reference: UUID) -> IAgreementAggr:
if reference in self._insert_seen:
return AgreementAggr(self._insert_seen[reference])
elif reference in self._update_entity:
return AgreementAggr(self._update_seen[reference])
async with self._engine.begin() as connection:
agreement = await self._get_agreement(reference, connection)
self._update_seen[reference] = agreement
return AgreementAggr(agreement)
async def _insert_entity(self, entity: T, connection):
...
async def _update_entity(self, entity: T, connection):
...
async def _get_agreement(self, reference, connection) -> T:
...
Для завершения реализации класса репозитория необходимо описать таблицы и добавить имплементации методов
_insert_entity
, _update_entity
, _get_agreement
.
Реализация API слоя
from uuid import UUID
from fastapi import FastAPI, APIRouter, Request
from pydantic import BaseModel, Field
from d3m.core import get_running_messagebus
from d3m.messagebus import UniversalMessage
app = FastAPI()
router = APIRouter(prefix='/v1/agreement')
class CreateAgreementResponse(BaseModel):
reference: UUID
@router.post('/')
async def create_agreement(request: Request) -> CreateAgreementResponse:
mb = get_running_messagebus()
cmd = UniversalMessage('clients.agreement.CreateAgreement', 'COMMAND', await request.json())
reference = await mb.handle_message(cmd)
return CreateAgreementResponse(reference=reference)
class ConfirmAgreementRequest(BaseModel):
code: str = Field(title='Код подтверждения')
@router.post('/{reference}')
async def confirm_agreement(body: ConfirmAgreementRequest, reference: UUID) -> BaseModel:
mb = get_running_messagebus()
cmd = UniversalMessage('clients.agreement.ConfirmAgreement', 'COMMAND', {'reference': reference, 'code': body.code})
await mb.handle_message(cmd)
return BaseModel()
@router.post('/{reference}/resend')
async def resend_agreement(reference: UUID) -> BaseModel:
mb = get_running_messagebus()
cmd = UniversalMessage('clients.agreement.SendOTP', 'COMMAND', {'reference': reference})
await mb.handle_message(cmd)
return BaseModel()
Сборка проекта
import asyncio
import signal
from contextlib import asynccontextmanager
from pydantic_settings import BaseSettings, SettingsConfigDict
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from d3m.uow import UnitOfWorkBuilder
from d3m.core import get_messagebus, set_messagebus
from d3m.messagebus import Messagebus
import uvicorn
from .domain import __domain_name__
from .repository import AgreementRepositoryBuilder, AgreementRepository
from .usecases import collection
from .gateway import router
class Settings(BaseSettings):
pg_url: str
api_host: str = '0.0.0.0'
api_port: str = 80
__setting = Settings()
def setup_uow_builder():
engine = create_async_engine(__setting.pg_url)
repo_builder = AgreementRepositoryBuilder(AgreementRepository, engine)
uow_builder = UnitOfWorkBuilder(repo_builder)
get_messagebus().set_defaults(__domain_name__, uow_builder=uow_builder)
@asynccontextmanager
async def setup_fastapi_app(_):
app = FastAPI()
app.include_router(router)
config = uvicorn.Config(
app=app,
host=__setting.api_host,
port=__setting.api_port,
)
server = uvicorn.Server(config)
server.config.load()
server.lifespan = server.config.lifespan_class(server.config)
await server.startup()
yield
await server.shutdown()
def setup_messagebus():
messagebus = Messagebus(lifespan=setup_fastapi_app)
messagebus.include_collection(collection)
set_messagebus(messagebus)
def bootstrap():
setup_messagebus()
setup_uow_builder()
def entrypoint():
bootstrap()
messagebus = get_messagebus()
loop = asyncio.get_event_loop()
def stop():
task = loop.create_task(messagebus.close())
task.add_done_callback(lambda x: loop.close())
loop.add_signal_handler(signal.SIGTERM, stop)
loop.add_signal_handler(signal.SIGINT, stop)
loop.add_signal_handler(signal.SIGKILL, stop)
loop.run_until_complete(messagebus.run())
loop.run_forever()
if __name__ == '__main__':
entrypoint()