mypydantic.create_models
# pylint: disable=protected-access,line-too-long
"""Generate pydantic model source files from the mypy_boto3 ``type_defs`` modules."""
import importlib
import os
import re
from datetime import datetime
from typing import Dict, Literal, Optional

import jinja2
from mypy_boto3 import submodules
from mypy_boto3_wafv2 import type_defs

from mypydantic.constants import BASEMODEL_NAME_CONFLICTS, TEMPLATE_DIR, TYPE_CASTS
from mypydantic.helpers.logger import CustomLogger
from mypydantic.helpers.parsers import snake_case

LOG = CustomLogger()
LOG.local = True


def generate_model_file(
    service: str, model: str, is_test: Optional[bool] = False
) -> None:
    """Render the base template with ``model`` and write it to the models directory.

    Args:
        service: Service name; used as the output file's base name.
        model: Generated model source passed to the template as ``model``.
        is_test: When truthy the file is written as ``<service>_test.py``
            instead of ``<service>.py``.
    """
    base_model_path = os.path.join(os.getcwd(), "mypydantic", "models")
    file_name = f"{service}_test.py" if is_test else f"{service}.py"
    model_path = os.path.join(base_model_path, file_name)
    template = get_base_template()
    file_content = template.render(current_date=datetime.now().ctime(), model=model)
    # open(..., "w") does not create parent directories, so make sure the
    # target directory exists before writing.
    os.makedirs(base_model_path, exist_ok=True)
    with open(model_path, "w", encoding="utf-8") as write_file:
        write_file.write(file_content)
    LOG.info(f"Model Generated: {service}")


def get_base_template() -> jinja2.Template:
    """Load ``base.jinja`` from ``TEMPLATE_DIR`` and return it as a template."""
    template_loader = jinja2.FileSystemLoader(TEMPLATE_DIR)
    template_env = jinja2.Environment(loader=template_loader)
    config_template = "base.jinja"
    return template_env.get_template(config_template)


def check_attributes(
    object: object,
) -> Dict[
    Literal[
        "has_name",
        "has_annotations",
        "has_origin",
        "has_origin_name",
        "has_forward_arg",
    ],
    bool,
]:
    """Report which introspection attributes are present on ``object``.

    Returns:
        A dict of flags, one per probed attribute: ``__name__``,
        ``__annotations__``, ``__origin__``, ``_name`` and ``__forward_arg__``.
    """
    probes = (
        ("has_name", "__name__"),
        ("has_annotations", "__annotations__"),
        ("has_origin", "__origin__"),
        ("has_origin_name", "_name"),
        ("has_forward_arg", "__forward_arg__"),
    )
    return {flag: hasattr(object, attribute) for flag, attribute in probes}


def get_type(model_type: object):
    """Return the source-code spelling of a type annotation.

    Args:
        model_type: A class, a subscripted generic alias (anything exposing
            ``__origin__``), a forward reference, or an arbitrary value.

    Returns:
        The annotation as a string, e.g. ``"List[int]"``. Values that expose
        none of the probed attributes are returned wrapped in double quotes.

    Raises:
        Exception: Any error raised while rendering a generic alias is
            logged and re-raised unchanged.
    """
    # Generic aliases are handled first: on newer Python versions (3.10+)
    # subscripted typing aliases also expose __name__, and returning that
    # would silently drop the type arguments.
    if hasattr(model_type, "__origin__"):
        try:
            origin = model_type.__origin__
            model_name = getattr(model_type, "_name", None)
            if model_name is None:
                # Fall back to the origin's own name (special forms carry
                # ``_name``, plain classes carry ``__name__``).
                model_name = getattr(origin, "_name", None) or origin.__name__
            args = getattr(model_type, "__args__", None)
            if args is None:
                # Alias without arguments: nothing to put in brackets.
                return model_name
            arg_list = [get_type(arg) for arg in args]
            # str(list) quotes every element with single quotes; strip them
            # so the result reads as "[int, str]".
            arg_string = str(arg_list).replace("'", "")
            return f"{model_name}{arg_string}"
        except Exception:
            LOG.info(getattr(model_type, "__dict__", repr(model_type)))
            raise
    if hasattr(model_type, "__name__"):
        return model_type.__name__
    if hasattr(model_type, "__annotations__"):
        return get_type(model_type.__annotations__)
    if hasattr(model_type, "_name"):
        return f"{model_type._name}"
    if hasattr(model_type, "__forward_arg__"):
        return f"{model_type.__forward_arg__}"
    return f'"{model_type}"'


