• JP Kosymna

    6 months ago
    I'm getting some unexpected results using map_task and want to make sure I'm using it correctly:
    map_task(move_and_split_txns, concurrency=1)(
        move_and_split_txns_config_file=config_files
    ).with_overrides(retries=5)
    config_files is a list with 3 FlyteFiles returned from a dependent task. move_and_split_txns is a task that reads the FlyteFile and does the heavy lifting. The nodes I was using didn't have enough memory and the move_and_split_txns tasks all got OOMKilled by Kubernetes, but the map_task says it completed successfully with no retries. I would expect the map_task to at least fail. What am I doing wrong?
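    For context, a minimal self-contained sketch of the setup JP describes; the task bodies, file contents, and workflow name here are assumed, and only the map_task wiring mirrors the report:
    from typing import List

    from flytekit import map_task, task, workflow
    from flytekit.types.file import FlyteFile

    @task
    def make_config_files() -> List[FlyteFile]:
        # Stand-in for the dependent task that returns 3 FlyteFiles.
        paths = []
        for i in range(3):
            p = f"/tmp/config_{i}.json"
            with open(p, "w") as f:
                f.write("{}")
            paths.append(FlyteFile(p))
        return paths

    @task
    def move_and_split_txns(move_and_split_txns_config_file: FlyteFile) -> str:
        # Stand-in for the memory-intensive task that reads the FlyteFile.
        with open(move_and_split_txns_config_file, "r") as f:
            return f.read()

    @workflow
    def bulk_load() -> List[str]:
        config_files = make_config_files()
        return map_task(move_and_split_txns, concurrency=1)(
            move_and_split_txns_config_file=config_files
        ).with_overrides(retries=5)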
  • Ketan (kumare3)

    6 months ago
    cc @Dan Rammer (hamersaw) / @Haytham Abuelfutuh
  • Dan Rammer (hamersaw)

    6 months ago
    I started taking a quick look into it already. I suspect that I will have to run a few tests. It looks like we're correctly handling the OOMKilled status and reporting the subtask executions as retryable failures, but still marking the overall task as a success (for some reason?).
  • Ketan (kumare3)

    6 months ago
    this seems odd
  • Dan Rammer (hamersaw)

    6 months ago
    I agree. @JP Kosymna can you send the task metadata (in text) from the right-most tab (i.e. the attached image)?
  • JP Kosymna

    6 months ago
    {
      "config": {},
      "id": {
        "resourceType": 1,
        "project": "flytesnacks",
        "domain": "development",
        "name": "flyte_workflows.xxxxx.xxxxx_bulk_load.mapper_move_and_split_txns_3",
        "version": "v41"
      },
      "type": "container_array",
      "metadata": {
        "discoverable": true,
        "runtime": {
          "type": 1,
          "version": "0.26.1",
          "flavor": "python"
        },
        "retries": {},
        "discoveryVersion": "2"
      },
      "interface": {
        "inputs": {
          "variables": {
            "move_and_split_txns_config_file": {
              "type": {
                "collectionType": {
                  "blob": {}
                }
              },
              "description": "move_and_split_txns_config_file"
            }
          }
        },
        "outputs": {
          "variables": {
            "o0": {
              "type": {
                "collectionType": {
                  "simple": 1
                }
              },
              "description": "o0"
            }
          }
        }
      },
      "custom": {
        "fields": {
          "parallelism": {
            "stringValue": "1"
          }
        }
      },
      "taskTypeVersion": 1,
      "container": {
        "command": [],
        "args": [
          "pyflyte-map-execute",
          "--inputs",
          "{{.input}}",
          "--output-prefix",
          "{{.outputPrefix}}",
          "--raw-output-data-prefix",
          "{{.rawOutputDataPrefix}}",
          "--resolver",
          "flytekit.core.python_auto_container.default_task_resolver",
          "--",
          "task-module",
          "flyte_workflows.xxxxx.xxxxx_bulk_load",
          "task-name",
          "move_and_split_txns"
        ],
        "env": [
          {
            "key": "FLYTE_INTERNAL_IMAGE",
            "value": "<http://518673686532.dkr.ecr.us-west-2.amazonaws.com/flyte/test:v41|518673686532.dkr.ecr.us-west-2.amazonaws.com/flyte/test:v41>"
          }
        ],
        "config": [],
        "ports": [],
        "image": "<http://518673686532.dkr.ecr.us-west-2.amazonaws.com/flyte/test:v41|518673686532.dkr.ecr.us-west-2.amazonaws.com/flyte/test:v41>",
        "resources": {
          "requests": [],
          "limits": []
        }
      }
    }
  • Dan Rammer (hamersaw)

    6 months ago
    OK, I think I figured out the problem, but not the solution yet. I believe that defining concurrency=1 within the map task writes a custom taskTemplate, as in the example above (cc @Yee / @Eduardo Apolinario (eapolinario), can you confirm?):
    "custom": {
          "fields": {
            "parallelism": {
              "stringValue": "1"
            }
          }
        },
    flyteplugins attempts to read the array job taskTemplate custom from the raw protobuf. If it doesn't exist, we use default values (e.g. 1.0 for min_success_ratio). If it does exist, we read it. Since the custom exists in this case, we use it. However, min_success_ratio is not defined there, and by protobuf standards it defaults to 0.0. Therefore, when we set the min successes for the map task it uses a 0.0 ratio. The map task executes, waits for everything to complete, and succeeds with 0 minSuccesses.
  • We should probably file an issue to track this.
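    A hedged illustration of the failure mode Dan describes, in plain Python (this is not the actual flyteplugins Go code; compute_min_successes and its arguments are made up for this sketch). In proto3, an unset scalar field reads back as its zero value, so a custom block that only sets parallelism silently yields min_success_ratio == 0.0:
    from typing import Optional

    def compute_min_successes(num_subtasks: int, custom: Optional[dict]) -> int:
        if custom is None:
            ratio = 1.0  # no custom block at all: plugin falls back to its default
        else:
            # proto3-style read: a missing float field defaults to 0.0
            ratio = custom.get("min_success_ratio", 0.0)
        return int(ratio * num_subtasks)

    # Only parallelism was serialized, so the map task needs 0 successes and
    # "succeeds" even though all 3 subtasks were OOMKilled:
    print(compute_min_successes(3, {"parallelism": 1}))  # -> 0
    print(compute_min_successes(3, None))                # -> 3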
  • JP Kosymna

    6 months ago
    Thanks for looking into this. As a workaround, can I set min_success_ratio = 1 when I set a concurrency?
  • Dan Rammer (hamersaw)

    6 months ago
    I'm going to defer to a reply from Yee or Eduardo. There should be an easy workaround (probably what you suggested), but I don't want to tell you it will work as I'm not 100% sure.
  • @JP Kosymna thanks for finding this too! I'm sure others have run into it or certainly will.
  • Ketan (kumare3)

    6 months ago
    uh-oh, min success ratio
  • to explain this @JP Kosymna: this is done so that you can have, say, 90% completion and still move on
  • @Dan Rammer (hamersaw) I think we should not allow 0.0 as an allowed ratio. Maybe 0.0 means 1.0?
  • Eduardo Apolinario (eapolinario)

    6 months ago
    "As a workaround, can I set min_success_ratio = 1 when I set a concurrency?" Yes, this should unblock you.
    IMO we should default to 1.0 in flytekit and think about the 0.0 case separately. This won't solve the problem for other languages obviously, but that's a separate problem.
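    The workaround as stated, reusing the names from JP's snippet and assuming the installed flytekit exposes min_success_ratio on map_task (note that Eduardo revises this advice in his next message):
    map_task(move_and_split_txns, concurrency=1, min_success_ratio=1.0)(
        move_and_split_txns_config_file=config_files
    ).with_overrides(retries=5)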
  • Dan Rammer (hamersaw)

    6 months ago
    I like setting 1.0 as the flytekit default. I can't think of a scenario where somebody would want to set min_success_ratio to 0.0, but in that case it would still work as expected.
  • Eduardo Apolinario (eapolinario)

    6 months ago
    @JP Kosymna, I misspoke earlier. We have a bug in the current implementation of array tasks with custom fields, so for now, to unblock you, please remove concurrency from the invocation of map_task. Parallelism is not implemented in the back-end yet (it's a feature in the next release). We'll be fixing this bug in the coming flyte release.
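    The revised workaround, again reusing JP's names: dropping concurrency means no custom taskTemplate is serialized, so the plugin takes the default path with a 1.0 min_success_ratio:
    map_task(move_and_split_txns)(
        move_and_split_txns_config_file=config_files
    ).with_overrides(retries=5)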
  • JP Kosymna

    6 months ago
    @Eduardo Apolinario (eapolinario) is there any workaround? I have a map_task with 10,000 tasks and would like to run 300 in parallel. I guess I could set resource requests and do the math to size a node_group to allow for a maximum of 300 pods, but is there anything easier?
  • Ketan (kumare3)

    6 months ago
    @JP Kosymna so the system uses a resource manager in the back end, so it will not overwhelm the system. It will maintain a global max number of map tasks.
  • Cc @Haytham Abuelfutuh can you share the config?
  • You can of course change that.
  • JP Kosymna

    6 months ago
    Thanks Ketan
  • Haytham Abuelfutuh

    6 months ago
    Yes, you need to set a config like this on propeller:
    plugins:
      k8s-array:
        resourceConfig:
          primaryLabel: k8sArray
          limit: 300
        maxArrayJobSize: 10000
    This means there can be at most 300 subtasks running in the system at any time, and any array task can have at most 10K subtasks.
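    (This snippet follows FlytePropeller's k8s-array plugin configuration format; in a typical deployment it would go into the FlytePropeller configuration, e.g. the propeller ConfigMap in a Helm-based install.)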
  • Ketan (kumare3)

    6 months ago
    @JP Kosymna ^