mypydantic.create_models

  1# pylint: disable=protected-access,line-too-long
  2import importlib
  3import os
  4import re
  5from datetime import datetime
  6from typing import Dict, Literal, Optional
  7
  8import jinja2
  9from mypy_boto3 import submodules
 10from mypy_boto3_wafv2 import type_defs
 11
 12from mypydantic.constants import BASEMODEL_NAME_CONFLICTS, TEMPLATE_DIR, TYPE_CASTS
 13from mypydantic.helpers.logger import CustomLogger
 14from mypydantic.helpers.parsers import snake_case
 15
 16LOG = CustomLogger()
 17LOG.local = True
 18
 19
 20def generate_model_file(
 21    service: str, model: str, is_test: Optional[bool] = False
 22) -> None:
 23    base_model_path = f"{os.getcwd()}/mypydantic/models/"
 24    model_path = (
 25        base_model_path + f"{service}.py"
 26        if not is_test
 27        else base_model_path + f"{service}_test.py"
 28    )
 29    template = get_base_template()
 30    file_content = template.render(current_date=datetime.now().ctime(), model=model)
 31    with open(model_path, "w", encoding="utf-8") as write_file:
 32        write_file.write(file_content)
 33    LOG.info(f"Model Generated: {service}")
 34
 35
 36def get_base_template() -> jinja2.Template:
 37    template_loader = jinja2.FileSystemLoader(TEMPLATE_DIR)
 38    template_env = jinja2.Environment(loader=template_loader)
 39    config_template = "base.jinja"
 40    return template_env.get_template(config_template)
 41
 42
 43def check_attributes(
 44    object: object,
 45) -> Dict[
 46    Literal[
 47        "has_name",
 48        "has_annotations",
 49        "has_origin",
 50        "has_origin_name",
 51        "has_forward_arg",
 52    ],
 53    bool,
 54]:
 55    return {
 56        "has_name": hasattr(object, "__name__"),
 57        "has_annotations": hasattr(object, "__annotations__"),
 58        "has_origin": hasattr(object, "__origin__"),
 59        "has_origin_name": hasattr(object, "_name"),
 60        "has_forward_arg": hasattr(object, "__forward_arg__"),
 61    }
 62
 63
 64def get_type(model_type: object):
 65    attributes = check_attributes(model_type)
 66    if attributes["has_name"]:
 67        return model_type.__name__
 68    if attributes["has_annotations"]:
 69        return get_type(model_type.__annotations__)
 70    if attributes["has_origin"]:
 71        try:
 72            model_name = model_type._name
 73            if model_name is None:
 74                model_attributes = check_attributes(model_type)
 75                if model_attributes["has_origin"]:
 76                    origin_attributes = check_attributes(model_type.__origin__)
 77                    if origin_attributes["has_origin_name"]:
 78                        model_name = model_type.__origin__._name
 79                    else:
 80                        model_name = model_type.__origin__.__name__
 81            arg_list = []
 82            args = model_type.__args__
 83            for arg in args:
 84                arg_type = get_type(arg)
 85                arg_list.append(arg_type)
 86            arg_string = str(arg_list).replace("'", "")
 87            return f"{model_name}{arg_string}"
 88        except:
 89            LOG.info(model_type.__dict__)
 90            raise
 91    if attributes["has_origin_name"]:
 92        return f"{model_type._name}"
 93    if attributes["has_forward_arg"]:
 94        return f"{model_type.__forward_arg__}"
 95    return f'"{model_type}"'
 96
 97
 98def build_model_properties(
 99    build_model_template: str,
100    annotations: dict,
101    opitonal_args: list,
102) -> str:
103    for model_property, model_type in annotations.items():
104        property_type = get_type(model_type)
105        for type_cast in TYPE_CASTS:
106            pattern = rf"(\b{type_cast}\b)((?:\[[^\]]*\])?)"
107            replacement = r"Type[\g<1>\g<2>]"
108            property_type = re.sub(pattern, replacement, property_type)
109        is_optional = False
110        model_alias = model_property
111        if str(model_property) in opitonal_args:
112            is_optional = True
113        if str(model_property).lower() in BASEMODEL_NAME_CONFLICTS:
114            model_property = f"{snake_case(str(model_property))}_"
115        else:
116            model_property = f"{snake_case(str(model_property))}"
117        if is_optional:
118            property_model = f'\t{model_property}: Optional[{property_type}] = Field(default=None,alias="{model_alias}")\n'
119        else:
120            property_model = (
121                f'\t{model_property}: {property_type} = Field(alias="{model_alias}")\n'
122            )
123        build_model_template += property_model
124    return build_model_template
125
126
def build_model(service: str, type_defs: type_defs):
    """Generate the model source for one service and write it to its file.

    Every name in ``type_defs.__all__`` is looked up on ``type_defs`` and
    turned into a ``class <name>(BaseModel):`` header followed by the field
    lines from ``build_model_properties``. The combined text is then
    post-processed with a fixed series of string replacements, logged at
    debug level and passed to ``generate_model_file``.

    Args:
        service: Service name, used for logging and for the output file.
        type_defs: Object exposing ``__all__``; each listed attribute must
            provide ``__annotations__``, ``__optional_keys__`` and
            ``__name__``.
    """
    LOG.info(f"Building Models for: {service}")
    source = "\n"
    for exported_name in type_defs.__all__:
        typed_dict = getattr(type_defs, exported_name)
        field_annotations = typed_dict.__annotations__
        optional_keys = list(typed_dict.__optional_keys__)
        source += f"\nclass {typed_dict.__name__}(BaseModel):\n"
        source = build_model_properties(source, field_annotations, optional_keys)

    # Applied in this order to the whole generated text.
    substitutions = (
        ("TypeDef", "Model"),
        ("RequestRequest", "Request"),
        ("NoneType", "None"),
        ('"Ellipsis"', "..."),
    )
    model = source
    for old, new in substitutions:
        model = model.replace(old, new)

    LOG.debug(model)
    generate_model_file(service, model, is_test=False)
