Jake Neyer
05/25/2022, 5:40 PM

George Odette
05/25/2022, 8:59 PM

Samhita Alla
05/26/2022, 9:46 AM

Sandra Youssef
05/26/2022, 11:33 PM

Sandra Youssef
05/27/2022, 4:41 PM

Sandra Youssef
05/30/2022, 5:50 PM

Eugene Cha
05/31/2022, 6:13 AM

import time
from typing import Annotated

import pandas
from flytekit import HashMethod, task, workflow
from flytekit.core.node_creation import create_node


def hash_pandas_dataframe(df: pandas.DataFrame) -> str:
    return str(pandas.util.hash_pandas_object(df))


@task
def uncached_data_reading_task() -> Annotated[
    pandas.DataFrame, HashMethod(hash_pandas_dataframe)
]:
    return pandas.DataFrame({"column_1": [1, 2, 3]})


@task(cache=True, cache_version="1.0")
def cached_data_processing_task(df: pandas.DataFrame) -> pandas.DataFrame:
    time.sleep(50)
    return df * 2


@task
def compare_dataframes(df1: pandas.DataFrame, df2: pandas.DataFrame):
    assert df1.equals(df2)


@workflow
def cached_dataframe_wf():
    raw_data = uncached_data_reading_task()
    # We execute `cached_data_processing_task` twice, but we force those
    # two executions to happen serially to demonstrate how the second run
    # hits the cache.
    t1_node = create_node(cached_data_processing_task, df=raw_data)
    t2_node = create_node(cached_data_processing_task, df=raw_data)
    t1_node >> t2_node
    # Confirm that the dataframes actually match
    compare_dataframes(df1=t1_node.o0, df2=t2_node.o0)


if __name__ == "__main__":
    df1 = cached_dataframe_wf()
    print(f"Running cached_dataframe_wf once : {df1}")
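One possible source of flaky cache hits is the hash function itself: str() of the Series returned by pandas.util.hash_pandas_object uses the Series repr, which pandas truncates for long frames, so distinct dataframes can produce the same cache key and equal dataframes can produce different reprs depending on display settings. A byte-stable variant might look like this (a sketch, not part of the original snippet; the function name is made up):

```python
import hashlib

import pandas


def hash_pandas_dataframe_stable(df: pandas.DataFrame) -> str:
    # hash_pandas_object returns one uint64 per row; hashing the raw
    # bytes avoids relying on the (potentially truncated) Series repr.
    row_hashes = pandas.util.hash_pandas_object(df)
    return hashlib.sha256(row_hashes.values.tobytes()).hexdigest()
```

Equal dataframes then always yield equal digests, regardless of frame length or repr settings.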
but sometimes the caching works and sometimes it doesn't. We've tried running with pyflyte run --remote caching.py cached_dataframe_wf as well as the relaunch button, but as you can see in the pictures it often doesn't work and I'm not sure why. Any ideas?

Emirhan Karagül
06/01/2022, 11:13 AM

~ flytectl get execution -p default -d production f38f2b53cfda08ccb000 -o yaml
closure:
  createdAt: "2022-05-31T14:03:10.025255897Z"
  duration: 501.772263113s
  outputs:
    uri: gs://<our-flyte-store>/metadata/propeller/default-production-f38f2b53cfda08ccb000/end-node/data/0/outputs.pb
  phase: SUCCEEDED
  startedAt: "2022-05-31T14:03:15.128550714Z"
  updatedAt: "2022-05-31T14:11:36.900814113Z"
  workflowId:
    domain: production
    name: flyte_workflows.collaborative_filtering.workflow.pipeline
    project: default
    resourceType: WORKFLOW
    version: 0.2.2
id:
  domain: production
  name: f38f2b53cfda08ccb000
  project: default
spec:
  launchPlan:
    domain: production
    name: hydra_workflow_cfg_flyte_workflows.collaborative_filtering.workflow_0
    project: default
    resourceType: LAUNCH_PLAN
    version: 0.2.2
  metadata:
    mode: SCHEDULED
    scheduledAt: "2022-06-01T05:00:00Z"
    systemMetadata: {}
Robin Kahlow
06/01/2022, 11:51 AM

When I use pyflyte run on

@workflow
def wf(
    total_samples: List[int] = [16, 32, 64, 256],
):

I get TypeError: the JSON object must be str, bytes or bytearray, not list
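That exact TypeError is what Python's json.loads raises when it receives an already-decoded list instead of a string, which suggests the literal default is being fed straight into the JSON decoder; a minimal stdlib reproduction (not flytekit code):

```python
import json

# Passing a Python list reproduces the error pyflyte reports.
try:
    json.loads([16, 32, 64, 256])
except TypeError as exc:
    print(exc)  # the JSON object must be str, bytes or bytearray, not list

# A JSON-encoded string of the same list decodes fine, which matches
# the observation that passing the value on the CLI as a string works.
print(json.loads("[16, 32, 64, 256]"))
```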
but without specifying a default it does work (and I can pass it to pyflyte as a JSON-encoded list string).

Robin Kahlow
06/01/2022, 3:57 PM

# -- Kubernetes specific Flyte configuration
k8s:
  plugins:
    # -- Configuration section for all K8s specific plugins [Configuration structure](https://pkg.go.dev/github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s/config)
    k8s:
      default-env-vars: []
      # DEFAULT_ENV_VAR: VALUE
      default-cpus: 100m
      default-memory: 100Mi
      resource-tolerations:
        - nvidia.com/gpu:
          - key: "key1"
            operator: "Equal"
            value: "value1"
            effect: "NoSchedule"
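One thing worth double-checking against the linked config structure: the flytek8s plugin config declares resource-tolerations as a map keyed by resource name rather than a list, so the GPU entry may need to be written without the leading dash. A hedged reading of that structure (verify against the Go config documentation linked above):

```
k8s:
  plugins:
    k8s:
      resource-tolerations:
        nvidia.com/gpu:
          - key: "key1"
            operator: "Equal"
            value: "value1"
            effect: "NoSchedule"
```

If the list form is silently ignored during unmarshalling, that would explain why only the built-in default toleration shows up on the pods.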
and I applied that with helm and also tried restarting the Flyte pods (kubectl rollout restart deploy), but the pods started by Flyte workflows don't get these tolerations (although they do get a default nvidia.com/gpu "exists" toleration regardless of my addition above). Anything I'm doing wrong?

Sandra Youssef
06/01/2022, 5:18 PM

[Recording attachments]
David Przybilla
06/02/2022, 6:25 AM

contributor_guide.rst to flyteorg/flyte:
https://github.com/dav009/flyte/blob/dp-contributor-fastrack/rsts/community/contributor_fast_track.md

Maarten de Jong
06/02/2022, 11:31 AM

Nastya Rusina
06/02/2022, 7:51 PM

Matheus Moreno
06/03/2022, 3:08 PM

Deploying Flyte...
Getting updates for unmanaged Helm repositories...
...Successfully got an update from the "https://googlecloudplatform.github.io/spark-on-k8s-operator" chart repository
...Successfully got an update from the "https://kubernetes.github.io/dashboard/" chart repository
...Successfully got an update from the "https://charts.bitnami.com/bitnami" chart repository
Error: can't get a valid version for repositories contour. Try changing the version constraint in Chart.yaml
SeungTaeKim
06/07/2022, 4:22 AM

Rahul Mehta
06/07/2022, 10:21 PM

Klemens Kasseroller
06/08/2022, 12:33 PM

from dataclasses import dataclass
from typing import List

from dataclasses_json import dataclass_json
from flytekit import task, workflow
from flytekit.types.file import FlyteFile


@dataclass_json
@dataclass
class InputsContainer:
    files: List[FlyteFile]


@task
def task1(inputs: List[FlyteFile]) -> InputsContainer:
    print("TASK1 remote source: ", inputs[0].remote_source)
    return InputsContainer(files=inputs)


@task
def task2(inputs: InputsContainer) -> None:
    print("TASK2 remote source: ", inputs.files[0].remote_source)


@workflow
def main_workflow(inputs: List[FlyteFile]) -> None:
    task1_outputs = task1(inputs=inputs)
    task2(inputs=task1_outputs)


if __name__ == '__main__':
    file_path = FlyteFile("s3://test-bucket/test.json")
    main_workflow(inputs=[file_path])
The output generated is:

TASK1 remote source: s3://test-bucket/test.json
TASK2 remote source: None
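The None is consistent with the dataclass being round-tripped between tasks through a schema that only carries its declared fields, so attributes flytekit resolves at runtime (like the remote source) would not survive the trip. A stdlib-only analogy of that behavior (hypothetical illustration; this is not flytekit's actual serialization code):

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class FileRef:
    path: str


ref = FileRef(path="/tmp/test.json")
# Simulate a runtime-only attribute set after construction,
# analogous to FlyteFile.remote_source.
ref.remote_source = "s3://test-bucket/test.json"

# Serializing only the declared fields drops the runtime attribute...
payload = json.dumps(asdict(ref))

# ...so the reconstructed object no longer carries it.
restored = FileRef(**json.loads(payload))
print(getattr(restored, "remote_source", None))  # None
```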
Could anyone help me out here? Thanks!

Andrew Dye
06/08/2022, 4:08 PM

Sandra Youssef
06/09/2022, 4:58 PM

[Video attachment]

Sandra Youssef
06/09/2022, 5:23 PM

Brian Tang
06/10/2022, 11:21 PM

I updated the logging config in the flytepropeller deployment, and it's hard to tell if the system is picking up the changes:

$ kubectl logs -f deployment/flytepropeller -nflyte | grep -i logs
Found 6 pods, using pod/flytepropeller-785bcc6f6d-sr8bl
{"json":{"src":"viper.go:398"},"level":"debug","msg":"Config section [plugins.logs] updated. No update handler registered.","ts":"2022-06-08T16:32:15Z"}
{"json":{"src":"viper.go:396"},"level":"debug","msg":"Config section [plugins.logs] hasn't changed.","ts":"2022-06-10T22:36:33Z"}

From looking at the plugins, it seems like adding that config should just work automatically.

Robin Kahlow
06/12/2022, 12:23 AM

python commands.py build-execute.py --version v1
Afiz
06/12/2022, 2:52 PM

Sandra Youssef
06/13/2022, 7:40 PM

George Odette
06/14/2022, 6:27 PM

Vijay Saravana
06/14/2022, 7:23 PM

Slackbot
06/15/2022, 9:14 AM

Fabio Grätz
06/15/2022, 10:56 AM

Fabio Grätz
06/15/2022, 10:56 AM

Dan Rammer (hamersaw)
06/15/2022, 11:21 AM

The default PodTemplate is looked up per execution namespace (e.g. project flytesnacks and domain development will look for a PodTemplate in the flytesnacks-development namespace). If that PodTemplate does not exist, it then attempts to find one in the namespace that FlytePropeller runs in.

apiVersion: v1
kind: PodTemplate
metadata:
  name: flyte-default-template
  namespace: flyte
template:
  metadata:
  spec:
    containers:
      - name: noop
        image: docker.io/rwgrim/docker-noop
    subdomain: "default-subdomain"

Where in this example I defined a noop container.

Fabio Grätz
06/15/2022, 11:56 AM

Dan Rammer (hamersaw)
06/15/2022, 1:51 PM

Fabio Grätz
06/15/2022, 1:54 PM

Dan Rammer (hamersaw)
06/15/2022, 1:56 PM

Fabio Grätz
06/15/2022, 1:58 PM

Dan Rammer (hamersaw)
06/15/2022, 2:02 PM

Fabio Grätz
06/19/2022, 8:40 PM

flyteplugins currently depends on github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625.
In this version, SparkPodSpec has:

// SecurityContenxt specifies the PodSecurityContext to apply.
// +optional
SecurityContenxt *apiv1.PodSecurityContext

Notice the security context (misspelled upstream as SecurityContenxt) is of type PodSecurityContext.
In flyteplugins we set the spark pod's security context to the DefaultPodSecurityContext accordingly:

SecurityContenxt: config.GetK8sPluginConfig().DefaultPodSecurityContext.DeepCopy(),

The k8s plugin config has both a DefaultPodSecurityContext as well as a DefaultSecurityContext.
In the newer spark-on-k8s-operator versions this has been fixed, and there are now both the PodSecurityContext as well as the SecurityContext.
Do you agree that this should be fixed in flyteplugins by using a newer version of the spark-on-k8s-operator?
I tried fixing this, but

go get -v -u github.com/GoogleCloudPlatform/spark-on-k8s-operator@master

gives me the following error:

go: github.com/GoogleCloudPlatform/spark-on-k8s-operator@v0.0.0-20220615230608-94775cd89ca0 requires
k8s.io/kubernetes@v1.19.6 requires
k8s.io/api@v0.0.0: reading k8s.io/api/go.mod at revision v0.0.0: unknown revision v0.0.0

This appears to be a known issue due to the way k8s uses its go.mod, and people have written bash scripts to work around this.
I wonder whether you or others within flyteorg have experienced this before and can give me a hint how to handle it (in case you agree that upgrading spark-on-k8s-operator makes sense).
Thanks 🙂

Ketan (kumare3)
06/20/2022, 12:13 AM

Dan Rammer (hamersaw)
06/22/2022, 4:02 AM

The kubernetes repo is not meant to be used as a dependency, so it declares v0.0.0 for all of its internal k8s dependencies and uses replace commands to point to local versions. Is this going to be a fix we can isolate to flyteplugins? Or would this need to be in flytepropeller as well? Not sure how replace cascades in the build.
I think that integrating a script to pull the k8s dependencies and insert replace statements into the go.mod may be a solution? @Haytham Abuelfutuh / @Yuvraj thoughts?

Fabio Grätz
06/23/2022, 4:27 PM

Using replace via the script in flyteplugins works without cascading to flytepropeller @Dan Rammer (hamersaw).

Dan Rammer (hamersaw)
06/23/2022, 5:34 PM

Fabio Grätz
06/26/2022, 9:35 PM

I added replace instructions for all used k8s packages (to later test whether this workaround can be used in flyteplugins without cascading to flytepropeller). Works smoothly.
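For readers following along, the workaround mirrors the pattern the kubernetes repo itself uses internally: each k8s.io staging module that go resolves as v0.0.0 is pinned to a concrete release tag via replace. A go.mod stanza along these lines (the module list and versions here are illustrative placeholders, not the exact set used):

```
replace (
	k8s.io/api => k8s.io/api v0.20.2
	k8s.io/apimachinery => k8s.io/apimachinery v0.20.2
	k8s.io/client-go => k8s.io/client-go v0.20.2
)
```

With all staging modules pinned, the "unknown revision v0.0.0" resolution error goes away.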
In order to get the tests green again, I'm now working on fixing tasks/pluginmachinery/k8s/client.go, since sigs.k8s.io/controller-runtime is updated from v0.8.2 to v0.12.2, and two tags after v0.8.2, in v0.9.0-alpha.0, the ClientBuilder which flyteplugins uses here was deprecated in favor of NewClientFunc (see commit message).
I haven't figured out how to adapt flyteplugins to that change yet but will continue working on this in the next few days...
Might have to get back to you for some guidance 😅🙏

Dan Rammer (hamersaw)
07/18/2022, 4:53 PM

Fabio Grätz
07/19/2022, 11:46 AM

Dan Rammer (hamersaw)
07/26/2022, 1:51 PM

Is PodSecurityContext the only field that we're gaining by upgrading to the latest spark operator?

Ketan (kumare3)
07/26/2022, 1:55 PM

Fabio Grätz
07/26/2022, 4:35 PM

• I will try upgrading spark-on-k8s to gain the PodSecurityContext. I will time-box this effort.
• I will transfer all fields from the k8s plugin config that can be set with the current spark-on-k8s version (which would solve my current problem of using the default tolerations), and I would be happy to upgrade spark-on-k8s and also other plugins later once the plugins have been moved out of core.
Does that make sense to you?

Dan Rammer (hamersaw)
07/26/2022, 5:25 PM

Fabio Grätz
07/26/2022, 5:58 PM

Yee
07/26/2022, 6:12 PM