def build_model_properties(
    build_model_template: str,
    annotations: dict,
    opitonal_args: list,
) -> str:
    """Append one pydantic field line per annotation to ``build_model_template``.

    Args:
        build_model_template: Source text generated so far.
        annotations: Mapping of property name to its type annotation.
        opitonal_args: Names of the properties that are optional.

    Returns:
        ``build_model_template`` followed by one tab-indented field
        definition per entry of ``annotations``.
    """
    optional_names = set(opitonal_args)
    # The patterns depend only on TYPE_CASTS, so compile them once instead of
    # once per property.
    cast_patterns = [
        re.compile(rf"(\b{type_cast}\b)((?:\[[^\]]*\])?)") for type_cast in TYPE_CASTS
    ]
    replacement = r"Type[\g<1>\g<2>]"
    pieces = [build_model_template]
    for model_alias, model_type in annotations.items():
        property_type = get_type(model_type)
        for cast_pattern in cast_patterns:
            property_type = cast_pattern.sub(replacement, property_type)
        alias_text = str(model_alias)
        model_property = snake_case(alias_text)
        if alias_text.lower() in BASEMODEL_NAME_CONFLICTS:
            # Names listed in BASEMODEL_NAME_CONFLICTS get a trailing underscore.
            model_property = f"{model_property}_"
        if alias_text in optional_names:
            pieces.append(
                f'\t{model_property}: Optional[{property_type}] = Field(default=None,alias="{model_alias}")\n'
            )
        else:
            pieces.append(
                f'\t{model_property}: {property_type} = Field(alias="{model_alias}")\n'
            )
    return "".join(pieces)


def build_model(service: str, type_defs: type_defs):
    """Build the model source for every name in ``type_defs.__all__`` and write it.

    Args:
        service: Service name, forwarded to ``generate_model_file``.
        type_defs: Module whose ``__all__`` lists the definitions to convert.
    """
    LOG.info(f"Building Models for: {service}")
    sections = ["\n"]
    for type_def in type_defs.__all__:
        type_def_model = getattr(type_defs, type_def)
        annotations = type_def_model.__annotations__
        # A definition without __optional_keys__ is treated as having no
        # optional properties.
        opitonal_args = list(getattr(type_def_model, "__optional_keys__", ()))
        class_header = f"\nclass {type_def_model.__name__}(BaseModel):\n"
        sections.append(
            build_model_properties(class_header, annotations, opitonal_args)
        )
    model = (
        "".join(sections)
        .replace("TypeDef", "Model")
        .replace("RequestRequest", "Request")
        .replace("NoneType", "None")
        .replace('"Ellipsis"', "...")
    )
    LOG.debug(model)
    generate_model_file(service, model, is_test=False)


def main():
    """Generate a model file for every entry of ``submodules.SUBMODULES``."""
    for sub in submodules.SUBMODULES:
        service = sub.import_name.rstrip("_")
        module = importlib.import_module(sub.module_name)
        build_model(service, module.type_defs)


if __name__ == "__main__":
    main()
def
generate_model_file(service: str, model: str, is_test: Optional[bool] = False) -> None:
def generate_model_file(
    service: str, model: str, is_test: Optional[bool] = False
) -> None:
    """Render the base template with ``model`` and write it to the models directory.

    Args:
        service: Service name; used as the output file's base name.
        model: Generated model source passed to the template as ``model``.
        is_test: When truthy the file is written as ``<service>_test.py``
            instead of ``<service>.py``.
    """
    base_model_path = os.path.join(os.getcwd(), "mypydantic", "models")
    file_name = f"{service}_test.py" if is_test else f"{service}.py"
    model_path = os.path.join(base_model_path, file_name)
    template = get_base_template()
    file_content = template.render(current_date=datetime.now().ctime(), model=model)
    # open(..., "w") does not create parent directories, so make sure the
    # target directory exists before writing.
    os.makedirs(base_model_path, exist_ok=True)
    with open(model_path, "w", encoding="utf-8") as write_file:
        write_file.write(file_content)
    LOG.info(f"Model Generated: {service}")
def
get_base_template() -> jinja2.environment.Template:
def
check_attributes( object: object) -> Dict[Literal['has_name', 'has_annotations', 'has_origin', 'has_origin_name', 'has_forward_arg'], bool]:
def check_attributes(
    object: object,
) -> Dict[
    Literal[
        "has_name",
        "has_annotations",
        "has_origin",
        "has_origin_name",
        "has_forward_arg",
    ],
    bool,
]:
    """Report which introspection attributes are present on ``object``.

    Returns:
        A dict of flags, one per probed attribute: ``__name__``,
        ``__annotations__``, ``__origin__``, ``_name`` and ``__forward_arg__``.
    """
    # (flag key, attribute probed) pairs, in the order the keys are emitted.
    probes = (
        ("has_name", "__name__"),
        ("has_annotations", "__annotations__"),
        ("has_origin", "__origin__"),
        ("has_origin_name", "_name"),
        ("has_forward_arg", "__forward_arg__"),
    )
    return {flag: hasattr(object, attribute) for flag, attribute in probes}
