HWM Type Registry#

class etl_entities.hwm.hwm_type_registry.HWMTypeRegistry#

Registry class for HWM types

classmethod add(type_name: str, klass: type[HWM]) None#

Add mapping HWM class -> type name to registry

Parameters:
type_namestr

Type name

klasstype

HWM class

Examples

from etl_entities.hwm import HWMTypeRegistry, HWM

class MyHWM(HWM): ...

HWMTypeRegistry.add("my_hwm", MyHWM)

assert HWMTypeRegistry.get("my_hwm") == MyHWM
classmethod get(type_name: str) type[HWM]#

Get HWM class by type name

Parameters:
type_namestr

Type name

Examples

from etl_entities.hwm import HWMTypeRegistry, ColumnIntHWM, ColumnDateHWM

assert HWMTypeRegistry.get("int") == ColumnIntHWM
assert HWMTypeRegistry.get("date") == ColumnDateHWM

HWMTypeRegistry.get("unknown")  # raises KeyError
classmethod get_key(klass: type[HWM]) str#

Get HWM type name for a class

Parameters:
klassobj:type

HWM class

Examples

from etl_entities.hwm import HWMTypeRegistry, ColumnIntHWM, ColumnDateHWM

assert HWMTypeRegistry.get_key(ColumnIntHWM) == "int"
assert HWMTypeRegistry.get_key(ColumnDateHWM) == "date"

HWMTypeRegistry.get_key(UnknownHWM)  # raises KeyError
classmethod parse(inp: dict) HWM#

Parse HWM from dict representation

Returns:
resultHWM

Deserialized HWM

Examples

from etl_entities.hwm import HWMTypeRegistry, ColumnIntHWM

hwm = HWMTypeRegistry.parse(
    {
        "type": "column_int",
        "name": "some_name",
        "value": "1",
    }
)
assert hwm == ColumnIntHWM(name="some_name", value=1)

HWMTypeRegistry.parse({"type": "unknown"})  # raises KeyError