# flyte-deployment
n
Hi all, I tried to upgrade my Google GKE Flyte deployment from v0.19.2 to v0.19.3 using Helm and encountered two issues. Issue one: FlytePropeller's deployment now has a `PriorityClass: system-cluster-critical` attribute, which requires an associated k8s quota that my Helm config did not generate. Manually creating one allowed the FlytePropeller pods to start. Was this documented anywhere? What do I need to add to my Helm values file to generate this quota automatically?
y
I also faced the same issue yesterday with GCP. AFAIK it is a recent change in the Helm chart. It's not documented? cc: @Haytham Abuelfutuh
k
@Yuvraj did you file an issue or have a fix for this?
I do not think we can tinker with helm deployments
y
I just started a conversation with Haytham.
@Nicholas LoFaso can you create these resources?
```yaml
apiVersion: scheduling.k8s.io/v1
description: Used for system critical pods that must run in the cluster, but can be
  moved to another node if necessary.
kind: PriorityClass
metadata:
  name: system-cluster-critical
preemptionPolicy: PreemptLowerPriority
value: 2000000000
---
apiVersion: scheduling.k8s.io/v1
description: Used for system critical pods that must not be moved from their current
  node.
kind: PriorityClass
metadata:
  name: system-node-critical
preemptionPolicy: PreemptLowerPriority
value: 2000001000
```
h
@Yuvraj @Nicholas LoFaso let’s have a call? https://meet.google.com/azc-xezg-gaq
n
I can join in 10 minutes
h
Per the docs here: https://kubernetes.io/docs/concepts/scheduling-eviction/pod-priority-preemption/, since K8s 1.14 these two priority classes should be included in every cluster:
> Note: Kubernetes already ships with two PriorityClasses: `system-cluster-critical` and `system-node-critical`. These are common classes and are used to ensure that critical components are always scheduled first.
I’m trying to figure out if GKE explicitly removes these…
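One way to answer that is to list the PriorityClasses a cluster actually has. The sketch below is a hypothetical helper (`missing_builtin_priority_classes` is not part of Flyte or kubectl) that parses the JSON printed by `kubectl get priorityclass -o json` and reports which of the two built-in classes are absent. It assumes only the standard Kubernetes list layout (`items[].metadata.name`); the sample input is made up for illustration.

```python
import json

# The two classes the Kubernetes docs say ship with every cluster.
BUILTIN_PRIORITY_CLASSES = {"system-cluster-critical", "system-node-critical"}


def missing_builtin_priority_classes(kubectl_json: str) -> set:
    """Return the built-in PriorityClass names absent from a kubectl JSON listing.

    Hypothetical helper: it assumes the usual Kubernetes List shape,
    {"items": [{"metadata": {"name": ...}}, ...]}.
    """
    listing = json.loads(kubectl_json)
    present = {item["metadata"]["name"] for item in listing.get("items", [])}
    return BUILTIN_PRIORITY_CLASSES - present


# Made-up listing that only contains one of the two built-in classes.
sample = json.dumps({"items": [{"metadata": {"name": "system-cluster-critical"}}]})
print(missing_builtin_priority_classes(sample))  # {'system-node-critical'}
```

On a real cluster the input would be the output of `kubectl get priorityclass -o json`. Note that this only checks existence; it would not catch the missing-quota situation described next.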
n
I think the priority class exists. The error occurred when it tried to schedule the pod: it needed an explicit quota set for the priority class.
So maybe EKS creates an associated quota and GKE doesn't?
h
Do you have any ResourceQuota created in the flyte namespace?
n
I manually created one called sys-cluster-critical-quota
```yaml
apiVersion: v1
kind: ResourceQuota
metadata:
  annotations:
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"v1","kind":"ResourceQuota","metadata":{"annotations":{},"name":"sys-cluster-critical-quota","namespace":"flyte"},"spec":{"hard":{"cpu":"1000","memory":"2000Gi","pods":"100"},"scopeSelector":{"matchExpressions":[{"operator":"In","scopeName":"PriorityClass","values":["system-cluster-critical"]}]}}}
  creationTimestamp: "2022-03-15T02:02:40Z"
  name: sys-cluster-critical-quota
  namespace: flyte
  resourceVersion: "151298561"
  uid: e0d211f6-8523-4032-8f5e-04b0a449b855
spec:
  hard:
    cpu: 1k
    memory: 2000Gi
    pods: "100"
  scopeSelector:
    matchExpressions:
    - operator: In
      scopeName: PriorityClass
      values:
      - system-cluster-critical
status:
  hard:
    cpu: 1k
    memory: 2000Gi
    pods: "100"
  used:
    cpu: 500m
    memory: 2Gi
    pods: "1"
```
After creating that, the pod started.
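To script that manual fix, here is a minimal sketch: a hypothetical Python helper (`priority_class_quota` is an illustrative name, not a Flyte or Kubernetes API) that emits the same ResourceQuota spec as the `sys-cluster-critical-quota` object above, as JSON. kubectl accepts JSON manifests, so the output can be piped to `kubectl apply -f -`. The limit defaults are simply copied from that object, not recommended values.

```python
import json


def priority_class_quota(namespace: str, priority_class: str, name: str,
                         cpu: str = "1000", memory: str = "2000Gi",
                         pods: str = "100") -> dict:
    """Build a ResourceQuota manifest scoped to a single PriorityClass.

    Hypothetical helper mirroring the sys-cluster-critical-quota spec above.
    """
    return {
        "apiVersion": "v1",
        "kind": "ResourceQuota",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            "hard": {"cpu": cpu, "memory": memory, "pods": pods},
            # Only pods that use a PriorityClass listed in `values` count against this quota.
            "scopeSelector": {
                "matchExpressions": [
                    {"operator": "In", "scopeName": "PriorityClass",
                     "values": [priority_class]}
                ]
            },
        },
    }


manifest = priority_class_quota("flyte", "system-cluster-critical",
                                "sys-cluster-critical-quota")
# If this were saved as make_quota.py: python make_quota.py | kubectl apply -f -
print(json.dumps(manifest, indent=2))
```

This only reproduces the object that unblocked the pods; it does not by itself explain why GKE required it while EKS apparently did not.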
h
For compute resources (CPU, memory, etc.), if there is no ResourceQuota object at all, k8s doesn't attempt to enforce restrictions. I wonder if this is the same: if you don't have a ResourceQuota it won't complain, otherwise the priority class has to be stated in the ResourceQuota?
So you had a ResourceQuota object but it didn't have the PriorityClass in there? Or you didn't have a ResourceQuota at all?
n
I had no quota in that namespace.
I had a quota in my project namespace.
I can join call now
h
Yeah, per project is fine… that's interesting. I don't yet understand the difference in behavior between EKS and GKE.
we’re on the call whenever you get a chance…
n
Can we use my zoom link? https://edf.zoom.us/j/3906080438
@Nicholas LoFaso can you share the flytekit callstack and the code snippet?
n
@Haytham Abuelfutuh yes here is the callstack
```
[3/3] currentAttempt done. Last Error: SYSTEM::Traceback (most recent call last):

      File "/usr/local/lib/python3.8/dist-packages/flytekit/common/exceptions/scopes.py", line 165, in system_entry_point
        return wrapped(*args, **kwargs)
      File "/usr/local/lib/python3.8/dist-packages/flytekit/core/base_task.py", line 464, in dispatch_execute
        native_inputs = TypeEngine.literal_map_to_kwargs(exec_ctx, input_literal_map, self.python_interface.inputs)
      File "/usr/local/lib/python3.8/dist-packages/flytekit/core/type_engine.py", line 600, in literal_map_to_kwargs
        return {k: TypeEngine.to_python_value(ctx, lm.literals[k], v) for k, v in python_types.items()}
      File "/usr/local/lib/python3.8/dist-packages/flytekit/core/type_engine.py", line 600, in <dictcomp>
        return {k: TypeEngine.to_python_value(ctx, lm.literals[k], v) for k, v in python_types.items()}
      File "/usr/local/lib/python3.8/dist-packages/flytekit/core/type_engine.py", line 576, in to_python_value
        return transformer.to_python_value(ctx, lv, expected_python_type)
      File "/usr/local/lib/python3.8/dist-packages/flytekit/core/type_engine.py", line 406, in to_python_value
        dc = cast(DataClassJsonMixin, expected_python_type).from_json(_json_format.MessageToJson(lv.scalar.generic))
      File "/usr/local/lib/python3.8/dist-packages/dataclasses_json/api.py", line 75, in from_json
        return cls.from_dict(kvs, infer_missing=infer_missing)
      File "/usr/local/lib/python3.8/dist-packages/dataclasses_json/api.py", line 82, in from_dict
        return _decode_dataclass(cls, kvs, infer_missing)
      File "/usr/local/lib/python3.8/dist-packages/dataclasses_json/core.py", line 197, in _decode_dataclass
        value = _decode_dataclass(field_type, field_value,
      File "/usr/local/lib/python3.8/dist-packages/dataclasses_json/core.py", line 208, in _decode_dataclass
        return cls(**init_kwargs)

Message:

    __call__() missing 1 required positional argument: 'value'

SYSTEM ERROR! Contact platform administrators.
```
Workflow:
```python
level1b_products = level1b_dynamic(
    o2_level0_files=o2_level0_files,
    o2_dark_file=o2_dark_file,
    ch4_level0_files=ch4_level0_files,
    ch4_dark_file=ch4_dark_file,
    ac_file=ac_file,
    params=level1b_params,
    config=level1b_config,
    output_uri=level1b_output_dir,
)

level2_products = level2_dynamic(
    level1b_products=level1b_products,
    apriori_config=level2_apriori_config,
    proxy_config=level2_proxy_config,
    proxy_params=level2_proxy_params,
    level2_output_uri=level2_output_dir,
)
```
Dynamic task signature:
```python
@dynamic(container_image=container_image("LEVEL2_FLYTE_IMAGE"), cache=True, cache_version="1.5.0")
def level2_dynamic(
    level1b_products: List[Level1bTaskOutput],
    apriori_config: Level2AprioriConfiguration,
    proxy_config: Level2ProxyConfiguration,
    proxy_params: Level2ProxyParameters,
    level2_output_uri: str,
) -> List[FlyteDirectory]:
    ...  # body omitted
```
Dataclass definition:
```python
@dataclass_json
@dataclass
class Level1bTaskOutput:
    # TODO nlofaso - it seems like a custom dataclass with FlyteFile's can't use type Union[FlyteFile, None]
    # initial testing seemed like it just returned the file string. It didn't actually get saved as flytefile
    # more testing needed to confirm
    # A solution might be to just return only the o2 and ch4 of the desired
    # resolution but then you'd lose the other product info
    o2_1x1: FlyteFile = empty_flytefile()
    o2_5x1: FlyteFile = empty_flytefile()
    o2_geoakaze: FlyteFile = empty_flytefile()
    ch4_1x1: FlyteFile = empty_flytefile()
    ch4_5x1: FlyteFile = empty_flytefile()
    ch4_geoakaze: FlyteFile = empty_flytefile()
```
h
Thank you @Nicholas LoFaso CC @Eduardo Apolinario (eapolinario)
To summarize, there seem to be three issues:
1. Discrepancy in behavior between EKS and GKE when specifying a PriorityClass for pods. We will document that behavior and opt to proactively create a ResourceQuota by default in the flyte namespace to circumvent the issue.
   a. Process update: we will renew a call for GCP beta-testers in the community to ensure deployment manifests continue to work properly out of the box prior to releases…
2. `namespace_mapping` was being ignored before and has been fixed recently. After the fix, user containers started failing because they are being created in a non-configured namespace… This behavior should have been documented in the changelog…
3. Issue with using List->DataClass->FlyteFile as inputs to a dynamic job. Under investigation.

Thank you @Nicholas LoFaso for persisting through with us!
n
@Haytham Abuelfutuh Yes that captures it well. Thanks for your help!
I am rerunning the workflow now with flytekit 0.31.0 and Flyte v0.19.3; I will have results in 2+ hours.
I will also look into creating a smaller test case to reproduce it.
y
A smaller test case for the flytekit issue?
That would be great if so.
Also, could you tell us if you're seeing the error when running locally? And can you copy/paste the contents of `empty_flytefile`?
Also, what did you mean by "it seems like a custom dataclass with FlyteFile's can't use type Union[FlyteFile, None]"?
Should we make a separate ticket for it?
n
Hi @Yee, yes, a smaller test case for the flytekit issue. Here is the `empty_flytefile` code:
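```python
from flytekit.types.file import FlyteFile


def empty_flytefile():
    # Placeholder FlyteFile used as the default for "optional" dataclass fields.
    def noop():
        pass

    return FlyteFile(
        path="gs://fake/path/to/nowhere", downloader=noop, remote_path="gs://fake/path/to/nowhere"
    )
```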
And in speaking with @Haytham Abuelfutuh earlier, the `Union[FlyteFile, None]` feature is currently being worked on. I don't have the ticket number.
`empty_flytefile()` is my workaround to make those fields optional.
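A side note on that workaround, as a sketch rather than a required change: a dataclass default written as `o2_1x1: FlyteFile = empty_flytefile()` is evaluated once, when the class is defined, so every instance shares the same placeholder object, while `field(default_factory=...)` creates a fresh one per instance. To run without flytekit, the sketch uses a hypothetical `PlaceholderFile` class as a stand-in for `FlyteFile`; `empty_placeholder`, `is_placeholder`, `SharedDefault` and `FreshDefault` are likewise illustrative names. Whether sharing matters depends on whether anything ever mutates the placeholder.

```python
from dataclasses import dataclass, field

FAKE_PATH = "gs://fake/path/to/nowhere"


class PlaceholderFile:
    """Hypothetical stand-in for FlyteFile; it only stores a path."""

    def __init__(self, path: str):
        self.path = path


def empty_placeholder() -> PlaceholderFile:
    return PlaceholderFile(FAKE_PATH)


def is_placeholder(f: PlaceholderFile) -> bool:
    # "Optional" fields are recognized by the sentinel path, not by None.
    return f.path == FAKE_PATH


@dataclass
class SharedDefault:
    # Evaluated once at class creation: every instance gets the same object.
    o2_1x1: PlaceholderFile = empty_placeholder()


@dataclass
class FreshDefault:
    # default_factory runs per instance: each instance gets its own object.
    o2_1x1: PlaceholderFile = field(default_factory=empty_placeholder)


a, b = SharedDefault(), SharedDefault()
c, d = FreshDefault(), FreshDefault()
print(a.o2_1x1 is b.o2_1x1)  # True
print(c.o2_1x1 is d.o2_1x1)  # False
print(is_placeholder(c.o2_1x1), is_placeholder(PlaceholderFile("gs://bucket/real.nc")))  # True False
```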
y
Oh yeah, it's being worked on… sorry, I thought you had played around with that PR.
But an optional file/structured dataset type, inside a dataclass, inside a list, inside a dynamic task, is probably worth testing explicitly.
n
haha yeah our use case is specific
though hopefully not too crazy
y
nope
should be fully supported.
e
@Nicholas LoFaso, is `level1b_dynamic` also cached by any chance? Also, can you double-check the versions of `flytekit` and `dataclasses-json` in the failing run?
n
Hi @Eduardo Apolinario (eapolinario), yes, `level1b_dynamic` is also cached, and the versions are `flytekit==0.31.0` and `dataclasses-json==0.5.6`.
e
Thanks. What was the result of this second run, @Nicholas LoFaso?
n
@Eduardo Apolinario (eapolinario) @Yee Figured out the issue was a typo on my part. It was unrelated to the dataclass I sent earlier. I had a dataclass that included an Enum. That Enum had the `@dataclass_json` decorator by mistake, which caused the error. Small test case to reproduce:
```python
from dataclasses import dataclass
from dataclasses_json import dataclass_json
from enum import Enum

from flytekit import task, workflow

# ERROR: having these two decorators on an Enum causes
# TypeError: __call__() missing 1 required positional argument: 'value'
@dataclass_json
@dataclass
class Aggregation(Enum):
    GROUP_1x1 = "1x1"
    GROUP_5x1 = "5x1"


@dataclass_json
@dataclass
class Nested:
    agg: Aggregation


@task
def tsk(nested: Nested):
    if nested.agg == Aggregation.GROUP_1x1:
        print("1x1")
    elif nested.agg == Aggregation.GROUP_5x1:
        print("5x1")
    else:
        raise ValueError("bad aggregation value")


@workflow
def wf():
    a = tsk(nested=Nested(agg=Aggregation.GROUP_1x1))


if __name__ == "__main__":
    wf()
```
All my issues (for now 😉 ) are resolved! Thanks for all the support 🙇
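For anyone who hits the same message: one plausible reading of the traceback (the recursive `_decode_dataclass` call at core.py line 197, then `cls(**init_kwargs)` at line 208) is that `@dataclass` turns the Enum into a field-less dataclass. The member is then encoded as an empty object and decoded by calling the Enum class with no arguments. The stdlib-only sketch below reproduces that chain. `decode_field` is a hypothetical, heavily simplified stand-in for the decoding logic, not dataclasses_json's code. It assumes the library encodes the member the way `dataclasses.asdict` does here. `@dataclass_json` is left out so the example runs without third-party packages.

```python
import dataclasses
from dataclasses import dataclass
from enum import Enum


@dataclass  # the stray decorator from the repro
class BadAggregation(Enum):
    """Enum accidentally decorated with @dataclass."""

    GROUP_1x1 = "1x1"
    GROUP_5x1 = "5x1"


class GoodAggregation(Enum):
    """The same Enum without the stray decorator."""

    GROUP_1x1 = "1x1"
    GROUP_5x1 = "5x1"


@dataclass
class BadNested:
    agg: BadAggregation


def decode_field(field_type, raw):
    """Hypothetical, simplified decoder (NOT dataclasses_json's implementation).

    Anything that looks like a dataclass is rebuilt via cls(**init_kwargs),
    mirroring the last frame of the traceback; other types are called as cls(raw).
    """
    if dataclasses.is_dataclass(field_type):
        init_kwargs = {f.name: raw[f.name] for f in dataclasses.fields(field_type)}
        return field_type(**init_kwargs)
    return field_type(raw)


# 1. The decorator makes the Enum look like a dataclass with no fields.
print(dataclasses.is_dataclass(BadAggregation))   # True
print(dataclasses.is_dataclass(GoodAggregation))  # False

# 2. Encoding the member "as a dataclass" keeps no fields, so "1x1" is lost.
bad_encoded = dataclasses.asdict(BadNested(agg=BadAggregation.GROUP_1x1))
print(bad_encoded)  # {'agg': {}}

# 3. Decoding therefore calls BadAggregation() with no arguments.
try:
    decode_field(BadAggregation, bad_encoded["agg"])
    bad_error = None
except TypeError as err:
    bad_error = str(err)
print(bad_error)

# Without the stray decorator, the stored value decodes with a plain Enum lookup.
good_decoded = decode_field(GoodAggregation, GoodAggregation.GROUP_1x1.value)
print(good_decoded)  # GoodAggregation.GROUP_1x1
```

The fix is the one found in the thread: drop `@dataclass_json`/`@dataclass` from the Enum and keep them only on the real dataclasses.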
e
Amazing, @Nicholas LoFaso! We'll make sure that errors like this are more descriptive in flytekit in future releases.
y
I'll make a ticket for this.