def
get_type(model_type: object):
def get_type(model_type: object):
    """Return the source-code spelling of a type annotation.

    Args:
        model_type: A class, a subscripted generic alias (anything exposing
            ``__origin__``), a forward reference, or an arbitrary value.

    Returns:
        The annotation as a string, e.g. ``"List[int]"``. Values that expose
        none of the probed attributes are returned wrapped in double quotes.

    Raises:
        Exception: Any error raised while rendering a generic alias is
            logged and re-raised unchanged.
    """
    # Generic aliases are handled first: on newer Python versions (3.10+)
    # subscripted typing aliases also expose __name__, and returning that
    # would silently drop the type arguments.
    if hasattr(model_type, "__origin__"):
        try:
            origin = model_type.__origin__
            model_name = getattr(model_type, "_name", None)
            if model_name is None:
                # Fall back to the origin's own name (special forms carry
                # ``_name``, plain classes carry ``__name__``).
                model_name = getattr(origin, "_name", None) or origin.__name__
            args = getattr(model_type, "__args__", None)
            if args is None:
                # Alias without arguments: nothing to put in brackets.
                return model_name
            arg_list = [get_type(arg) for arg in args]
            # str(list) quotes every element with single quotes; strip them
            # so the result reads as "[int, str]".
            arg_string = str(arg_list).replace("'", "")
            return f"{model_name}{arg_string}"
        except Exception:
            LOG.info(getattr(model_type, "__dict__", repr(model_type)))
            raise
    if hasattr(model_type, "__name__"):
        return model_type.__name__
    if hasattr(model_type, "__annotations__"):
        return get_type(model_type.__annotations__)
    if hasattr(model_type, "_name"):
        return f"{model_type._name}"
    if hasattr(model_type, "__forward_arg__"):
        return f"{model_type.__forward_arg__}"
    return f'"{model_type}"'
def
build_model_properties(build_model_template: str, annotations: dict, opitonal_args: list) -> str:
def build_model_properties(
    build_model_template: str,
    annotations: dict,
    opitonal_args: list,
) -> str:
    """Append one pydantic field line per annotation to ``build_model_template``.

    Args:
        build_model_template: Source text generated so far.
        annotations: Mapping of property name to its type annotation.
        opitonal_args: Names of the properties that are optional.

    Returns:
        ``build_model_template`` followed by one tab-indented field
        definition per entry of ``annotations``.
    """
    optional_names = set(opitonal_args)
    # The patterns depend only on TYPE_CASTS, so compile them once instead of
    # once per property.
    cast_patterns = [
        re.compile(rf"(\b{type_cast}\b)((?:\[[^\]]*\])?)") for type_cast in TYPE_CASTS
    ]
    replacement = r"Type[\g<1>\g<2>]"
    pieces = [build_model_template]
    for model_alias, model_type in annotations.items():
        property_type = get_type(model_type)
        for cast_pattern in cast_patterns:
            property_type = cast_pattern.sub(replacement, property_type)
        alias_text = str(model_alias)
        model_property = snake_case(alias_text)
        if alias_text.lower() in BASEMODEL_NAME_CONFLICTS:
            # Names listed in BASEMODEL_NAME_CONFLICTS get a trailing underscore.
            model_property = f"{model_property}_"
        if alias_text in optional_names:
            pieces.append(
                f'\t{model_property}: Optional[{property_type}] = Field(default=None,alias="{model_alias}")\n'
            )
        else:
            pieces.append(
                f'\t{model_property}: {property_type} = Field(alias="{model_alias}")\n'
            )
    return "".join(pieces)
def
build_model( service: str, type_defs: <module 'mypy_boto3_wafv2.type_defs' from '/Users/jongreg/.local/share/virtualenvs/mypydantic-TQc7TkDd/lib/python3.9/site-packages/mypy_boto3_wafv2/type_defs.py'>):
def build_model(service: str, type_defs: type_defs):
    """Build the model source for every name in ``type_defs.__all__`` and write it.

    Args:
        service: Service name, forwarded to ``generate_model_file``.
        type_defs: Module whose ``__all__`` lists the definitions to convert.
    """
    LOG.info(f"Building Models for: {service}")
    # Collect the per-class sections and join once at the end instead of
    # growing a single string with "+=" for every definition.
    sections = ["\n"]
    for type_def in type_defs.__all__:
        type_def_model = getattr(type_defs, type_def)
        annotations = type_def_model.__annotations__
        # A definition without __optional_keys__ is treated as having no
        # optional properties.
        opitonal_args = list(getattr(type_def_model, "__optional_keys__", ()))
        class_header = f"\nclass {type_def_model.__name__}(BaseModel):\n"
        sections.append(
            build_model_properties(class_header, annotations, opitonal_args)
        )
    model = (
        "".join(sections)
        .replace("TypeDef", "Model")
        .replace("RequestRequest", "Request")
        .replace("NoneType", "None")
        .replace('"Ellipsis"', "...")
    )
    LOG.debug(model)
    generate_model_file(service, model, is_test=False)
def
main():