# flytekit
j
I'm getting some unexpected results using the map_task and want to make sure I'm using it correctly
Copy code
map_task(move_and_split_txns, concurrency=1)(
    move_and_split_txns_config_file=config_files
).with_overrides(retries=5)
config_files is a list with 3 FlyteFiles returned from a dependent task. move_and_split_txns is a task that reads the FlyteFile and does the heavy lifting. The nodes I was using didn't have enough memory and the move_and_split_txns tasks all got OOMKilled by Kubernetes. The map_task says it completed successfully with no retries. I would expect the map_task to at least fail. What am I doing wrong?
k
cc @Dan Rammer (hamersaw) / @Haytham Abuelfutuh
d
I started taking a quick look into it already. I suspect that I will have to run a few tests. It looks like we're correctly handling the OOMKilled status and reporting the subtask executions as retryable failures, but still marking the overall task as a success (for some reason?).
k
this seems odd
d
I agree. @JP Kosymna can you send the task metadata (in text) from the right-most tab (i.e. the attached image)?
j
Copy code
{
  "config": {},
  "id": {
    "resourceType": 1,
    "project": "flytesnacks",
    "domain": "development",
    "name": "flyte_workflows.xxxxx.xxxxx_bulk_load.mapper_move_and_split_txns_3",
    "version": "v41"
  },
  "type": "container_array",
  "metadata": {
    "discoverable": true,
    "runtime": {
      "type": 1,
      "version": "0.26.1",
      "flavor": "python"
    },
    "retries": {},
    "discoveryVersion": "2"
  },
  "interface": {
    "inputs": {
      "variables": {
        "move_and_split_txns_config_file": {
          "type": {
            "collectionType": {
              "blob": {}
            }
          },
          "description": "move_and_split_txns_config_file"
        }
      }
    },
    "outputs": {
      "variables": {
        "o0": {
          "type": {
            "collectionType": {
              "simple": 1
            }
          },
          "description": "o0"
        }
      }
    }
  },
  "custom": {
    "fields": {
      "parallelism": {
        "stringValue": "1"
      }
    }
  },
  "taskTypeVersion": 1,
  "container": {
    "command": [],
    "args": [
      "pyflyte-map-execute",
      "--inputs",
      "{{.input}}",
      "--output-prefix",
      "{{.outputPrefix}}",
      "--raw-output-data-prefix",
      "{{.rawOutputDataPrefix}}",
      "--resolver",
      "flytekit.core.python_auto_container.default_task_resolver",
      "--",
      "task-module",
      "flyte_workflows.xxxxx.xxxxx_bulk_load",
      "task-name",
      "move_and_split_txns"
    ],
    "env": [
      {
        "key": "FLYTE_INTERNAL_IMAGE",
        "value": "<http://518673686532.dkr.ecr.us-west-2.amazonaws.com/flyte/test:v41|518673686532.dkr.ecr.us-west-2.amazonaws.com/flyte/test:v41>"
      }
    ],
    "config": [],
    "ports": [],
    "image": "<http://518673686532.dkr.ecr.us-west-2.amazonaws.com/flyte/test:v41|518673686532.dkr.ecr.us-west-2.amazonaws.com/flyte/test:v41>",
    "resources": {
      "requests": [],
      "limits": []
    }
  }
}
d
OK, I think I figured out the problem - but not the solution yet. I believe that defining "concurrency = 1" within the map task writes a custom taskTemplate, as in the example above (cc @Yee / @Eduardo Apolinario (eapolinario), can you confirm?):
Copy code
"custom": {
      "fields": {
        "parallelism": {
          "stringValue": "1"
        }
      }
    },
flyteplugins attempts to read the array job taskTemplate custom field from the raw protobuf. If it doesn't exist we use default values (e.g. 1.0 for min_success_ratio). If it does exist, we read it. Since the custom exists in this case, we use it. However, min_success_ratio is not defined there and by protobuf standards defaults to 0.0. Therefore, when we compute the min successes for the map task it uses a 0.0 ratio. The map task executes, waits for everything to complete, and succeeds with 0 minSuccesses.
we should probably file an issue for this to track it.
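To make the failure mode concrete, here is a minimal Python sketch of the decision Dan describes; the real logic lives in flyteplugins (Go), so the function and field names here are illustrative only:
Copy code
from math import ceil
from typing import Optional

def compute_min_successes(num_subtasks: int, custom: Optional[dict]) -> int:
    # Illustrative sketch, not the real flyteplugins code.
    if custom is None:
        # No custom block in the task template: fall back to the intended default.
        min_success_ratio = 1.0
    else:
        # A custom block exists (e.g. because concurrency was set), but
        # min_success_ratio was never written, so a proto3 float reads back as 0.0.
        min_success_ratio = custom.get("minSuccessRatio", 0.0)
    return ceil(min_success_ratio * num_subtasks)

print(compute_min_successes(3, None))                # 3 -> every subtask must succeed
print(compute_min_successes(3, {"parallelism": 1}))  # 0 -> task "succeeds" even if all subtasks are OOMKilled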
j
Thanks for looking into this. As a workaround, can I set min_success_ratio = 1 when I set concurrency?
d
I'm going to defer for a reply from Yee or Eduardo. There should be an easy workaround (probably what you suggested), but I don't want to tell you it will work as I'm not 100% sure.
@JP Kosymna thanks for finding this too! I'm sure others have run into it or certainly will.
k
uh-oh, min success ratio
to explain this @JP Kosymna: this is done so that you can have, say, 90% completion and still move on
@Dan Rammer (hamersaw) I think we should not allow 0.0 as an allowed ratio. Maybe 0.0 should mean 1.0?
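As background on that intent, a hedged sketch of what a partial-success tolerance looks like from flytekit, assuming map_task accepts a min_success_ratio keyword (as JP's question above implies):
Copy code
from flytekit import map_task

# Illustrative: tolerate up to 10% of the subtasks failing and still move on.
partial_ok = map_task(move_and_split_txns, min_success_ratio=0.9)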
e
Thanks for looking into this. As a workaround, can I set min_success_ratio = 1 when I set concurrency?
Yes, this should unblock you. IMO we should default to 1.0 in flytekit and think about the 0.0 case separately. This won't solve the problem for other languages obviously, but that's a separate problem.
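Applied to the invocation at the top of the thread, the suggested workaround would look roughly like this (a sketch only; Eduardo revises this advice just below):
Copy code
map_task(move_and_split_txns, concurrency=1, min_success_ratio=1.0)(
    move_and_split_txns_config_file=config_files
).with_overrides(retries=5)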
d
I like setting 1.0 as the flytekit default. I can't think of a scenario where somebody would want to set min_success_ratio to 0.0, but in that case it would still work as expected.
e
@JP Kosymna, I misspoke earlier. We have a bug in the current implementation of array tasks with custom fields, so for now, to unblock you, please remove concurrency from the invocation of map_task. Parallelism is not implemented in the back-end yet (it's a feature in the next release). We'll be fixing this bug in the coming Flyte release.
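Concretely, the unblocking change is just to drop concurrency from the original call, a sketch based on the invocation at the top of the thread:
Copy code
map_task(move_and_split_txns)(
    move_and_split_txns_config_file=config_files
).with_overrides(retries=5)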
j
@Eduardo Apolinario (eapolinario) is there any workaround? I have a map_task with 10,000 tasks and would like to run 300 in parallel. I guess I could set resource requests and do the math to size a node_group to allow for a maximum of 300 pods but is there anything easier?
k
@JP Kosymna so the system uses a resource manager in the back, so it will not overwhelm the cluster. It will maintain a max number of global map tasks.
Cc @Haytham Abuelfutuh can you share the config?
You can of course change that.
j
Thanks ketan
h
Yes, you need to set a config like this on propeller:
Copy code
plugins:
  k8s-array:
    resourceConfig:
      primaryLabel: k8sArray
      limit: 300
    maxArrayJobSize: 10000
This means there can be at most 300 subtasks running in the system at once, and any single array task can have at most 10K subtasks.
k
@JP Kosymna ^