tim leonard
03/06/2023, 3:08 PM[ContainersNotReady|PodInitializing]: containers with unready status: [... flyte-copilot-sidecar]|
Is this because it can't access the image inside the sandbox, or something more complicated? Is there a way to get this type of ContainerTask example to work on the sandbox?
As is, my article ends like this - 'So you can push the dag (pyflyte run --remote ..
), see it in Flyte ui, but it won't run locally, so go find a prod server if you want to fullly test, ok gl with that bye'Evan Sadler
03/06/2023, 3:58 PMflyte demo
that spins up a container registry at localhost:3000
. Pushing images will make them available to the demo cluster.
These are the best instructions I could find: https://discuss.flyte.org/t/8490042/maybe-it-s-just-me-but-when-i-followed-https-docs-flyte-org-
Hopefully that helps!Ketan (kumare3)
tim leonard
03/06/2023, 4:02 PMdocker build . -t localhost:30000/dbt_project:001
docker push localhost:30000/dbt_project:001
But got
The push refers to repository [localhost:30000/dbt_project]
Get "<http://localhost:30000/v2/>": dial tcp [::1]:30000: connect: connection refused
Looking at my sandbox in the docker pane (and kubectl get pods -n flyte
on the instance), I don't see a 30000
running (screenshot).
Is there something I have to do on flytectl launch to get this internal docker launched and exposed?
# flytectl version on docker image
>> bash-5.1# flytectl version
"App": "flytectl",
"Build": "b0d9893",
"Version": "0.6.26",
# flytectl version on local
"App": "flytectl",
"Build": "217383e",
"Version": "0.6.31",
flytectl sandbox start
does not include the Docker registry.
However, flytectl demo start
does include it. That's a bit of a trap, but glad to have solved it 🙂
Was able to back track off localhost:30000
and find these docs which got me unstuck.
https://docs.flyte.org/projects/cookbook/en/latest/index.htmlEvan Sadler
03/06/2023, 4:32 PMtim leonard
03/06/2023, 5:18 PMtask submitted to K8s
the kubectl logs
seems to just show the task being run over and over
Here's an extract of the messages on the sandbox-flyte-binary pod.
'Processing Workflow.',
'Handling Workflow [f65bc5594c6f04347844], id: [project:',
'Node has [Succeeded], traversing downstream.',
'Handling downstream Nodes',
'Handling node Status [Running]',
'Parallelism criteria not met, Current [0], Max [25]',
'Handling Node [n0]',
'node executing, current phase [Running]',
'Executing node',
"Dynamic handler.Handle's called with phase 0.",
'No plugin found for Handler-type [raw-container], defaulting to [container]',
'Catalog CacheSerializeDisabled: for Task [flytesnacks/development/demo1.model.my_new_package.model_a/kpMsPqJj9ldUB9thyzt6Qg==]',
'p+Version previously seen .. no event will be sent',
'No state change for Task, previously observed same transition. Short circuiting.',
'Task still running',
'regular node detected, (no future file found)',
'Node execution round complete',
'node execution completed',
'Completed node [n0]',
'Handling Workflow [f65bc5594c6f04347844] Done',
'Observed FlyteWorkflow Update (maybe finalizer)',
'Updated workflow.',
'Will not fast follow, Reason: Wf terminated? false, Version matched? true',
'Completed processing workflow.',
"Successfully synced 'flytesnacks-development/f65bc5594c6f04347844'",
'Subqueue handler batch round',
'Dynamically configured batch size [-1]',
'Exiting SubQueue handler batch round',
[...]
'Adding resource type for unspecified value in request: [id:<project:',
'Subqueue handler batch round',
'Dynamically configured batch size [-1]',
[...] 'Exiting SubQueue handler batch round',
'==> Enqueueing workflow [flytesnacks-development/f65bc5594c6f04347844]',
'Processing Workflow.',
'Handling Workflow [f65bc5594c6f04347844], id: [project:',
'Node has [Succeeded], traversing downstream.',
'Handling downstream Nodes',
'Handling node Status [Running]',
'Parallelism criteria not met, Current [0], Max [25]',
'Handling Node [n0]',
'node executing, current phase [Running]',
'Executing node',
"Dynamic handler.Handle's called with phase 0.",
'No plugin found for Handler-type [raw-container], defaulting to [container]',
'Catalog CacheSerializeDisabled: for Task [flytesnacks/development/demo1.model.my_new_package.model_a/kpMsPqJj9ldUB9thyzt6Qg==]',
'p+Version previously seen .. no event will be sent',
'No state change for Task, previously observed same transition. Short circuiting.',
'Task still running',
'regular node detected, (no future file found)',
'Node execution round complete',
'node execution completed',
'Completed node [n0]',
'Handling Workflow [f65bc5594c6f04347844] Done',
'Observed FlyteWorkflow Update (maybe finalizer)',
'Updated workflow.',
'Will not fast follow, Reason: Wf terminated? false, Version matched? true',
'Completed processing workflow.',
"Successfully synced 'flytesnacks-development/f65bc5594c6f04347844'"]
David Espejo (he/him)
03/06/2023, 5:23 PMtim leonard
03/06/2023, 5:28 PMdocker image tag <http://ghcr.io/flyteorg/rawcontainers-shell:v2|ghcr.io/flyteorg/rawcontainers-shell:v2> localhost:30000/rawcontainers-shell:v2
& push, to get the example container on localhost,
in case your wondering about that on the task pane)Ketan (kumare3)
Eduardo Apolinario (eapolinario)
03/07/2023, 2:02 AMKetan (kumare3)
tim leonard
03/07/2023, 2:32 PM0.6.32
to 0.6.33
Amazing stuff to see this going locally, with rawContainers, thanks again all!Ketan (kumare3)
tim leonard
03/08/2023, 2:41 AMfrom flytekitplugins.dbt.task import DBTRun
from flytekitplugins.dbt.schema import DBTRunInput
from flytekit.core.workflow import ImperativeWorkflow
DBT_PROJECT_DIR = "."
DBT_PROFILES_DIR = "."
DBT_PROFILE = "bq-oauth"
task1 = DBTRun(
name='model_a',
input=DBTRunInput(
project_dir = DBT_PROJECT_DIR,
profiles_dir = DBT_PROFILES_DIR,
profile = DBT_PROFILE,
select = 'model_a')
)
task2 = DBTRun(
name='model_b',
input=DBTRunInput(
project_dir = DBT_PROJECT_DIR,
profiles_dir = DBT_PROFILES_DIR,
profile = DBT_PROFILE,
select = 'model_b')
)
wb3 = ImperativeWorkflow(name='imperative_dbt_demo')
task1_task_id = wb3.add_entity(task1)
task2_task_id = wb3.add_entity(task2)
# Yields:
# `FlyteAssertion: Input was not specified for: input of type simple: STRUCT`
from flytekit.core.workflow import ImperativeWorkflow
from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task
from flytekit import kwtypes, task
from flytekit.types.schema import FlyteSchema
EXAMPLE_DB = "<https://www.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip>"
import pandas
@task
def print_and_count_columns(df: pandas.DataFrame) -> int:
return len(df[df.columns[0]])
sql_task = SQLite3Task(
name="cookbook.sqlite3.sample",
query_template="select TrackId, Name from tracks limit {{.inputs.limit}}",
inputs=kwtypes(limit=int),
output_schema_type=FlyteSchema[kwtypes(TrackId=int, Name=str)],
task_config=SQLite3Config(uri=EXAMPLE_DB, compressed=True),
)
sql_task2 = SQLite3Task(
name="cookbook.sqlite3.sample",
query_template="select TrackId, Name from tracks limit {{.inputs.limit}}",
inputs=kwtypes(limit=int),
output_schema_type=FlyteSchema[kwtypes(TrackId=int, Name=str)],
task_config=SQLite3Config(uri=EXAMPLE_DB, compressed=True),
)
imperative = ImperativeWorkflow(name="my.imperative.workflow.example")
node_t1 = imperative.add_entity(sql_task, limit=100)
node_t2 = imperative.add_entity(print_and_count_columns, df=node_t1.outputs["results"])
imperative.add_workflow_output("print_output", node_t2.outputs["o0"])
# Works! 👍
Ketan (kumare3)
tim leonard
03/08/2023, 3:32 AMKetan (kumare3)
tim leonard
03/08/2023, 3:45 AM