Bernhard Stadlbauer
01/12/2023, 2:28 PM
map_task though) - so following along here for any debugging tips
Fabio Grätz
01/12/2023, 2:28 PM
Niels Bantilan
01/12/2023, 2:47 PM
Fabio Grätz
01/12/2023, 2:50 PM
def train(
task_name: str,
param: List[int],
stage: StageConfig,
base_path: str = BASE_PATH,
warm_start_path: Optional[str] = None,
) -> str:
StageConfig is a dataclass_json that also has other `dataclass_json`s nested under it.
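For illustration only, a minimal sketch of what such a nested dataclass_json config could look like; the class and field names here are hypothetical, not the actual StageConfig:
from dataclasses import dataclass, field
from typing import List, Optional

from dataclasses_json import dataclass_json


@dataclass_json
@dataclass
class OptimizerConfig:
    # hypothetical nested config
    lr: float = 1e-3
    weight_decay: float = 0.0


@dataclass_json
@dataclass
class StageConfig:
    # illustrative fields only
    name: str = "train"
    epochs: int = 10
    optimizer: OptimizerConfig = field(default_factory=OptimizerConfig)
    class_names: Optional[List[str]] = None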
Niels Bantilan
01/12/2023, 2:56 PM
Fabio Grätz
01/12/2023, 2:57 PM
Niels Bantilan
01/12/2023, 3:36 PM
Ketan (kumare3)
Niels Bantilan
01/12/2023, 3:41 PM
Ketan (kumare3)
Eduardo Apolinario (eapolinario)
01/12/2023, 8:56 PM
Dan Rammer (hamersaw)
01/13/2023, 1:40 PM
Fabio Grätz
01/13/2023, 1:41 PM
def train_stage(…) -> (str, str): instead of Tuple[str, str].
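For clarity, the two spellings being compared, sketched as plain functions (signatures shortened; whether flytekit treats them identically is part of what's being checked here):
from typing import Tuple

def train_stage_bare(cfg: str) -> (str, str):        # what the workflow uses
    return "wandb/run/path", "out/dir"

def train_stage_typed(cfg: str) -> Tuple[str, str]:  # the more common annotation
    return "wandb/run/path", "out/dir"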
Dan Rammer (hamersaw)
01/13/2023, 1:44 PM
Bernhard Stadlbauer
01/13/2023, 2:06 PM
Dan Rammer (hamersaw)
01/13/2023, 2:17 PM
Bernhard Stadlbauer
01/13/2023, 2:18 PM
Fabio Grätz
01/13/2023, 4:45 PM
generate_report fails to cache all 4 times. The task full_evaluation in the end fails as well. In this example, all the tasks that fail are thus tasks that don't have a node after them.
In the full >50h, ~40-task training, some of the train tasks (which do have a task after them in the graph) failed to cache as well. This I cannot reproduce. However, the ML engineer who wrote the workflow didn't run the workflow in one single execution end to end due to intermittent failures. The restarts might thus have had some influence as well 🤔 Since the full execution takes so long, we haven't run it again.
So far, I have only focused on removing logic while making sure that the cache failure persists. I haven't yet tried to remove defaults in the tasks etc.
Dan Rammer (hamersaw)
01/13/2023, 4:54 PM
generate_report is the function we want to cache, right? I am pretty sure we check if there are task outputs; if there are none, then there is nothing to cache and we skip it.
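As a hedged illustration of that case (hypothetical task, not the real generate_report): a cached task with no declared outputs leaves nothing for the cache put to store.
from flytekit import task


@task(cache=True, cache_version="0.1")
def write_report(report_dir: str) -> None:
    # side effects only, no declared outputs -> nothing to write to the catalog
    print(f"writing report to {report_dir}")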
Fabio Grätz
01/13/2023, 5:02 PM
generate_report and full_evaluation.
Dan Rammer (hamersaw)
01/13/2023, 5:06 PM
Fabio Grätz
01/13/2023, 5:11 PM
generate_report doesn't have a return value so that one is ✅. full_evaluation also has no return value -> ✅ 🤦 silly me …
There are two other tasks that do have return values that also showed this behaviour. When trying to construct a minimal working example, I didn't see this at all today though.
The screenshot with the task search shows what I called try_params in the "minimal working example" (renamed it to avoid internal lingo):
@task(
cache=True,
cache_version="0.1",
requests=Resources(gpu="1", cpu="11", mem="44G"),
disable_deck=False,
)
def try_params(
...
) -> str:
if _is_distributed_worker():
raise flytekit.core.base_task.IgnoreOutputs()
else:
return wandb.run.path
The if statement makes no sense here since this is not a pytorch task, but it should never be True anyway: it checks whether RANK is set and != 0, which is never the case for plain Python tasks.
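(For context, a sketch of what such a helper presumably does; the actual implementation in the workflow may differ:)
import os


def _is_distributed_worker() -> bool:
    # presumably: True only on non-zero-rank workers of a torch
    # distributed run, where the RANK env var is set by the launcher
    rank = os.environ.get("RANK")
    return rank is not None and rank != "0"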
For this task, the screenshot shows two instances running in parallel: one time it worked, one time it didn't.
Then there is the train task:
@task(
cache=True,
cache_version="0.1",
task_config=PyTorch(
num_workers=3,
),
requests=Resources(gpu="1", cpu="11", mem="44G"),
disable_deck=False,
)
def train(
....
) -> (str, str):
if _is_distributed_worker():
raise flytekit.core.base_task.IgnoreOutputs()
else:
return wandb.run.path, out_dir
Could this have something to do with flytekit.core.base_task.IgnoreOutputs()?
If one worker with RANK!=0 finishes first and returns nothing, could this lead to the cache being deactivated?
Dan Rammer (hamersaw)
01/13/2023, 5:50 PM
Fabio Grätz
01/17/2023, 1:34 PM
Dan Rammer (hamersaw)
01/17/2023, 1:39 PM
IgnoreOutputs stuff - propeller should fail the task if it tries to cache it and the outputs were ignored, so that can not be it. The next thing was that propeller transparently fails caching. So exactly what you are seeing: the caching mechanism runs into a failure and propeller will still mark the task as succeeded, just not cached. We should really make this more observable. All of the "Failed to write result to catalog for task" messages indicate this is what is happening. I will dive into this.
Fabio Grätz
01/17/2023, 1:42 PM
IgnoreOutputs: by intentionally delaying RANK 0 (or every worker but RANK 0) I tried to trigger a potential race condition. But the correct return value from RANK 0 was always retrieved…
Dan Rammer (hamersaw)
01/17/2023, 1:49 PM
Fabio Grätz
01/17/2023, 1:50 PM
remote.execute. The execution id of the run was fc3e15e42c6ec4043b46-sbs-9. Our execution names are generated similarly to what flytekit does, only that we allow a user-defined prefix + uid:
uuid_len = 20
value = value + "-" + uuid.uuid4().hex[:uuid_len]
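Put together, the name generation presumably looks roughly like this (the function name and prefix handling are illustrative):
import uuid


def make_execution_name(prefix: str, uuid_len: int = 20) -> str:
    # user-defined prefix plus a shortened uuid4 hex, similar to how
    # flytekit generates execution names
    return prefix + "-" + uuid.uuid4().hex[:uuid_len]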
> Yeah, so this is an error in the datacatalog. I know exactly where it happens - just need to figure out how to repro.
If there is anything I can help with, I'm happy to search for more stuff in the logs or try to execute stuff. Just ping
Dan Rammer (hamersaw)
01/17/2023, 1:57 PM
DatasetKey instances printed off that you have blurred out (reasonably so) - I don't necessarily care about the values, but are the fields Project, Domain, Name, and Version? There is no UUID, correct?
Fabio Grätz
01/17/2023, 1:57 PM
Dan Rammer (hamersaw)
01/17/2023, 1:58 PM
Fabio Grätz
01/17/2023, 2:02 PM
{"json":{…}, "level":"error", "msg":"Failed to create dataset model: &{BaseModel:{CreatedAt:0001-01-01 00:00:00 +0000 UTC UpdatedAt:0001-01-01 00:00:00 +0000 UTC DeletedAt:<nil>} DatasetKey:{Project:object_detection Name:flyte_task-<package name>.applications.<application_name>.<dataset_name>.train_workflow.train_stage Domain:development Version:0.1-NGVJxIhX-egfCQQnT UUID:} SerializedMetadata:[10 67 10 12 116 97 115 107 45 118 101 114 115 105 111 110 18 51 108 117 107 97 115 45 102 101 97 116 45 107 105 116 116 105 95 99 98 100 49 98 102 56 48 95 50 48 50 51 45 48 49 45 49 54 95 48 54 45 53 52 45 50 51 95 100 105 114 116 121] PartitionKeys:[]} err: unexpected error type for: write tcp 10.52.1.3:58112->10.22.0.6:5432: write: broken pipe", "ts":"2023-01-17T01:28:18Z"}
I redacted the actual names to <package name>, <application_name>, <dataset_name>. I explicitly did not change the part where it says UUID:}
Dan Rammer (hamersaw)
01/17/2023, 2:04 PM
Fabio Grätz
01/17/2023, 2:05 PM
image: cr.flyte.org/flyteorg/datacatalog-release:v1.2.1
Dan Rammer (hamersaw)
01/17/2023, 2:35 PM
h.db.Create call to check if the item already exists or not. It seems the error that is returned from that call is not an "AlreadyExists" error, so we identify it as something more serious.
from flytekit import task, workflow

@task(cache=True, cache_version="1.0")
def hello_world(name: str) -> str:
return f"hello {name}"
@workflow
def hello_world_wf(name: str) -> str:
return hello_world(name=name)
and try calling it with different values, i.e. foo, bar, and see what the behavior is. I fear that the call to bar will have the same result as here; namely, GORM isn't detecting the dataset "AlreadyExists", the cache put fails, and bar is not cached.
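A sketch of how that check could be run with FlyteRemote (project, domain, workflow name and version are placeholders):
from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(
    config=Config.auto(),
    default_project="flytesnacks",   # placeholder
    default_domain="development",    # placeholder
)

# assumes hello_world_wf was registered under this name/version
wf = remote.fetch_workflow(name="hello.hello_world_wf", version="v1")

# two executions with different inputs; the question is whether the second
# one also fails the cache put with a non-"AlreadyExists" error
remote.execute(wf, inputs={"name": "foo"}, wait=True)
remote.execute(wf, inputs={"name": "bar"}, wait=True)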
Fabio Grätz
01/17/2023, 2:48 PM
> OK - so in the error you sent above here this "broken pipe" is very suspect. In the doc you mentioned that this comes from dataset:36, is that here?
About the "broken pipe", which I agree is suspect: I just checked the CloudSQL machine, it is of a rather weak type since so far it didn't receive much load. Do you think that a slow or failed response could be misinterpreted here? Whether dataset:36 is where you linked I can't say for sure; the repo doesn't have the 1.2.1 tag that matches the image I'm running.
Dan Rammer (hamersaw)
01/17/2023, 2:59 PM
Fabio Grätz
01/17/2023, 3:00 PM
Dan Rammer (hamersaw)
01/17/2023, 3:06 PM
Fabio Grätz
01/17/2023, 3:07 PM
Dan Rammer (hamersaw)
01/17/2023, 3:18 PM
In some cases we get the "broken pipe" error:
{"json":{…}, "level":"error", "msg":"Failed to create dataset model: &{BaseModel:{CreatedAt:0001-01-01 00:00:00 +0000 UTC UpdatedAt:0001-01-01 00:00:00 +0000 UTC DeletedAt:<nil>} DatasetKey:{Project:object_detection Name:flyte_task-<package name>.applications.<application_name>.<dataset_name>.train_workflow.train_stage Domain:development Version:0.1-NGVJxIhX-egfCQQnT UUID:} SerializedMetadata:[10 67 10 12 116 97 115 107 45 118 101 114 115 105 111 110 18 51 108 117 107 97 115 45 102 101 97 116 45 107 105 116 116 105 95 99 98 100 49 98 102 56 48 95 50 48 50 51 45 48 49 45 49 54 95 48 54 45 53 52 45 50 51 95 100 105 114 116 121] PartitionKeys:[]} err: unexpected error type for: write tcp 10.52.1.3:58112->10.22.0.6:5432: write: broken pipe", "ts":"2023-01-17T01:28:18Z"}
other times we are receiving the error from here:
Dataset already exists key: id:<project:"sandbox" name:"flyte_task-workflow.hello_world" domain:"development" version:"1.0-MjvydOS6-zCZxwZgs" > metadata:<key_map:<key:"task-version" value:"hwuiQzzwpRF386ypXM2SYQ==" > > , err value with matching already exists (duplicate key value violates unique constraint "datasets_pkey")
These occur when propeller is initially trying to create the dataset to mark it as cached. Rather than looking up the dataset to see if it exists, it attempts to create a new one and detects the "AlreadyExists" error. In the former case, the error does not show that the dataset already exists and propeller fails the cache put.
Fabio Grätz
01/17/2023, 4:42 PM
Failed to create dataset: in the 9 days the current pod has been up, there have been 12 occurrences. All 12 occurrences happened in the "actual" workflow built and executed by the ML engineer (who ran this less than a handful of times), where the train_stage and param_search tasks that show this behaviour take at least 1h, and depending on the config up to 3-5h.
I copied the engineer's workflow to try to create a minimal working example. The signature of the tasks and the structure of the workflow remained the same, but the tasks took only a few seconds to complete. This workflow I ran probably dozens of times.
Since the error only happened in the original long-running workflow, which was executed far less often, I wonder whether there might be a connection somewhere that is kept open for a long time, which might then fail due to gcp network errors 🤔
Basically: I wasn't able to reproduce the result because my "minimal working example" didn't take long enough.
Dan Rammer (hamersaw)
01/17/2023, 4:43 PM
> I wasn't able to reproduce the result because my "minimal working example" didn't take long enough.
exactly what I was thinking
Fabio Grätz
01/17/2023, 4:47 PM
time.sleep(1.5h) instead of doing actual training to not waste GPU time.
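Something along these lines, just to keep a cached task running long enough to hit the suspected idle-connection window without holding a GPU (duration and signature are placeholders):
import time

from flytekit import task


@task(cache=True, cache_version="0.1")
def fake_train(seed: int) -> str:
    # stand-in for the real training: sleep for ~1.5h, then return an output
    # so the cache put actually has something to write
    time.sleep(90 * 60)
    return f"fake-output-{seed}"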
Dan Rammer (hamersaw)
01/17/2023, 6:18 PM
SetConnMaxLifetime, SetConnMaxIdleTime, but I'm not sure these will help. Let's see what your test comes back with - if we can reproduce it we can add configuration for the aforementioned options and run some more tests. Does that sound reasonable?
Eduardo Apolinario (eapolinario)
01/17/2023, 6:52 PM
Fabio Grätz
01/17/2023, 7:14 PM
> Let's see what your test comes back with - if we can reproduce it we can add configuration for the aforementioned options and run some more tests. Does that sound reasonable?
Would propose exactly the same. If it doesn't reproduce the error, I'll give the postgres instance more resources to see if this resolves it 🤷
Dan Rammer (hamersaw)
01/17/2023, 8:07 PM
Fabio Grätz
01/18/2023, 2:35 PM
With time.sleep(7200) I was able to make one of the tasks fail the cache put. Then I re-ran the workflow, this task ran again, and again failed the cache put.
I will now use a better machine for the cloud sql instance and retry to see whether this goes away.
Ketan (kumare3)
Fabio Grätz
01/18/2023, 3:36 PM
Ketan (kumare3)
Fabio Grätz
01/18/2023, 3:38 PM
Dan Rammer (hamersaw)
01/18/2023, 4:37 PM
2. SetConnMaxTimeout, SetConnMaxIdle, etc. parameters on the DB connection to allow long-idle connections from the client-side
3. hack some kind of periodic "ping" service in datacatalog to ensure connections are not idle for long periods
Do you have any other ideas?
Fabio Grätz
01/18/2023, 4:44 PM
Running the time.sleep(2h) workflow with the better db instance now, will report back.
Ketan (kumare3)
Fabio Grätz
01/20/2023, 2:57 PM
Ketan (kumare3)
Fabio Grätz
01/21/2023, 10:47 AMDan Rammer (hamersaw)
01/21/2023, 2:28 PMFabio Grätz
01/23/2023, 5:19 PMDan Rammer (hamersaw)
01/23/2023, 5:31 PMFabio Grätz
01/24/2023, 5:26 PMKetan (kumare3)
Dan Rammer (hamersaw)
01/24/2023, 5:28 PM
Fabio Grätz
01/24/2023, 5:29 PM
Ketan (kumare3)
Dan Rammer (hamersaw)
01/24/2023, 5:32 PM
PUT_FAILURE. Would have to look through the code to understand where / if a retry makes sense here.
Fabio Grätz
01/24/2023, 5:37 PM
db connection dropped is encountered?
Dan Rammer (hamersaw)
01/24/2023, 5:38 PM
Fabio Grätz
01/24/2023, 5:40 PM
Ketan (kumare3)
Fabio Grätz
01/24/2023, 5:49 PM
Ketan (kumare3)
Fabio Grätz
01/24/2023, 5:50 PM
Dan Rammer (hamersaw)
01/24/2023, 5:51 PMkatrina
Dan Rammer (hamersaw)
01/24/2023, 6:01 PM
(the SetConnMaxLifetime, etc. configuration options we had proposed adding earlier). Regardless of whether we need it for this solution or not, we should have it enabled in datacatalog.