146
147
def main():
    """Build a model file for every entry in ``submodules.SUBMODULES``.

    For each entry the service name is ``import_name`` with trailing
    underscores stripped, and the ``type_defs`` attribute of the module
    imported from ``module_name`` is handed to ``build_model``.
    """
    for entry in submodules.SUBMODULES:
        service_name = entry.import_name.rstrip("_")
        service_types = importlib.import_module(entry.module_name).type_defs
        # To regenerate a single service, filter on service_name here
        # (for example "glue").
        build_model(service_name, service_types)


if __name__ == "__main__":
    main()
def generate_model_file(
    service: str, model: str, is_test: Optional[bool] = False
) -> None:
    """Render the base template for one service and write it to disk.

    The file is written under ``<cwd>/mypydantic/models/`` as ``<service>.py``,
    or ``<service>_test.py`` when ``is_test`` is truthy. The file is opened in
    ``"w"`` mode, so existing content is replaced.

    Args:
        service: Service name; used for the file name and the log message.
        model: Generated model source, handed to the template as ``model``.
        is_test: When truthy, write the ``_test`` file variant instead.
    """
    if is_test:
        file_name = f"{service}_test.py"
    else:
        file_name = f"{service}.py"
    model_path = f"{os.getcwd()}/mypydantic/models/" + file_name

    # The template also receives the current local time as ``current_date``.
    rendered = get_base_template().render(
        current_date=datetime.now().ctime(), model=model
    )
    with open(model_path, "w", encoding="utf-8") as handle:
        handle.write(rendered)
    LOG.info(f"Model Generated: {service}")
def get_base_template() -> jinja2.Template:
    """Load the ``base.jinja`` template from ``TEMPLATE_DIR``.

    A new ``jinja2.Environment`` backed by a ``FileSystemLoader`` is created
    on every call.

    Returns:
        The template object for ``base.jinja``.
    """
    environment = jinja2.Environment(loader=jinja2.FileSystemLoader(TEMPLATE_DIR))
    return environment.get_template("base.jinja")
def check_attributes(
    object: object,
) -> Dict[
    Literal[
        "has_name",
        "has_annotations",
        "has_origin",
        "has_origin_name",
        "has_forward_arg",
    ],
    bool,
]:
    """Report which introspection attributes are present on ``object``.

    Args:
        object: Any value to probe with ``hasattr``.

    Returns:
        A dict mapping each flag name to whether the matching attribute
        exists: ``__name__``, ``__annotations__``, ``__origin__``, ``_name``
        and ``__forward_arg__``.
    """
    # (flag name, attribute probed) pairs, in the order they are checked.
    probes = (
        ("has_name", "__name__"),
        ("has_annotations", "__annotations__"),
        ("has_origin", "__origin__"),
        ("has_origin_name", "_name"),
        ("has_forward_arg", "__forward_arg__"),
    )
    return {flag: hasattr(object, attribute) for flag, attribute in probes}
