Integer HWM
- class etl_entities.hwm.hwm_type_registry.HWMTypeRegistry
Registry class for HWM types
Methods
- add(type_name, klass): Add mapping "HWM class" -> "type name" to registry
- get(type_name): Get HWM class by type name
- get_key(klass): Get HWM type name for a class
- parse(inp): Parse HWM from dict representation
- classmethod add(type_name: str, klass: type[HWM]) -> None
Add mapping "HWM class" -> "type name" to registry
- Parameters:
- type_name : str
  Type name
- klass : type
  HWM class
Examples
from etl_entities import HWMTypeRegistry, HWM

class MyHWM(HWM): ...

HWMTypeRegistry.add("somename", MyHWM)
HWMTypeRegistry.add("anothername", MyHWM)  # multiple type names are allowed

assert HWMTypeRegistry.get("somename") == MyHWM
assert HWMTypeRegistry.get("anothername") == MyHWM
- classmethod get(type_name: str) -> type[HWM]
Get HWM class by type name
- Parameters:
- type_name : str
Type name
Examples
from etl_entities import HWMTypeRegistry, IntHWM, FloatHWM

assert HWMTypeRegistry.get("int") == IntHWM
assert HWMTypeRegistry.get("integer") == IntHWM  # multiple type names are supported
assert HWMTypeRegistry.get("float") == FloatHWM

HWMTypeRegistry.get("unknown")  # raises KeyError
- classmethod get_key(klass: type[HWM]) -> str
Get HWM type name for a class
- Parameters:
- klass : type
HWM class
Examples
from etl_entities import HWMTypeRegistry, IntHWM, FloatHWM

assert HWMTypeRegistry.get_key(IntHWM) == "int"  # only first type name is returned
assert HWMTypeRegistry.get_key(FloatHWM) == "float"

HWMTypeRegistry.get_key(UnknownHWM)  # raises KeyError
- classmethod parse(inp: dict) -> HWM
Parse HWM from dict representation
- Returns:
- result : HWM
Deserialized HWM
Examples
from etl_entities import HWMTypeRegistry, IntHWM

assert HWMTypeRegistry.parse(
    {
        "value": "1",
        "type": "int",
        "column": {"name": ..., "partition": ...},
        "source": ...,
        "process": ...,
    }
) == IntHWM(value=1, ...)

HWMTypeRegistry.parse({"type": "unknown"})  # raises KeyError