# flyte-deployment
k
are there retries?
this sounds like a bug?
j
I think so yes, there are no retries
k
Hmm cc @Haytham Abuelfutuh, it could be that the batch job vanished and Flyte cannot find it
But we should fail fast
Cc @Dan Rammer (hamersaw)
h
the plugin reported the task as failed though… I think that could be an issue in the propeller <-> aws plugin layer…
d
So I know Kevin was going to look into this (via here) as he knows a bit more about the aws batch plugin than I. @Jacob Wang were you able to file an issue? I can't seem to find it. Please let me know and we will make this a priority!
j
https://github.com/flyteorg/flyte/issues/2979 @Kevin Su @Dan Rammer (hamersaw) Let me know if you want me to use some simple code (maybe tutorial code) to reproduce the issue; manually raising an exception in the tutorial code and running it as an AWS Batch job would be enough, I think
k
@Jacob Wang I just created a PR to fix it. Please take a look when you get some time.
j
@Kevin Su Hey, that was quick! Thanks for the PR. I just took a look and it looks good to me, but it would be good for someone from Flyte to take a look too, since I am not familiar with either the Flyte codebase or Go. One question: this will fix the bug as long as the plugin reports an error, regardless of the AWS Batch job status, right? Since there’s also a scenario where the AWS Batch job status is SUCCEEDED while the plugin reports an error back and the task status keeps running.
k
yeah, we’ll get it merged shortly.
Since there’s also a scenario where the AWS Batch job status is SUCCEEDED while the plugin reports an error back and the task status keeps running.
Could you share some code snippet that I can reproduce? I didn’t run into this issue before.
j
@Kevin Su Yes, here I am just mocking an exception by manually raising one in `generate_normal_df`:
import typing
import pandas as pd
import numpy as np

from flytekitplugins.awsbatch import AWSBatchConfig
from flytekit import task, workflow

config = AWSBatchConfig(
    platformCapabilities="EC2",
)

@task(task_config=config)
def generate_normal_df(n: int, mean: float, sigma: float) -> pd.DataFrame:
    raise Exception  # intentionally raised to reproduce the failure
    return pd.DataFrame({"numbers": np.random.normal(mean, sigma, size=n)})

@task
def compute_stats(df: pd.DataFrame) -> typing.Tuple[float, float]:
    return float(df["numbers"].mean()), float(df["numbers"].std())

@workflow
def wf(n: int = 200, mean: float = 0.0, sigma: float = 1.0) -> typing.Tuple[float, float]:
    return compute_stats(df=generate_normal_df(n=n, mean=mean, sigma=sigma))
The batch job will be in the SUCCEEDED state while the plugin is reporting this back to the console.
And I guess the reason the batch job succeeded is that flyte catches this exception and handles it, so it is not thrown out of the batch job…
k
Which version of flytekit are you using? The task (generate_normal_df) in my aws console is in the failed state.
Is there any error or log message in the aws batch job that succeeded?
j
1.2.0
CloudWatch log group 601097174345:/aws/batch/job, log stream base_image/default/76b45eaac132413883c7cb26bfd95fad:

2022-10-20 11:57:41,507  flytekit.entrypoint  ERROR  !! Begin User Error Captured by Flyte !!

2022-10-20 11:57:41,508  flytekit.entrypoint  ERROR
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 203, in user_entry_point
    return wrapped(*args, **kwargs)
  File "/opt/program/src/main.py", line 126, in generate_normal_df
    raise Exception
Message:
User error.

2022-10-20 11:57:41,508  flytekit.entrypoint  ERROR  !! End Error Captured by Flyte !!

2022-10-20 11:57:41,507  flytekit  ERROR  Exception when executing
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 203, in user_entry_point
    return wrapped(*args, **kwargs)
  File "/opt/program/src/main.py", line 126, in generate_normal_df
    raise Exception
Exception

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/flytekit/core/base_task.py", line 479, in dispatch_execute
    native_outputs = self.execute(**native_inputs)
  File "/usr/local/lib/python3.8/site-packages/flytekit/core/python_function_task.py", line 163, in execute
    return exception_scopes.user_entry_point(self._task_function)(**kwargs)
  File "/usr/local/lib/python3.8/site-packages/flytekit/exceptions/scopes.py", line 214, in user_entry_point
    raise FlyteScopedUserException(*_exc_info())
flytekit.exceptions.scopes.FlyteScopedUserException
@Kevin Su
k
@Jacob Wang I see. Basically, flytekit captures the exception and writes an error.pb to S3. error.pb contains the error code and message, and propeller will read this file and retry the task based on the error code. Therefore, the status on the aws console is SUCCEEDED while the status on the flyte console is failed. It won’t run forever again after this PR is merged.
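To illustrate the flow described above, here is a simplified, hypothetical sketch (the helper names and error-file format are stand-ins, not flytekit’s actual API): the user exception is captured, an error document is written to the task’s output prefix, and the process still exits 0, so AWS Batch sees SUCCEEDED while propeller reads the error document and fails or retries the node.
import json
import traceback
from pathlib import Path


def write_error_doc(output_prefix: str, kind: str, message: str) -> None:
    # Stand-in for flytekit writing error.pb to the remote output prefix (S3).
    Path(output_prefix).mkdir(parents=True, exist_ok=True)
    Path(output_prefix, "error.json").write_text(
        json.dumps({"kind": kind, "message": message})
    )


def run_task(task_fn, inputs: dict, output_prefix: str) -> None:
    try:
        task_fn(**inputs)
    except Exception:
        # Captured instead of re-raised: the process exits 0, so AWS Batch reports
        # SUCCEEDED, and it is up to propeller to read the error document and
        # fail or retry the node based on the error kind/code.
        write_error_doc(output_prefix, "USER", traceback.format_exc())


if __name__ == "__main__":
    def failing_task() -> None:
        raise ValueError("Intended Error Test")

    run_task(failing_task, {}, "/tmp/flyte-output")  # container exits 0 even though the task raised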
j
@Kevin Su Hey Kevin, is this going to be included in v1.3.0? It is not merged yet
d
@Jacob Wang, thanks for your patience on this. I owe a review on it, will make it a priority today. It will certainly be included in 1.3!
j
Ok. good to know! Thanks:)
@Kevin Su @Dan Rammer (hamersaw) Hi, hope you had a lovely Christmas! I saw that the PR is merged, so I updated flytekit and the flytekit plugin both to 1.3.0-b6, and the flyte helm chart to 1.3.0-b5, but the problem still remains. Then I guess there is some work left to do on the flyte console/propeller side?
Or is it not included in any release yet?
d
@Jacob Wang thanks for your patience on this, as you probably suspect different parts of the team have been off during the holidays. This PR should have been included in the 1.3.0-b5 release, so it is troublesome that this did not fix your issue. Will take a look into this and update within a day.
k
@Jacob Wang is the status of task still “running”?
j
Yes exactly, I tried to raise an exception in a dummy workflow and batch job failed, but the task is still running
k
The task might keep running because aws batch keeps trying to rerun the sub-task. Is the subtask still running on aws batch? https://github.com/flyteorg/flyteplugins/blob/d0a6ee24404c73b2db6fe35dd59fd6a9e285f755/go/tasks/plugins/array/awsbatch/config/config.go#L19-L20
j
The task on aws is not running anymore and its status is SUCCEEDED (flyte caught the error).
It won’t retry since it is in the succeeded state. I suspect that’s the issue: flytekit catches the exception and the batch job succeeds.
I cannot find the batch job anymore but its state is “SUCCEEDED”
k
sorry, could you send me the example again? I’ll try to reproduce it on my cluster.
j
It’s the flyte sample code:
import typing
import pandas as pd
import numpy as np

from flytekit import task, workflow, Resources
from flytekit.loggers import logger
from flytekitplugins.awsbatch import AWSBatchConfig


config = AWSBatchConfig(
    platformCapabilities="EC2",
)

# task executed in flyte k8s cluster
@task(requests=Resources(cpu="100m", mem="200Mi"))
def generate_normal_df(n: int, mean: float, sigma: float) -> pd.DataFrame:
    return pd.DataFrame({"numbers": np.random.normal(mean, sigma, size=n)})


# task executed in AWS batch job
@task(requests=Resources(mem="2Gi", cpu="1"), task_config=config)
def compute_stats(df: pd.DataFrame) -> typing.Tuple[float, float]:
    logger.info(f"Running task compute_stats on {timestamp_day()}")  # timestamp_day() is a helper defined elsewhere in the original project
    raise ValueError("Intended Error Test")
    return float(df["numbers"].mean()), float(df["numbers"].std())


@workflow
def wf(n: int = 200, mean: float = 0.0, sigma: float = 1.0) -> typing.Tuple[float, float]:
    return compute_stats(df=generate_normal_df(n=n, mean=mean, sigma=sigma))
@Kevin Su Hi, I think the propeller pod was recreated when I bumped the version to 1.3.0-b5.
Regarding the screenshot, isn’t it failing because of the first task, which is a flyte python task?
The second task is the AWS Map Task.
k
oh, sorry. I didn’t notice that. Let me rerun it again
j
Thanks!
k
I created a PR to fix it. Yes, you’re right, flytekit catches the error. As a result, aws batch didn’t know the task failed, and thought the job had succeeded.
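As a hedged sketch of the kind of change being discussed (not necessarily what the merged PR actually does): one way to make the Batch job reflect the failure is to exit non-zero after the error has been captured when running inside AWS Batch, so Batch marks the job FAILED instead of SUCCEEDED.
import os
import sys


def exit_nonzero_on_batch_if_failed(error_captured: bool) -> None:
    # AWS Batch sets AWS_BATCH_JOB_ID inside job containers; whether flytekit
    # detects Batch this way is an assumption for this sketch.
    running_on_batch = "AWS_BATCH_JOB_ID" in os.environ
    if error_captured and running_on_batch:
        sys.exit(1)  # a non-zero exit makes the Batch job show as FAILED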
j
Thanks a lot @Kevin Su and @Dan Rammer (hamersaw)! Hope this can be released soon:)
k
Awesome
@Jacob Wang please spread the word
j
@Dan Rammer (hamersaw) Hi Dan, I saw that you’ve approved the PR. Would this be released soon?
k
we will merge it by today
j
Hey, @Kevin Su, is this going to be included in the next version of flyte? I am trying to understand where flyte specifies the plugin version and how I can know whether a version of a plugin is included.
k
yes, we’re going to release the 1.4 beta in the next day or two.
j
I see, many thanks!
@Kevin Su I updated to 1.4.0-b0, and it seems it’s not fixed…
(screenshot attached: image.png)
Again, the batch job succeeded but in the console it is still running
Maybe we can have a call when you have time and look it through, if you’re up for it?
k
I can jump on a call after 3:50pm
probably a bit late for you. we can talk tomorrow as well.
j
Sure, I am in CET time, so I can wait for you until you’re up:)
k
@Jacob Wang are you around?
j
It is working now! @Kevin Su I didn’t update the flytekit/flytekit-plugins python packages last time.
But I am wondering why this is related? I thought flytekit/the plugin is just the SDK used to interact with the platform.
Is the best practice to always keep the flytekit and flyte platform the same version?
k
No, flytekit should not need to be updated; that is one of the goals, unless you want to use a newer feature in flytekit.
That being said newer flytekit may not work with a very old platform