Eugene Cha
05/31/2022, 6:13 AM
import time
from typing import Annotated

import pandas
from flytekit import HashMethod, task, workflow
from flytekit.core.node_creation import create_node

def hash_pandas_dataframe(df: pandas.DataFrame) -> str:
    return str(pandas.util.hash_pandas_object(df))

@task
def uncached_data_reading_task() -> Annotated[
    pandas.DataFrame, HashMethod(hash_pandas_dataframe)
]:
    return pandas.DataFrame({"column_1": [1, 2, 3]})

@task(cache=True, cache_version="1.0")
def cached_data_processing_task(df: pandas.DataFrame) -> pandas.DataFrame:
    time.sleep(50)
    return df * 2

@task
def compare_dataframes(df1: pandas.DataFrame, df2: pandas.DataFrame):
    assert df1.equals(df2)

@workflow
def cached_dataframe_wf():
    raw_data = uncached_data_reading_task()
    # We execute `cached_data_processing_task` twice, but we force those
    # two executions to happen serially to demonstrate how the second run
    # hits the cache.
    t1_node = create_node(cached_data_processing_task, df=raw_data)
    t2_node = create_node(cached_data_processing_task, df=raw_data)
    t1_node >> t2_node
    # Confirm that the dataframes actually match
    compare_dataframes(df1=t1_node.o0, df2=t2_node.o0)

if __name__ == "__main__":
    df1 = cached_dataframe_wf()
    print(f"Running cached_dataframe_wf once : {df1}")
but sometimes the caching works and sometimes it doesn't. We've tried running with pyflyte run --remote caching.py cached_dataframe_wf as well as trying the relaunch button, but as you can see in the pictures it tends not to work and I'm not sure why. Any ideas?
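For reference, a minimal local sanity check of the hashing piece, assuming flytekit derives the cache key for the dataframe input from the HashMethod result; the hash function has to return the same string for identical data across runs, otherwise every execution produces a new cache key:

import pandas

def hash_pandas_dataframe(df: pandas.DataFrame) -> str:
    return str(pandas.util.hash_pandas_object(df))

# Two dataframes with identical contents should hash to the same string.
df_a = pandas.DataFrame({"column_1": [1, 2, 3]})
df_b = pandas.DataFrame({"column_1": [1, 2, 3]})
assert hash_pandas_dataframe(df_a) == hash_pandas_dataframe(df_b)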
Emirhan Karagül
06/01/2022, 11:13 AM
~ flytectl get execution -p default -d production f38f2b53cfda08ccb000 -o yaml
closure:
  createdAt: "2022-05-31T14:03:10.025255897Z"
  duration: 501.772263113s
  outputs:
    uri: gs://<our-flyte-store>/metadata/propeller/default-production-f38f2b53cfda08ccb000/end-node/data/0/outputs.pb
  phase: SUCCEEDED
  startedAt: "2022-05-31T14:03:15.128550714Z"
  updatedAt: "2022-05-31T14:11:36.900814113Z"
  workflowId:
    domain: production
    name: flyte_workflows.collaborative_filtering.workflow.pipeline
    project: default
    resourceType: WORKFLOW
    version: 0.2.2
id:
  domain: production
  name: f38f2b53cfda08ccb000
  project: default
spec:
  launchPlan:
    domain: production
    name: hydra_workflow_cfg_flyte_workflows.collaborative_filtering.workflow_0
    project: default
    resourceType: LAUNCH_PLAN
    version: 0.2.2
  metadata:
    mode: SCHEDULED
    scheduledAt: "2022-06-01T05:00:00Z"
    systemMetadata: {}
Robin Kahlow
06/01/2022, 11:51 AM
When I use pyflyte run on

@workflow
def wf(
    total_samples: List[int] = [16, 32, 64, 256],
):

I get TypeError: the JSON object must be str, bytes or bytearray, not list, but without specifying a default it does work (and I can pass it to pyflyte as a JSON string list).
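A concrete form of that workaround, passing the list as a quoted JSON string on the command line (the module name workflow.py and the exact flag spelling are placeholders, not from the thread):

pyflyte run workflow.py wf --total_samples '[16, 32, 64, 256]'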
Robin Kahlow
06/01/2022, 3:57 PM
# -- Kubernetes specific Flyte configuration
k8s:
  plugins:
    # -- Configuration section for all K8s specific plugins [Configuration structure](https://pkg.go.dev/github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s/config)
    k8s:
      default-env-vars: []
      # DEFAULT_ENV_VAR: VALUE
      default-cpus: 100m
      default-memory: 100Mi
      resource-tolerations:
        - nvidia.com/gpu:
          - key: "key1"
            operator: "Equal"
            value: "value1"
            effect: "NoSchedule"
and I applied that with helm and also tried restarting the Flyte pods (kubectl rollout restart deploy), but the pods that get started by Flyte workflows don't get these tolerations (although they do get a default nvidia.com/gpu "exists" toleration regardless of my addition above). Anything I'm doing wrong?
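For comparison, a sketch of the shape I would expect that section to take, assuming the flytek8s plugin reads resource-tolerations as a map keyed by resource name rather than as a list (the nesting under the Helm values is kept from the snippet above; verify against what the chart actually renders):

k8s:
  plugins:
    k8s:
      resource-tolerations:
        nvidia.com/gpu:  # map key is the resource name, no leading dash
          - key: "key1"
            operator: "Equal"
            value: "value1"
            effect: "NoSchedule"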
David Przybilla
06/02/2022, 6:25 AM
contributor_guide.rst to flyteorg/flyte.
https://github.com/dav009/flyte/blob/dp-contributor-fastrack/rsts/community/contributor_fast_track.md
Matheus Moreno
06/03/2022, 3:08 PM
Deploying Flyte...
Getting updates for unmanaged Helm repositories...
...Successfully got an update from the "https://googlecloudplatform.github.io/spark-on-k8s-operator" chart repository
...Successfully got an update from the "https://kubernetes.github.io/dashboard/" chart repository
...Successfully got an update from the "https://charts.bitnami.com/bitnami" chart repository
Error: can't get a valid version for repositories contour. Try changing the version constraint in Chart.yaml
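One thing that sometimes clears this up (an assumption, not a confirmed fix): make sure the repository hosting the contour chart is registered locally and refresh the chart dependencies before deploying again.

helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update
helm dependency update <path-to-flyte-chart>  # placeholder path to the chart directory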
Klemens Kasseroller
06/08/2022, 12:33 PM
from dataclasses import dataclass
from typing import List

from dataclasses_json import dataclass_json
from flytekit import task, workflow
from flytekit.types.file import FlyteFile

@dataclass_json
@dataclass
class InputsContainer:
    files: List[FlyteFile]

@task
def task1(inputs: List[FlyteFile]) -> InputsContainer:
    print("TASK1 remote source: ", inputs[0].remote_source)
    return InputsContainer(files=inputs)

@task
def task2(inputs: InputsContainer) -> None:
    print("TASK2 remote source: ", inputs.files[0].remote_source)

@workflow
def main_workflow(inputs: List[FlyteFile]) -> None:
    task1_outputs = task1(inputs=inputs)
    task2(inputs=task1_outputs)

if __name__ == '__main__':
    file_path = FlyteFile("s3://test-bucket/test.json")
    main_workflow(inputs=[file_path])
The output generated is:
TASK1 remote source: s3://test-bucket/test.json
TASK2 remote source: None
Could anyone help me out here? Thanks!
Brian Tang
06/10/2022, 11:21 PM
flytepropeller deployment and it's hard to tell if the system is picking up the changes:

$ kubectl logs -f deployment/flytepropeller -nflyte | grep -i logs
Found 6 pods, using pod/flytepropeller-785bcc6f6d-sr8bl
{"json":{"src":"viper.go:398"},"level":"debug","msg":"Config section [plugins.logs] updated. No update handler registered.","ts":"2022-06-08T16:32:15Z"}
{"json":{"src":"viper.go:396"},"level":"debug","msg":"Config section [plugins.logs] hasn't changed.","ts":"2022-06-10T22:36:33Z"}

From looking at the plugins, it seems like adding that config should just automatically work.
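One way to verify whether the rendered propeller configuration actually contains the new logs section (the configmap name is an assumption based on the flyte-core chart; adjust for your install):

kubectl -n flyte get configmap flyte-propeller-config -o yaml | grep -i -A 5 "logs"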
Robin Kahlow
06/12/2022, 12:23 AM
python commands.py build-execute.py --version v1
Thomas Addison
06/15/2022, 2:30 PM
map_task? I've tried adding the caching settings to the @task decorator and in the "mapification" like this:

map_task(my_task)(input=input).with_overrides(cache=True, cache_version="1.0")

but I'm not seeing in the Flyte console the little swirly that indicates the task either wrote to or read from the cache. Is caching a map task supported?
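For what it's worth, a sketch of how I'd expect map-task caching to be wired up, with the cache settings declared on the underlying @task rather than passed through with_overrides (my reading, not a confirmed answer):

from typing import List

from flytekit import map_task, task, workflow

@task(cache=True, cache_version="1.0")
def my_task(input: int) -> int:
    return input * 2

@workflow
def wf(inputs: List[int]) -> List[int]:
    # Each mapped sub-task run should be cached individually because the
    # underlying task declares cache=True and a cache_version.
    return map_task(my_task)(input=inputs)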