def get_type(model_type: object) -> str:
    """Render a type object as the text used for it in generated source.

    The attributes reported by ``check_attributes`` are tried in this order:

    1. ``__name__``: returned as is.
    2. ``__annotations__``: the annotations mapping is rendered recursively.
    3. ``__origin__``: rendered as ``<name>[<arg>, ...]``. The name is
       ``_name`` when it is set, otherwise the origin's ``_name`` or
       ``__name__``; every entry of ``__args__`` is rendered recursively.
    4. ``_name``: returned as is.
    5. ``__forward_arg__``: returned as is.

    Anything else is rendered as ``str(model_type)`` in double quotes.

    Args:
        model_type: The object to render.

    Returns:
        The textual form of ``model_type``.

    Raises:
        Exception: Any error raised while rendering the ``__origin__`` branch
            is logged together with the object's state and re-raised.
    """
    attributes = check_attributes(model_type)
    if attributes["has_name"]:
        return model_type.__name__
    if attributes["has_annotations"]:
        return get_type(model_type.__annotations__)
    if attributes["has_origin"]:
        try:
            # Only ``__origin__`` is known to exist here, so a missing
            # ``_name`` is treated like ``None`` instead of raising.
            model_name = getattr(model_type, "_name", None)
            if model_name is None:
                origin = model_type.__origin__
                if check_attributes(origin)["has_origin_name"]:
                    model_name = origin._name
                else:
                    model_name = origin.__name__
            # Join the rendered arguments directly. Going through
            # ``str(list)`` and stripping quotes would also remove
            # apostrophes (and add repr escapes) inside the arguments.
            arg_string = ", ".join(get_type(arg) for arg in model_type.__args__)
            return f"{model_name}[{arg_string}]"
        except Exception:
            # Fall back to the object itself when it has no ``__dict__`` so
            # the logging call cannot mask the original error.
            LOG.info(getattr(model_type, "__dict__", model_type))
            raise
    if attributes["has_origin_name"]:
        return f"{model_type._name}"
    if attributes["has_forward_arg"]:
        return f"{model_type.__forward_arg__}"
    return f'"{model_type}"'
def build_model_properties(
    build_model_template: str,
    annotations: dict,
    opitonal_args: list,
) -> str:
    """Append one field definition line per annotation to the template text.

    For every ``(name, type)`` pair the type is rendered with ``get_type``,
    each name in ``TYPE_CASTS`` (with an optional ``[...]`` suffix) is wrapped
    as ``Type[...]``, and the field name is the ``snake_case`` form of the
    original name. A trailing underscore is added when the lower-cased name
    is in ``BASEMODEL_NAME_CONFLICTS``. The original name is kept as the
    field's ``alias``.

    Args:
        build_model_template: Text generated so far; the new lines are
            appended to it.
        annotations: Mapping of property name to type object.
        opitonal_args: Names whose fields are emitted as
            ``Optional[...]`` with ``default=None``.

    Returns:
        ``build_model_template`` followed by the generated field lines.
    """
    field_lines = []
    for alias, annotation in annotations.items():
        rendered_type = get_type(annotation)
        for cast_name in TYPE_CASTS:
            rendered_type = re.sub(
                rf"(\b{cast_name}\b)((?:\[[^\]]*\])?)",
                r"Type[\g<1>\g<2>]",
                rendered_type,
            )

        alias_text = str(alias)
        optional = alias_text in opitonal_args
        suffix = "_" if alias_text.lower() in BASEMODEL_NAME_CONFLICTS else ""
        field_name = f"{snake_case(alias_text)}{suffix}"

        if optional:
            field_lines.append(
                f'\t{field_name}: Optional[{rendered_type}] = Field(default=None,alias="{alias}")\n'
            )
        else:
            field_lines.append(
                f'\t{field_name}: {rendered_type} = Field(alias="{alias}")\n'
            )
    return build_model_template + "".join(field_lines)
def build_model(service: str, type_defs: type_defs):
    """Generate the model source for one service and write it to its file.

    Every name in ``type_defs.__all__`` is looked up on ``type_defs`` and
    turned into a ``class <name>(BaseModel):`` header followed by the field
    lines from ``build_model_properties``. The combined text is then
    post-processed with a fixed series of string replacements, logged at
    debug level and passed to ``generate_model_file``.

    Args:
        service: Service name, used for logging and for the output file.
        type_defs: Object exposing ``__all__``; each listed attribute must
            provide ``__annotations__``, ``__optional_keys__`` and
            ``__name__``.
    """
    LOG.info(f"Building Models for: {service}")
    source = "\n"
    for exported_name in type_defs.__all__:
        typed_dict = getattr(type_defs, exported_name)
        field_annotations = typed_dict.__annotations__
        optional_keys = list(typed_dict.__optional_keys__)
        source += f"\nclass {typed_dict.__name__}(BaseModel):\n"
        source = build_model_properties(source, field_annotations, optional_keys)

    # Applied in this order to the whole generated text.
    substitutions = (
        ("TypeDef", "Model"),
        ("RequestRequest", "Request"),
        ("NoneType", "None"),
        ('"Ellipsis"', "..."),
    )
    model = source
    for old, new in substitutions:
        model = model.replace(old, new)

    LOG.debug(model)
    generate_model_file(service, model, is_test=False)
def main():
    """Build a model file for every entry in ``submodules.SUBMODULES``.

    For each entry the service name is ``import_name`` with trailing
    underscores stripped, and the ``type_defs`` attribute of the module
    imported from ``module_name`` is handed to ``build_model``.
    """
    for entry in submodules.SUBMODULES:
        service_name = entry.import_name.rstrip("_")
        service_types = importlib.import_module(entry.module_name).type_defs
        # To regenerate a single service, filter on service_name here
        # (for example "glue").
        build_model(service_name, service_types)