Hey all, Currently seeing a flyte workflow issue, ...
# ask-the-community
e
Hey all, Currently seeing a flyte workflow issue, specifically:
Copy code
Failed to convert return value for var o0 for function src.tasks.batch_transform.load_bt_result with error <class 'TypeError'>: unhashable type: 'list'. 

SYSTEM ERROR! Contact platform administrators.
We've double checked that this task is actually returning a pandas df. We've bumped the cache version as well. The flyte propeller logs show:
Copy code
{"json":{"exec_id":"f6d3b8ca69b2149bbbbb","node":"n3/n3","ns":"forecaster-training-dev","res_ver":"38555329","routine":"worker-19","tasktype":"python-task","wf":"forecaster-training:dev:src.workflow.end_to_end"},"level":"warning","msg":"No plugin found for Handler-type [python-task], defaulting to [container]","ts":"2022-10-26T01:19:57Z"}
{"json":{"exec_id":"f6d3b8ca69b2149bbbbb","node":"n4/n3","ns":"forecaster-training-dev","res_ver":"38555329","routine":"worker-19","wf":"forecaster-training:dev:src.workflow.end_to_end"},"level":"warning","msg":"No plugin found for Handler-type [python-task], defaulting to [container]","ts":"2022-10-26T01:19:57Z"}
{"json":{"exec_id":"f6d3b8ca69b2149bbbbb","node":"n4/n3","ns":"forecaster-training-dev","res_ver":"38555329","routine":"worker-19","wf":"forecaster-training:dev:src.workflow.end_to_end"},"level":"warning","msg":"Failed to record taskEvent, error [EventAlreadyInTerminalStateError: conflicting events; destination: ABORTED, caused by [rpc error: code = FailedPrecondition desc = invalid phase change from FAILED to ABORTED for task execution {resource_type:TASK project:\"forecaster-training\" domain:\"dev\" name:\"src.tasks.batch_transform.load_bt_result\" version:\"KZ5UyzZ2adcfdjkL4edUwg==\"  node_id:\"n4-0-n3\" execution_id:\u003cproject:\"forecaster-training\" domain:\"dev\" name:\"f6d3b8ca69b2149bbbbb\" \u003e  0 {} [] 0}]]. Trying to record state: ABORTED. Ignoring this error!","ts":"2022-10-26T01:19:57Z"}
k
This is odd That the pandas dataframe is getting read as list
Task signature will help
e
Copy code
@task(cache=True, cache_version="1.7")
def load_bt_result(result: FlyteDirectory) -> CachedDataFrame:


CachedDataFrame = Annotated[
    pd.DataFrame, HashMethod(hash_pandas_dataframe_function)  # noqa: F821
]

def hash_pandas_dataframe_function(df: pd.DataFrame) -> str:
k
hmm interesting
is this because of custom hashing maybe? cc @Eduardo Apolinario (eapolinario) / @Kevin Su?
e
@Edgar Trujillo, can you double-check which versions of python, flytekit, flytepropeller, and flyteadmin you're running? Also, I noticed that the cache version in the task you shared is 1.7, does this mean that this used to work and broke recently?
e
Yes, was working previously but it was a simple fix... a recent change to the pandas df hashing function wasnt included in this repo causing the issue. thanks @Ketan (kumare3) & @Eduardo Apolinario (eapolinario) for the help
k
Ohh but that error was odd
164 Views