# flytekit
b
👋 I’m trying to enable caching in a pretty simple workflow:
```python
@task(cache=True, cache_version="1.0")
def submit_adhoc_spark_job(user: str) -> str:
    ...
```
But running the workflow several times still re-executes the submit_adhoc_spark_job node. According to the docs, keeping the Project, Domain, Cache Version, Task Signature, and Inputs the same should cache the output, but that doesn’t seem to be the case. Is there some other configuration I’m missing?
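For intuition, here is a minimal sketch (not Flyte's actual implementation) of how a cache key could be composed from the fields the docs list. The function name and hashing scheme are illustrative assumptions; the point is that identical project, domain, cache_version, task signature, and inputs map to the same key, while bumping cache_version forces a miss.

```python
import hashlib
import json

def cache_key(project, domain, cache_version, task_name, inputs):
    # Serialize all the fields that participate in the cache lookup
    # deterministically, then hash them into a single key.
    payload = json.dumps(
        {
            "project": project,
            "domain": domain,
            "cache_version": cache_version,
            "task": task_name,
            "inputs": inputs,
        },
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()

# Same fields -> same key, so a prior cached result would be reused.
k1 = cache_key("ml-serving", "development", "1.0",
               "submit_adhoc_spark_job", {"user": "brian"})
k2 = cache_key("ml-serving", "development", "1.0",
               "submit_adhoc_spark_job", {"user": "brian"})
# Bumping cache_version changes the key, forcing re-execution.
k3 = cache_key("ml-serving", "development", "2.0",
               "submit_adhoc_spark_job", {"user": "brian"})
```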
d
Hi @Brian Tang! Are you running the workflow locally or remotely (e.g. through the UI or flytectl launch plans)?
b
remotely through launch plans
d
Great, are you tracking the execution through the UI or using
flytectl get execution ...
?
Can you share either the output of that flytectl command or a UI screenshot of the task executions?
b
I’m executing the workflows through the UI. Here’s a screenshot of two executions of the same workflow:
and here’s the screen of the 2nd run of the workflow.
Note that the n0 node is the one I set to cache, and I confirmed from the logs that it is indeed resubmitting the Spark job.
d
The 'lz4o8w2cwy' execution came first, right? Its start time is about 4:30 earlier. What do the nodes in the other execution look like?
b
Correct, lz4o8w2cwy came first. They look the same as far as I can tell:
d
Interesting, can you dump the task definition? If you click on the cached task (i.e. n0) and then copy the JSON in the task tab (see screenshot).
b
```json
{
  "config": {},
  "id": {
    "resourceType": 1,
    "project": "ml-serving",
    "domain": "development",
    "name": "src.python.flyte.ml_serving.example.main.submit_adhoc_spark_job",
    "version": "d31c33c58f2956fac010277b98aa24552cfc5b6f"
  },
  "type": "python-task",
  "metadata": {
    "discoverable": true,
    "runtime": {
      "type": 1,
      "version": "0.26.0",
      "flavor": "python"
    },
    "retries": {},
    "discoveryVersion": "1.0"
  },
  "interface": {
    "inputs": {
      "variables": {
        "user": {
          "type": {
            "simple": 3
          },
          "description": "user"
        }
      }
    },
    "outputs": {
      "variables": {
        "o0": {
          "type": {
            "simple": 3
          },
          "description": "o0"
        }
      }
    }
  },
  "container": {
    "command": [],
    "args": [
      "pyflyte-execute",
      "--inputs",
      "{{.input}}",
      "--output-prefix",
      "{{.outputPrefix}}",
      "--raw-output-data-prefix",
      "{{.rawOutputDataPrefix}}",
      "--resolver",
      "flytekit.core.python_auto_container.default_task_resolver",
      "--",
      "task-module",
      "src.python.flyte.ml_serving.example.main",
      "task-name",
      "submit_adhoc_spark_job"
    ],
    "env": [
      {
        "key": "FLYTE_INTERNAL_CONFIGURATION_PATH",
        "value": "/app/flytekit.config"
      },
      {
        "key": "FLYTE_INTERNAL_IMAGE",
        "value": "030465607062.dkr.ecr.us-west-2.amazonaws.com/stripe-flyte/ml-serving/example:d31c33c58f2956fac010277b98aa24552cfc5b6f"
      }
    ],
    "config": [],
    "ports": [],
    "image": "030465607062.dkr.ecr.us-west-2.amazonaws.com/stripe-flyte/ml-serving/example:d31c33c58f2956fac010277b98aa24552cfc5b6f",
    "resources": {
      "requests": [],
      "limits": []
    }
  }
}
```
d
OK, everything looks correct here. Are you running this in the sandbox?
It looks like the datacatalog isn't active for some reason. There could be a misconfiguration somewhere.
b
This is running in our prod deployment. We may very well have misconfigured something; could you say more about what you mean by the datacatalog?
d
Absolutely, the datacatalog is a component which (among other things) stores information on cached datasets. When FlytePropeller is scheduling tasks which may be cached, it contacts the datacatalog to see if they have been previously executed and if so, attempts to reuse those cached results. If the datacatalog is not running, or FlytePropeller isn't able to contact it, then caching will not work.
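The check-cache-then-execute flow described above can be sketched as follows. This is an illustrative Python sketch, not FlytePropeller's real code; the dict standing in for the datacatalog service and the key "k1" are assumptions for the example.

```python
def run_task(catalog, task_key, execute):
    # Ask the catalog (stand-in for datacatalog) for a prior result.
    cached = catalog.get(task_key)
    if cached is not None:
        return cached          # cache hit: reuse the stored output
    result = execute()         # cache miss: actually run the task
    catalog[task_key] = result # register the output for next time
    return result

catalog = {}   # stand-in for the datacatalog service
calls = []     # records how many times the task body actually ran

result1 = run_task(catalog, "k1", lambda: calls.append(1) or "out")
result2 = run_task(catalog, "k1", lambda: calls.append(1) or "out")
# The second call is served from the cache, so the task body ran once.
```

If the catalog were unreachable (or configured as noop), every lookup would behave like a miss and the task would re-execute each time, which matches the behavior in this thread.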
Before looking at any logs, I would check whether datacatalog is running with something like
```shell
kubectl -n flyte get deployments datacatalog
```
and then check the FlytePropeller configmap for the catalog-cache configuration.
It should look something like:
```yaml
catalog-cache:
  endpoint: datacatalog:89
  insecure: true
  type: datacatalog
```
If the type is set to noop, then datacatalog is entirely disabled.
k
@Brian Tang did you guys deploy datacatalog?
b
That’s likely it. Thanks!
Oh, and as a total aside: we’re trying to use execution_id for output directories, but it seems to come back as a multi-line string, e.g.
```python
print(flytekit.current_context().execution_id)
```
prints:
```
project: "ml-serving"
domain: "development"
name: "cnqve8n83g"
```
Is there a more standard way of getting just the “name” part of the execution_id?
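As a minimal sketch of what's going on: the execution_id is an identifier object with project, domain, and name fields, and printing the whole object yields the multi-line form shown above. The dataclass below is a hypothetical stand-in for the real identifier (which isn't importable here), and the s3://my-bucket path is an assumption for illustration.

```python
from dataclasses import dataclass

@dataclass
class WorkflowExecutionIdentifier:
    # Hypothetical stand-in for the identifier object that
    # flytekit.current_context().execution_id returns; the real
    # object exposes project/domain/name attributes the same way.
    project: str
    domain: str
    name: str

execution_id = WorkflowExecutionIdentifier(
    project="ml-serving", domain="development", name="cnqve8n83g"
)

# Accessing .name gives just the execution name, which is what you
# want for building per-execution output directories.
output_dir = f"s3://my-bucket/outputs/{execution_id.name}"
print(output_dir)  # s3://my-bucket/outputs/cnqve8n83g
```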
d
cc @Eduardo Apolinario (eapolinario) @Yee ^^^
k
@Yee can just .name work?
e
Yes, .name should work.
b
Doh, I think I was doing .name() as I was looking at the identifier. Lemme try that.