<@U06RKAMK0UC> what’s the right RayConfig that I s...
# ask-ai
n
@ancient-wolf-19325 what’s the right RayConfig that I should be using to connect my Flyte task with my existing Ray cluster on EKS? Right now I have something like
RAY_CONFIG = RayJobConfig(
    address="ray://ray-cluster-head-svc.ray.svc.cluster.local:IP",
    worker_node_config=[],
    head_node_config=None,
    shutdown_after_job_finishes=False,
)
and in my task I’m doing
@task(
    task_config=RAY_CONFIG,
    requests=Resources(
        cpu="2",
        mem="4Gi",
    ),
    container_image=CONTAINER_IMAGE,
    pod_template=POD_TEMPLATE,
)
But when I run my Flyte task with Ray I don’t see the logs, and I don’t see the Ray cluster logs either. I’m initializing my Ray cluster with ray.init(address="ray://ray-cluster-head-svc.ray.svc.cluster.local:IP"). Am I doing this properly / what is the proper way to configure the Flyte + Ray integration here?
j
To connect a Flyte task to an existing Ray cluster, you should use RayJobConfig with the address parameter set to your Ray cluster's head node address, and leave worker_node_config as an empty list. This is supported and tested in the Flyte Ray plugin, as shown in flytekit's test suite and official documentation. Your configuration is correct in this regard.
However, Flyte's Ray plugin currently does not fully support submitting jobs to an existing Ray cluster via the RayJobConfig(address=...) pattern in all environments, and there are known issues where Flyte may still attempt to create a new Ray cluster or not properly forward logs from the existing cluster (see this issue). As a workaround, you can omit the task_config and directly call ray.init(address=...) inside your task function, which will connect to the existing cluster and allow you to see logs as expected. Would you like more detail or a code example?
Sources:
- flytekit-ray test: test_ray_task_existing_cluster
- flytesnacks Ray plugin README: Submit a Ray job to existing cluster
- Flyte issue: Cannot submit a Ray job to an existing cluster
n
@ancient-wolf-19325 That is incorrect, I cannot actually see the logs when I init to a Ray cluster. This is precisely what I’m doing, however I don’t see any logs when this is launched. Is there a way to fix this?
j
You are correct: when connecting to an existing Ray cluster using ray.init(address=...) inside a Flyte task, Flyte does not automatically collect or display Ray cluster logs in the Flyte UI. This is a known limitation: Flyte's Ray plugin only injects log collection sidecars and exposes logs when it manages the Ray cluster lifecycle itself (i.e., when it creates the cluster via RayJobConfig), not when connecting to an external/existing cluster. See Flyte issue #5877 and Ray logs persistence PR.
Currently, there is no built-in way in Flyte to surface logs from an externally managed Ray cluster in the Flyte UI. You would need to access Ray logs directly from the Ray dashboard or your cluster's logging system. Would you like more detail or suggestions for workarounds?
Sources:
- Flyte issue: Cannot submit a Ray job to an existing cluster
- Ray logs persistence PR
n
@ancient-wolf-19325 logs for ray are only available when submitting via the submission API
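# Imports this snippet relies on. RAY_CONFIG, CONTAINER_IMAGE, POD_TEMPLATE, and
# CONCURRENCY are defined elsewhere; the stdlib logger is an assumption.
import logging
from functools import partial

from flytekit import LaunchPlan, Resources, map_task, task, workflow

logger = logging.getLogger(__name__)


@task(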
    container_image=CONTAINER_IMAGE,
    pod_template=POD_TEMPLATE,
    requests=Resources(
        cpu="1",
        mem="1Gi",
    ),
)
def make_dummy_chunks(num_chunks: int) -> list[str]:
    """Create dummy chunks for testing."""
    logger.info(f"Creating {num_chunks} dummy chunks")
    chunks = [f"chunk_{i}" for i in range(num_chunks)]
    logger.info(f"Created chunks: {chunks}")
    return chunks


@task(
    task_config=RAY_CONFIG,
    requests=Resources(
        cpu="2",
        mem="4Gi",
    ),
    container_image=CONTAINER_IMAGE,
    pod_template=POD_TEMPLATE,
)
def hello_world_ray_task(
    chunk: str,
    multiplier: int,
) -> str:
    """Simple Ray task that just tests basic functionality."""
    logger.info(f"Starting hello_world_ray_task with chunk: {chunk}")

    # Test 1: Can we import ray?
    try:
        import ray

        logger.info("✓ Ray imported successfully")
    except Exception as e:
        logger.error(f"✗ Failed to import ray: {e}")
        return f"ERROR: Failed to import ray: {e}"

    # Test 2: Can we connect to the Ray cluster?
    try:
        if ray.is_initialized():
            logger.info("Ray is already initialized, shutting down...")
            ray.shutdown()

        ray.init(address="ray://ray-cluster-head-svc.ray.svc.cluster.local:10001")
        logger.info("✓ Connected to Ray cluster")
    except Exception as e:
        logger.error(f"✗ Failed to connect to Ray cluster: {e}")
        return f"ERROR: Failed to connect to Ray cluster: {e}"

    # Test 3: Can we use Ray remote functions?
    try:

        @ray.remote
        def simple_multiply(x: int, y: int) -> int:
            return x * y

        result = ray.get(simple_multiply.remote(5, multiplier))
        logger.info(f"✓ Ray remote function worked: 5 * {multiplier} = {result}")
    except Exception as e:
        logger.error(f"✗ Ray remote function failed: {e}")
        return f"ERROR: Ray remote function failed: {e}"

    # Test 4: Can we use Ray actors?
    try:

        @ray.remote
        class SimpleActor:
            def __init__(self, name: str):
                self.name = name

            def get_message(self) -> str:
                return f"Hello from {self.name}!"

        actor = SimpleActor.remote(f"Actor_{chunk}")
        message = ray.get(actor.get_message.remote())
        logger.info(f"✓ Ray actor worked: {message}")
    except Exception as e:
        logger.error(f"✗ Ray actor failed: {e}")
        return f"ERROR: Ray actor failed: {e}"

    ray.shutdown()

    return f"SUCCESS: {chunk} processed with multiplier {multiplier}, result={result}, message={message}"


@workflow
def hello_world_ray_workflow(
    num_chunks: int = 2,
    multiplier: int = 10,
) -> None:
    """Simple workflow to test Ray + Flyte integration."""
    logger.info("Starting hello_world_ray_workflow")

    # Create dummy chunks
    chunks = make_dummy_chunks(num_chunks=num_chunks)

    # Use map_task with partial like in your original code
    map_task(
        partial(
            hello_world_ray_task,
            multiplier=multiplier,
        ),
        concurrency=CONCURRENCY,
    )(chunk=chunks)


# Create launch plan
hello_world_launch_plan = LaunchPlan.get_or_create(
    name="hello_world_ray_workflow",
    workflow=hello_world_ray_workflow,
    max_parallelism=2,
)
Here is a minimal example where the logs are not visible on the ray dashboard cluster
j
You are correct: when you connect to an existing Ray cluster from a Flyte task using ray.init(address=...), the logs generated by your Flyte task (e.g., via logger.info) are not automatically forwarded to the Ray dashboard or surfaced in the Ray cluster logs. This is because the Ray dashboard only collects logs from processes it manages directly (i.e., jobs submitted via the Ray Job Submission API or Ray Serve), not from external clients or Flyte task pods that connect via ray.init(address=...) (Ray docs).
In Flyte, only Ray jobs launched via the Ray plugin (where Flyte manages the Ray cluster lifecycle and submits jobs through the Ray operator) will have their logs collected and made available in the Flyte UI and Ray dashboard (Ray logs persistence PR). When you connect to an existing cluster, your task's logs remain in the Flyte pod logs and are not visible in the Ray dashboard. Would you like more detail or suggestions for alternative logging strategies?
Sources:
- Ray logs persistence PR
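Since the dashboard tracks jobs that go through the Job Submission API, one possible direction is to submit the work as a Ray job from inside the Flyte task and pull the job logs back. The following is only a rough sketch under several assumptions: the dashboard is reachable on the head service at Ray's default port 8265, the helper names dashboard_url, submit_and_collect_logs, and run_against_cluster are made up for illustration, and the client passed in behaves like ray.job_submission.JobSubmissionClient (submit_job, get_job_status, get_job_logs). It is not something the Flyte Ray plugin does for you.

```python
import time
from urllib.parse import urlparse

# Job states after which no further status changes are expected.
TERMINAL_STATES = ("SUCCEEDED", "FAILED", "STOPPED")


def dashboard_url(client_address: str, dashboard_port: int = 8265) -> str:
    """Map a Ray Client address (ray://host:10001) to a dashboard URL.

    Assumption: the dashboard runs on the same host, on the default port 8265.
    """
    host = urlparse(client_address).hostname
    if host is None:
        raise ValueError(f"cannot parse a host from {client_address!r}")
    return f"http://{host}:{dashboard_port}"


def submit_and_collect_logs(
    client,
    entrypoint: str,
    poll_seconds: float = 2.0,
    timeout_seconds: float = 600.0,
) -> tuple[str, str]:
    """Submit `entrypoint` as a Ray job, wait for it to end, return (state, logs).

    `client` only needs submit_job, get_job_status, and get_job_logs.
    """
    job_id = client.submit_job(entrypoint=entrypoint)
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = client.get_job_status(job_id)
        # Ray returns a str-based enum; reduce it to its plain string value.
        state = str(getattr(status, "value", status))
        if state in TERMINAL_STATES:
            return state, client.get_job_logs(job_id)
        if time.monotonic() > deadline:
            raise TimeoutError(f"job {job_id} still {state} after {timeout_seconds}s")
        time.sleep(poll_seconds)


def run_against_cluster(client_address: str, entrypoint: str) -> tuple[str, str]:
    """Hypothetical wiring against a real cluster; needs ray[default] installed."""
    from ray.job_submission import JobSubmissionClient  # imported lazily

    client = JobSubmissionClient(dashboard_url(client_address))
    return submit_and_collect_logs(client, entrypoint)
```

Inside a Flyte task you could call run_against_cluster("ray://ray-cluster-head-svc.ray.svc.cluster.local:10001", "python my_script.py") and pass the returned log text to logger.info, so that it also lands in the Flyte pod logs. Here my_script.py is a placeholder for a script that is available on the cluster.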