KeyValue Int HWM

class etl_entities.hwm.key_value.key_value_int_hwm.KeyValueIntHWM(*, name: str, description: str = '', topic: str | None = None, value: frozendict = None, expression: Any = None, modified_time: datetime = None)

Integer KeyValue HWM type

Parameters:
name : str

HWM unique name

value : frozendict[Any, KeyValueHWMValueType], default: frozendict

HWM value

description : str, default: ""

Description of HWM

source : Any, default: None

HWM source, e.g. topic name

expression : Any, default: None

Expression used to generate HWM value, e.g. offset

modified_time : datetime.datetime, default: current datetime

HWM value modification time

Examples

from etl_entities.hwm import KeyValueIntHWM

hwm_kv_int = KeyValueIntHWM(
    name="long_unique_name",
    source="topic_name",
    expression="offset",
    value={
        0: 100,  # 0 and 1 - partition numbers
        1: 123,  # 100 and 123 - offset values
    },
)
copy(*, include: AbstractSetIntStr | MappingIntStrAny | None = None, exclude: AbstractSetIntStr | MappingIntStrAny | None = None, update: DictStrAny | None = None, deep: bool = False) -> Model

Duplicate a model, optionally choose which fields to include, exclude and change.

Parameters:
  • include – fields to include in new model

  • exclude – fields to exclude from the new model; this takes precedence over include

  • update – values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data

  • deep – set to True to make a deep copy of the model

Returns:

new model instance
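The precedence rule between include and exclude can be illustrated without the library. The sketch below works on a plain dict and models top-level field names only; `select_fields` is a hypothetical helper written for this illustration, not part of etl_entities or pydantic.

```python
def select_fields(data, include=None, exclude=None):
    """Hypothetical helper: keep `include` keys, then drop `exclude` keys."""
    keys = set(data) if include is None else set(data) & set(include)
    if exclude is not None:
        keys -= set(exclude)  # exclude wins when a field is named in both
    # Iterate over `data` to preserve the original field order.
    return {key: data[key] for key in data if key in keys}


fields = {"name": "long_unique_name", "description": "", "expression": "offset"}

# "expression" is both included and excluded, so it is dropped.
selected = select_fields(
    fields,
    include={"name", "expression"},
    exclude={"expression"},
)
print(selected)
```

Only `name` survives here, because `expression` appears in both sets and exclusion takes precedence.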

dict(*, include: AbstractSetIntStr | MappingIntStrAny | None = None, exclude: AbstractSetIntStr | MappingIntStrAny | None = None, by_alias: bool = False, skip_defaults: bool | None = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False) -> DictStrAny

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

json(*, include: AbstractSetIntStr | MappingIntStrAny | None = None, exclude: AbstractSetIntStr | MappingIntStrAny | None = None, by_alias: bool = False, skip_defaults: bool | None = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False, encoder: Callable[[Any], Any] | None = None, models_as_dict: bool = True, **dumps_kwargs: Any) -> str

Generate a JSON representation of the model. The include and exclude arguments work as in dict().

encoder is an optional function passed as the default argument of json.dumps(); the remaining arguments are passed on to json.dumps() as well.
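To show what a `default` function does in `json.dumps()`, here is a minimal standard-library sketch. The payload is a hand-written dict shaped loosely like an HWM, and `encode_extra` is a hypothetical encoder; neither comes from etl_entities, and the exact output of `KeyValueIntHWM.json()` may differ.

```python
import json
from datetime import datetime


def encode_extra(obj):
    """Fallback called by json.dumps for objects it cannot serialize itself."""
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Cannot serialize {type(obj).__name__}")


# Hypothetical payload for illustration; not produced by the library.
payload = {
    "name": "long_unique_name",
    "value": {0: 100, 1: 123},
    "modified_time": datetime(2024, 1, 2, 3, 4, 5),
}

text = json.dumps(payload, default=encode_extra)
decoded = json.loads(text)
print(decoded["modified_time"])
print(decoded["value"])
```

Note that `json.dumps()` converts integer dictionary keys to strings, so the partition numbers come back as `"0"` and `"1"` after a round trip through plain JSON.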

set_value(value: ValueType | None) -> HWMType

Replaces the current HWM value with the passed one and returns the HWM.

Note

Changes the HWM value in place instead of returning a new object.

Returns:
result : HWM

Self

Examples

from etl_entities.hwm import ColumnIntHWM

hwm = ColumnIntHWM(value=1, ...)

hwm.set_value(2)
assert hwm.value == 2
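The in-place behaviour described in the note means the returned object is the same instance that was modified. The sketch below demonstrates that pattern with `TinyHWM`, a hypothetical stand-in class defined only for this illustration; it is not the etl_entities implementation.

```python
class TinyHWM:
    """Hypothetical stand-in for an HWM; not the etl_entities class."""

    def __init__(self, value=None):
        self.value = value

    def set_value(self, value):
        self.value = value  # mutate the existing object
        return self  # hand back the same instance, not a copy


hwm = TinyHWM(value=1)
returned = hwm.set_value(2)

# Both names refer to one object, so the change is visible through either.
print(returned is hwm, hwm.value)
```

Any other reference to the same object sees the new value as well, which is the practical difference from an API that returns a modified copy.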
update(new_data: dict) -> KeyValueHWMType

Updates the HWM value based on the provided key-value data. For each key, the value is updated only if the new value is greater than the current value for that key, or if the key does not yet exist in the current value.

Note

Changes the HWM value in place and returns the modified instance.

Parameters:
new_data : dict

A dictionary representing new key-value data. For example: keys are partitions and values are offsets.

Returns:
self : KeyValueHWM

The instance with updated HWM value.

Examples

from frozendict import frozendict
from etl_entities.hwm import KeyValueHWM

hwm = KeyValueHWM(value={0: 100, 1: 120}, ...)

hwm.update({1: 125, 2: 130})
assert hwm.value == frozendict({0: 100, 1: 125, 2: 130})

# The offset for partition 1 is not updated as 123 is less than 125
hwm.update({1: 123})
assert hwm.value == frozendict({0: 100, 1: 125, 2: 130})
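
The per-key rule described for update() (keep the larger value, add unknown keys) can be sketched in plain Python. `merge_max` is a hypothetical helper written for this illustration; unlike the library method, it returns a new dict rather than changing an HWM in place.

```python
def merge_max(current, new_data):
    """Hypothetical helper: per-key merge that never lowers a stored value."""
    merged = dict(current)
    for key, value in new_data.items():
        # Accept the new value for unseen keys or when it is strictly greater.
        if key not in merged or value > merged[key]:
            merged[key] = value
    return merged


offsets = merge_max({0: 100, 1: 120}, {1: 125, 2: 130})
print(offsets)

# A stale offset for partition 1 is ignored because 123 < 125.
after_stale = merge_max(offsets, {1: 123})
print(after_stale)
```

The second call leaves the offsets unchanged, matching the behaviour shown in the library example above.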