KeyValue Int HWM
- class etl_entities.hwm.key_value.key_value_int_hwm.KeyValueIntHWM(*, name: str, description: str = '', topic: str | None = None, value: frozendict = None, expression: Any = None, modified_time: datetime = None)
Integer KeyValue HWM type
- Parameters:
name (str) – HWM unique name
value (frozendict[Any, KeyValueHWMValueType], default: frozendict) – HWM value
description (str, default: "") – Description of HWM
source (Any, default: None) – HWM source, e.g. topic name
expression (Any, default: None) – Expression used to generate HWM value, e.g. offset
modified_time (datetime.datetime, default: current datetime) – HWM value modification time
Examples
    from etl_entities.hwm import KeyValueIntHWM

    hwm_kv_int = KeyValueIntHWM(
        name="long_unique_name",
        source="topic_name",
        expression="offset",
        value={
            0: 100,  # 0 and 1 - partition numbers
            1: 123,  # 100 and 123 - offset values
        },
    )
- copy(*, include: AbstractSetIntStr | MappingIntStrAny | None = None, exclude: AbstractSetIntStr | MappingIntStrAny | None = None, update: DictStrAny | None = None, deep: bool = False) -> Model
Duplicate a model, optionally choose which fields to include, exclude and change.
- Parameters:
include – fields to include in new model
exclude – fields to exclude from new model; as with values, this takes precedence over include
update – values to change/add in the new model. Note: the data is not validated before creating the new model: you should trust this data
deep – set to True to make a deep copy of the model
- Returns:
new model instance
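The difference that the deep flag makes can be illustrated with the standard library alone. The sketch below uses a hypothetical FakeModel dataclass and the copy module rather than the HWM class itself, so it only demonstrates the general shallow versus deep distinction, not this library's internals.

```python
import copy
from dataclasses import dataclass, field


@dataclass
class FakeModel:
    # Hypothetical stand-in for a model with one mutable field.
    name: str
    value: dict = field(default_factory=dict)


original = FakeModel(name="hwm", value={0: 100})

shallow = copy.copy(original)   # nested dict is shared with the original
deep = copy.deepcopy(original)  # nested dict is duplicated

# Mutate the original's nested dict after both copies were taken.
original.value[1] = 123

shallow_sees_change = 1 in shallow.value  # the shallow copy shares the dict
deep_sees_change = 1 in deep.value        # the deep copy has its own dict
```

With an immutable value such as a frozendict, sharing the nested object is harmless, so the distinction matters mostly for mutable fields.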
- dict(*, include: AbstractSetIntStr | MappingIntStrAny | None = None, exclude: AbstractSetIntStr | MappingIntStrAny | None = None, by_alias: bool = False, skip_defaults: bool | None = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False) -> DictStrAny
Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.
- json(*, include: AbstractSetIntStr | MappingIntStrAny | None = None, exclude: AbstractSetIntStr | MappingIntStrAny | None = None, by_alias: bool = False, skip_defaults: bool | None = None, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False, encoder: Callable[[Any], Any] | None = None, models_as_dict: bool = True, **dumps_kwargs: Any) -> str
Generate a JSON representation of the model. The include and exclude arguments work as per dict().
encoder is an optional function to supply as default to json.dumps(); other arguments are as per json.dumps().
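To show what a function passed as default to json.dumps() does, here is a minimal standard-library sketch. The encode_extra helper and the payload dictionary are illustrative assumptions, not part of etl_entities; the example calls json.dumps() directly instead of the model's json() method.

```python
import json
from datetime import datetime


def encode_extra(obj):
    # json.dumps calls this only for objects it cannot serialize natively.
    if isinstance(obj, datetime):
        return obj.isoformat()
    raise TypeError(f"Cannot serialize {type(obj).__name__}")


# Hypothetical payload resembling a few HWM fields.
payload = {
    "name": "long_unique_name",
    "modified_time": datetime(2024, 1, 2, 3, 4, 5),
}

text = json.dumps(payload, default=encode_extra, sort_keys=True)
```

Raising TypeError for unknown types keeps the usual json.dumps() failure behavior for anything the helper does not handle.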
- set_value(value: ValueType | None) -> HWMType
Replaces the current HWM value with the passed one and returns the HWM.
Note
Changes the HWM value in place instead of returning a new one.
- Returns:
result (HWM) – Self
Examples
    from etl_entities.hwm import ColumnIntHWM

    hwm = ColumnIntHWM(value=1, ...)
    hwm.set_value(2)
    assert hwm.value == 2
- update(new_data: dict) -> KeyValueHWMType
Updates the HWM value based on the provided key-value data. A key's value is replaced only if the new value is greater than the current value for that key, or if the key does not exist in the current value.
Note
Changes the HWM value in place and returns the modified instance.
- Parameters:
new_data (dict) – A dictionary representing new key-value data. For example: keys are partitions and values are offsets.
- Returns:
self (KeyValueHWM) – The instance with updated HWM value.
Examples
    from frozendict import frozendict
    from etl_entities.hwm import KeyValueHWM

    hwm = KeyValueHWM(value={0: 100, 1: 120}, ...)
    hwm.update({1: 125, 2: 130})
    assert hwm.value == frozendict({0: 100, 1: 125, 2: 130})

    # The offset for partition 1 is not updated as 123 is less than 125
    hwm.update({1: 123})
    assert hwm.value == frozendict({0: 100, 1: 125, 2: 130})
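The merge rule described for update() can be sketched in plain Python without the library. The merge_max function below is a hypothetical helper written for illustration, not the library's implementation; unlike update(), it returns a new dictionary instead of changing an HWM in place.

```python
def merge_max(current: dict, new_data: dict) -> dict:
    """Return a copy of `current` where each key holds the larger of the old and new values."""
    merged = dict(current)
    for key, new_value in new_data.items():
        # Add unseen keys; replace existing ones only when the new value is greater.
        if key not in merged or new_value > merged[key]:
            merged[key] = new_value
    return merged


value = {0: 100, 1: 120}

# Partition 1 advances to 125 and partition 2 is added.
value = merge_max(value, {1: 125, 2: 130})

# A stale offset (123 < 125) leaves the value unchanged.
value_after_stale = merge_max(value, {1: 123})
```

Keeping the per-key maximum means that replaying older offsets cannot move a partition's high water mark backwards.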