seunggs
11/19/2022, 3:39 AM
flytekit.extras types, but it doesn’t seem to exist in flytekit (v1.2.3) - do I need to install a separate module for this?
Felix Ruess
11/19/2022, 7:46 PM
When I add the AWS_ACCESS_KEY_ID env var to my container/pod, the FLYTE_AWS_ACCESS_KEY_ID is ignored...
So this means I can not set these env vars in my container for use in my user code...
Shouldn't the flyte "internal" things always use the FLYTE_AWS_ACCESS_ vars?
Felix Ruess
11/19/2022, 7:51 PM
Bernardo Galvao
11/21/2022, 8:46 AM
pandera.infer_schema. Now, I want to build a model inference server with FastAPI. The thing is, I have no way of converting DataFrameSchema to SchemaModel, so that it can be used with FastAPI and have nicely generated Swagger UI docs.
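[Editor's note] A hedged sketch of one possible bridge, since as far as I know pandera has no direct DataFrameSchema-to-SchemaModel converter: FastAPI can also generate Swagger docs from plain dataclasses, so a row-level model can be built dynamically from the schema's columns. To keep the sketch self-contained, the inferred schema is simulated here as a plain dict of column name to (python type, nullable); with pandera installed you would derive those from `schema.columns[name].dtype` and `.nullable`.

```python
# Build a row-level dataclass from column metadata, usable as a FastAPI
# request/response model. The input dict stands in for a pandera
# DataFrameSchema's columns (illustrative, not a pandera API).
from dataclasses import make_dataclass, field
from typing import Optional

def columns_to_row_model(columns: dict, name: str = "Row"):
    fields = []
    for col, (py_type, nullable) in columns.items():
        if nullable:
            fields.append((col, Optional[py_type], field(default=None)))
        else:
            fields.append((col, py_type))
    # dataclass rule: fields with defaults must come after those without
    fields.sort(key=lambda f: len(f) == 3)
    return make_dataclass(name, fields)

# columns as they might be extracted from an inferred DataFrameSchema
Row = columns_to_row_model({"age": (int, False), "name": (str, True)})
r = Row(age=30)
```

A FastAPI route can then take `Row` as its body type and the Swagger UI will document the fields, including which ones are optional.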
Can anyone help me convert DataFrameSchema to SchemaModel? Or get to a solution to use with FastAPI?
Calvin Leather
11/21/2022, 8:16 PM
from typing import List

from flytekit import Resources, dynamic, task

@task
def test_task(input: int) -> int:
    import time
    time.sleep(600)
    return input + 1

@dynamic
def test_dynamic(list_of_ints: List[int]) -> List[int]:
    results = []
    for integer in list_of_ints:
        result = test_task(input=integer).with_overrides(requests=Resources(cpu=f"{integer}", mem=f"{integer}Gi"))
        results.append(result)
    return results
Louis DiNatale
11/22/2022, 3:08 PM
Tarmily Wen
11/22/2022, 5:30 PM
Louis DiNatale
11/22/2022, 6:19 PM
Frank Shen
11/22/2022, 6:19 PM
flytekit.exceptions.scopes.FlyteScopedUserException: Could not find a renderer for <class 'modin.pandas.dataframe.DataFrame'>
...
File ".../flytekit/types/structured/structured_dataset.py", line 699, in to_html
raise NotImplementedError(f"Could not find a renderer for {type(df)} in {self.Renderers}")
NotImplementedError: Could not find a renderer for <class 'modin.pandas.dataframe.DataFrame'> in {<class 'pandas.core.frame.DataFrame'>: <flytekit.deck.renderer.TopFrameRenderer object at 0x
Ailin Yu
11/22/2022, 6:31 PM
My workflow uses reference_launch_plans and a dynamic workflow. When I try to view this workflow in the Flyte console, everything shows up as unknown (Node IDs, type, workflow/task name, etc.), and the graph doesn’t render. When I try to get execution details using flytectl get execution --details, I get this error:
{"json":{},"level":"info","msg":"Retrieved 1 executions","ts":"2022-11-22T10:25:18-08:00"}
Error: unsupported literal scalar type *core.Scalar_Union
{"json":{},"level":"error","msg":"unsupported literal scalar type *core.Scalar_Union","ts":"2022-11-22T10:27:16-08:00"}
Is what I’m trying to do here something that isn’t supported?
Arshak Ulubabyan
11/23/2022, 11:34 AM
Andrew Korzhuev
11/23/2022, 12:23 PM
{{.hostname}} is not propagated from the pods correctly. Was anybody looking into that issue?
https://github.com/flyteorg/flyte/issues/2635
https://github.com/flyteorg/flyteplugins/pull/293
Babis Kiosidis
11/23/2022, 2:09 PM
seunggs
11/23/2022, 2:14 PM
AssertionError: Tuples are not a supported type for individual values in Flyte - got a tuple - Output(model_state=Promise(node:n0.model_state)). If using named tuple in an inner task, please, de-reference the actual attribute that you want to use. For example, in NamedTuple('OP', x=int) then return v.x, instead of v, even if this has a single element
seunggs
11/23/2022, 2:15 PM
hello_output = typing.NamedTuple("OP", greet=str)

@task
def say_hello() -> hello_output:
    return hello_output("hello world")
seunggs
11/23/2022, 2:15 PM
seunggs
11/23/2022, 2:16 PM
TrainingOutputs = typing.NamedTuple(
    "TrainingOutputs",
    model_state=FlyteFile,
)

def train_task(hp: Hyperparameters) -> TrainingOutputs:
    ...
    return TrainingOutputs(model_state=FlyteFile(MODEL_PATH))
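[Editor's note] The error message above asks for the NamedTuple attribute to be de-referenced instead of passing the tuple itself onward. A plain-Python illustration of that pattern (FlyteFile is swapped for str here so the sketch stays self-contained and flytekit-free):

```python
# De-reference the NamedTuple attribute when consuming a task's output,
# instead of handing the whole tuple to the next task.
import typing

TrainingOutputs = typing.NamedTuple("TrainingOutputs", [("model_state", str)])

def train_task() -> TrainingOutputs:
    return TrainingOutputs(model_state="/tmp/model.pt")

outputs = train_task()
model_state = outputs.model_state  # pass this attribute downstream, not `outputs`
```

In a Flyte workflow the same idea applies to the Promise: use `train_task(...).model_state` rather than the whole output object.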
seunggs
11/23/2022, 2:16 PM
Katrina P
11/23/2022, 6:26 PM
I checked with kubectl but they're the same. Is this the correct configuration for updating log links in propeller? Do I need to restart anything other than propeller to update the log links?
task_logs.yaml: |
  plugins:
    logs:
      cloudwatch-enabled: false
      kubernetes-enabled: false
      templates:
        - displayName: "Data Dog"
          templateUris:
            - "https://app.datadoghq.com/logs?query=pod_name%3A{{ .podName }}%20kube_namespace%3A{{ .namespace }}%20&cols=&index=&messageDisplay=inline&stream_sort=desc&viz=stream&from_ts={{ .podUnixStartTime }}000&to_ts={{ .podUnixFinishTime }}000&live=false"
Kilian Tep
11/24/2022, 4:27 AM
@task
def to_training_config(cfg: DictConfig) -> TrainingConfig:
    return TrainingConfig(**cfg)

TrainingConfig is simply a custom dataclass_json object. This function aims to convert a DictConfig object to the dataclass so I can reuse it later within my workflow. I am getting the following error:
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
/opt/venv/lib/python3.7/site-packages/flytekit/core/base_task.py in dispatch_execute(self, ctx, input_literal_map)
521 try:
--> 522 literals[k] = TypeEngine.to_literal(exec_ctx, v, py_type, literal_type)
523 except Exception as e:
/opt/venv/lib/python3.7/site-packages/flytekit/core/type_engine.py in to_literal(cls, ctx, python_val, python_type, expected)
751
--> 752 lv = transformer.to_literal(ctx, python_val, python_type, expected)
753
/opt/venv/lib/python3.7/site-packages/flytekit/core/type_engine.py in to_literal(self, ctx, python_val, python_type, expected)
349 )
--> 350 self._serialize_flyte_type(python_val, python_type)
351 return Literal(
/opt/venv/lib/python3.7/site-packages/flytekit/core/type_engine.py in _serialize_flyte_type(self, python_val, python_type)
397 field_type = v.type
--> 398 python_val.__setattr__(v.name, self._serialize_flyte_type(val, field_type))
399 return python_val
/opt/venv/lib/python3.7/site-packages/flytekit/core/type_engine.py in _serialize_flyte_type(self, python_val, python_type)
395 for v in dataclasses.fields(python_type):
--> 396 val = python_val.__getattribute__(v.name)
397 field_type = v.type
AttributeError: 'DictConfig' object has no attribute 'name'
Note: this works perfectly fine when I remove the @task decorator.
Any idea what could be the cause?
SeungTaeKim
11/24/2022, 4:49 AM
Chandramoulee K V
11/24/2022, 8:05 AM
# Copy the makefile targets to expose on the container. This makes it easier to register.
# Delete this after we update CI to not serialize inside the container
# COPY k8s_spark/sandbox.config /root
# Copy the actual code
# COPY k8s_spark/ /root/k8s_spark
# This tag is supplied by the build script and will be used to determine the version
# when registering tasks, workflows, and launch plans
# ARG tag
# ENV FLYTE_INTERNAL_IMAGE $tag
# Copy over the helper script that the SDK relies on
# RUN cp ${VENV}/bin/flytekit_venv /usr/local/bin/
# RUN chmod a+x /usr/local/bin/flytekit_venv
I registered the sample pyspark workflow with the image and I am facing this issue:
failed
SYSTEM ERROR! Contact platform administrators.
When looking at the logs in AWS, I found an "unable to load native-hadoop library" warning. Could this be the cause of the issue? Any idea?
{"log":"22/11/24 07:03:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
","stream":"stderr","docker":{"container_id":"XXX"},"kubernetes":{"container_name":"YYY","namespace_name":"flytesnacks-development","pod_name":"ZZZ","pod_id":"AAA","namespace_id":"BBB","namespace_labels":{"kubernetes_io/metadata_name":"flytesnacks-development"}}}
Liliana Mamani Sanchez
11/24/2022, 11:02 AM
Zhiyi Li
11/25/2022, 2:18 PM
{"json":{"src":"base.go:73"},"level":"fatal","msg":"caught panic: kafka: client has run out of available brokers to talk to (Is your cluster reachable?) [goroutine 1 [running]:\nruntime/debug.Stack()\n\t/usr/local/go/src/runtime/debug/stack.go:24 +0x65\ngithub.com/flyteorg/flyteadmin/pkg/rpc/adminservice.NewAdminServer.func1()\n\t/go/src/github.com/flyteorg/flyteadmin/pkg/rpc/adminservice/base.go:73 +0x88\npanic({0x224d580, 0xc0009c8300})\n\t/usr/local/go/src/runtime/panic.go:838 +0x207\ngithub.com/flyteorg/flyteadmin/pkg/async/cloudevent.NewCloudEventsPublisher({0x2bd10a0, 0xc00005a018}, {0x1, {0xc000f2e460, 0x5}, {{0x0, 0x0}},
The weird thing is that we tried to use the same-version sarama client to connect to the kafka instance in the same cluster, and it works. (Also, I was able to succeed at this step, yet after I redeployed with eventTypes set to a single type it failed and could never get back to normal.)
The cloudevent config I used is the following:
cloudevents:
  enable: true
  kafka:
    brokers: kafkaip
    version:
      version:
        - 2
        - 2
        - 0
        - 0
  eventsPublisher:
    eventTypes:
      - all
    topicName: workflow-engine-test
  type: Kafka
Any clue what could be the problem?
Andreas Fehlner
11/27/2022, 8:02 PM
Evan Sadler
11/28/2022, 6:14 PM
pyflyte can register a file or folder of launch plans, but I am not exactly sure how to activate them all. I don’t want to have to activate specific launch plans inside the GitHub Actions workflow. My best idea is to throw everything in a Makefile.
Any advice is much appreciated!
# only pseudo code implemented
.SILENT: deploy
.PHONY: deploy
deploy:
	pyflyte register [flags] ./recsys/deploy/launchplans.py
	# I could list them here?
	flytectl update [flags] launchplan prod_ease_train
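[Editor's note] One way to sketch the "activate everything" step without listing launch plans by hand: list them with flytectl, extract the names, and activate each at a given version. Hedged: the `--activate` flag and the JSON shape of `flytectl get launchplan -o json` should be double-checked against your flytectl version, and project/domain/version below are placeholders.

```shell
# List all launch plans in a project/domain and activate each one at VERSION.
PROJECT=recsys
DOMAIN=production
VERSION=v1

flytectl get launchplan -p "$PROJECT" -d "$DOMAIN" -o json |
  jq -r '.[].id.name' |
  sort -u |
  while read -r lp; do
    flytectl update launchplan -p "$PROJECT" -d "$DOMAIN" "$lp" \
      --version "$VERSION" --activate
  done
```

Dropping this loop into the Makefile's deploy target would avoid naming each launch plan explicitly.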
Katrina P
11/28/2022, 7:20 PM
After flytectl update task-resource-attribute, does that setting still have to be within whatever limits we set in the Admin config map? We've tried to up the task-resource-attribute, but when registering the workflow we still get
Requested MEMORY limit [1Ti] is greater than current limit set in the platform configuration [512Gi]. Please contact Flyte Admins to change these limits or consult the configuration
Because if I call flytectl get task-resource-attribute it does say
{"project":"<>","domain":"<>","defaults":{"cpu":"500m","memory":"1Gi"},"limits":{"cpu":"2","memory":"1024Gi"}}
Tarmily Wen
11/28/2022, 10:29 PM
k8s:
  plugins:
    # -- Configuration section for all K8s specific plugins [Configuration structure](https://pkg.go.dev/github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s/config)
    k8s:
      default-env-vars: []
      # DEFAULT_ENV_VAR: VALUE
      default-cpus: 100m
      default-memory: 100Mi
      resource-tolerations:
        - nvidia.com/gpu:
            - key: "num-gpus"
              operator: "Equal"
              value: 1
              effect: "NoSchedule"
        - nvidia.com/gpu:
            - key: "num-gpus"
              operator: "Equal"
              value: 4
              effect: "NoSchedule"
Jay Ganbat
11/28/2022, 11:52 PM
We map outputs like sample_name: sample_workflow_results. When some sample fails and we recover, unfortunately the output mapped dict is all shuffled up and no longer matches the sample_name key.
For example, we expect that a certain dynamic task returns a filename -> filename-suffix map like below:
{
"counter_1": 1,
"counter_2": 2,
"counter_3": 3,
"counter_4": 4,
"counter_5": 5,
"counter_6": 6
}
and I forcefully fail some tasks while some complete successfully. Then when I recover, I get this instead:
{
"counter_1": 5,
"counter_2": 6,
"counter_3": 3,
"counter_4": 4,
"counter_5": 5,
"counter_6": 4
}
An example dynamic task is below:
@dynamic(cache=True, cache_version=get_workflow_version())
def compute_average_for_chunks(
    chunks: Dict[str, FlyteFile], infile: FlyteFile
) -> Dict[str, int]:
    suffix_dict = {}
    for chunk_name, chunk_path in chunks.items():
        infile_suffix = get_file_suffix_task(chunk_path=chunk_path, infile=infile)
        suffix_dict[chunk_name] = infile_suffix
    return suffix_dict
Ketan (kumare3)
11/29/2022, 1:28 AM
Ketan (kumare3)
11/29/2022, 1:28 AM