#ask-the-community

Guy Arad

11/26/2023, 11:58 AM
I'm going through this example (merge sort using dynamic and static workflows) and have several questions:
1. I ran it locally after changing `run_local_at_count` to 5 and adding `print(f'sorting locally {len(numbers)}')` to the `sort_locally` function. However, I still see it being executed on a list of length 10! Maybe something is wrong with the condition in a local execution? How can I debug this?
2. The UI doesn't show all the levels. It gives me a "Maximum call stack size exceeded" error. I couldn't find this string in the repositories to understand where it comes from.
3. Using `dynamic` here is interesting. The point is that you need to stop the recursive compilation of the workflow at registration time and make sure it only happens at runtime. It's worth noting in the tutorial. Essentially, when doing the merge sort, the dynamic workflow is compiled one recursion level at a time as we go down.

Samhita Alla

11/27/2023, 7:14 AM
1. have you modified the `count` variable to 5?
2. seems like a UI issue: cc @Eduardo Apolinario (eapolinario)
3. dynamic workflows are always compiled at run time

Guy Arad

11/27/2023, 10:58 AM
1. Yes, of course.
2. Thanks.
3. Yes, I'm aware. My main purpose was to point out something that isn't obvious from the documentation. Looking at the dynamic workflow in the example, nothing in it seems dynamic in nature. But the dynamic workflow calls the static workflow, which in turn calls back into the dynamic workflow, so one of the two must be defined as dynamic, and that's a really powerful concept.
@Samhita Alla @Eduardo Apolinario (eapolinario) created issue: https://github.com/flyteorg/flyte/issues/4484
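
The point about the static workflow and the dynamic workflow calling each other can be sketched with a toy model. This is plain Python, not flytekit: `expand_at_registration`, the `"static"`/`"dynamic"` labels, and `MAX_DEPTH` are hypothetical names invented for illustration. It only assumes, as described in the message above, that static workflows are expanded ahead of time while a dynamic one is left as a placeholder until it actually runs.

```python
from typing import Dict, List, Tuple

MAX_DEPTH = 20  # safety guard so the all-static case terminates in this demo

# Each entry maps a workflow name to the (callee, kind_of_callee) pairs in its body.
Graph = Dict[str, List[Tuple[str, str]]]


def expand_at_registration(name: str, graph: Graph, depth: int = 0) -> int:
    """Return the deepest level reached while expanding `name` ahead of time."""
    if depth >= MAX_DEPTH:
        return depth  # without the guard, the all-static case would never stop
    deepest = depth
    for callee, kind in graph[name]:
        if kind == "static":
            # A static callee is expanded immediately, together with its own callees.
            deepest = max(deepest, expand_at_registration(callee, graph, depth + 1))
        # A "dynamic" callee is left as a placeholder and expanded only when it runs.
    return deepest


all_static: Graph = {
    "merge_sort": [("merge_sort_remotely", "static")],
    "merge_sort_remotely": [("merge_sort", "static"), ("merge_sort", "static")],
}
one_dynamic: Graph = {
    "merge_sort": [("merge_sort_remotely", "dynamic")],
    "merge_sort_remotely": [("merge_sort", "static"), ("merge_sort", "static")],
}

print(expand_at_registration("merge_sort", all_static))            # hits the guard
print(expand_at_registration("merge_sort", one_dynamic))           # stops immediately
print(expand_at_registration("merge_sort_remotely", one_dynamic))  # one level only
```

In this model, marking a single link in the cycle as dynamic is enough to make ahead-of-time expansion finite, and expanding the dynamic node later goes down exactly one level before it meets the next placeholder.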

Samhita Alla

11/27/2023, 4:07 PM
1. oh. how can it execute on a list of length 10? that's weird. it works for me, though.
2. thanks for creating the issue.
3. understood. please feel free to contribute to the docs!

Guy Arad

11/27/2023, 4:16 PM
I don't know!! very weird... here's the code (just run the file):
```python
import json
import typing
from datetime import datetime
from random import random, seed
from typing import Tuple

from flytekit import conditional, dynamic, task, workflow

# seed random number generator
seed(datetime.now().microsecond)


@task
def split(numbers: typing.List[int]) -> Tuple[typing.List[int], typing.List[int], int, int]:
    left_len = len(numbers) // 2
    right_len = len(numbers) - left_len
    print(f'splitting {left_len}/{right_len}')
    return (
        numbers[:left_len],
        numbers[left_len:],
        left_len,
        right_len,
    )


@task
def merge(sorted_list1: typing.List[int], sorted_list2: typing.List[int]) -> typing.List[int]:
    result = []
    left_len = len(sorted_list1)
    right_len = len(sorted_list2)
    print(f'merging {left_len}/{right_len}')

    while len(sorted_list1) > 0 and len(sorted_list2) > 0:
        # Move the smaller of the two head elements to the result;
        # pop(0) consumes it from its source list.
        if sorted_list1[0] < sorted_list2[0]:
            result.append(sorted_list1.pop(0))
        else:
            result.append(sorted_list2.pop(0))

    # At most one of the lists still has elements; append the leftovers.
    result.extend(sorted_list1)
    result.extend(sorted_list2)

    return result


@task
def debug_print(desc: str, args: dict[str, typing.Union[int, str]]) -> int:
    print(f'{desc}:')
    print(json.dumps(args, indent=2))
    return 42


@task
def sort_locally(numbers: typing.List[int]) -> typing.List[int]:
    print(f'sorting locally {len(numbers)}')
    # if len(numbers) > 5:
    #     raise RuntimeError('ahhhh too large')
    return sorted(numbers)


# debug_print(desc='merge_sort_remotely args', args={'numbers_count': len(numbers), 'run_local': run_local_at_count})

@dynamic
def merge_sort_remotely(numbers: typing.List[int], run_local_at_count: int) -> typing.List[int]:
    split1, split2, new_count1, new_count2 = split(numbers=numbers)
    sorted1 = merge_sort(numbers=split1, numbers_count=new_count1, run_local_at_count=run_local_at_count)
    sorted2 = merge_sort(numbers=split2, numbers_count=new_count2, run_local_at_count=run_local_at_count)
    return merge(sorted_list1=sorted1, sorted_list2=sorted2)


@workflow
def merge_sort(numbers: typing.List[int], numbers_count: int, run_local_at_count: int = 5) -> typing.List[int]:
    return (
        conditional("terminal_case")
        .if_(numbers_count <= run_local_at_count)
        .then(sort_locally(numbers=numbers))
        .else_()
        .then(merge_sort_remotely(numbers=numbers, run_local_at_count=run_local_at_count))
    )


def generate_inputs(numbers_count: int) -> typing.List[int]:
    generated_list = []
    # generate random integers in the range 0-9999
    for _ in range(numbers_count):
        value = int(random() * 10000)
        generated_list.append(value)

    return generated_list


if __name__ == "__main__":
    count = 20
    x = generate_inputs(count)
    print(x)
    print(f"Running Merge Sort Locally...{merge_sort(numbers=x, numbers_count=count)}")
```
I added a `print` to `sort_locally`, and I can see "sorting locally 10" in the output.
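
For comparison, here is a minimal plain-Python sketch of the same recursion with the same threshold of 5. It does not use flytekit at all; the function names are reused from the snippet above only for readability, and `local_sort_sizes` is a hypothetical helper list added to record what reaches the local sort. Under these assumptions, a 20-element input should only ever hand 5-element lists to `sort_locally`.

```python
from typing import List

local_sort_sizes: List[int] = []  # size of every list handed to the local sort


def sort_locally(numbers: List[int]) -> List[int]:
    local_sort_sizes.append(len(numbers))
    return sorted(numbers)


def merge(left: List[int], right: List[int]) -> List[int]:
    result: List[int] = []
    i = j = 0
    # Repeatedly take the smaller head element of the two sorted lists.
    while i < len(left) and j < len(right):
        if left[i] < right[j]:
            result.append(left[i])
            i += 1
        else:
            result.append(right[j])
            j += 1
    result.extend(left[i:])
    result.extend(right[j:])
    return result


def merge_sort(numbers: List[int], run_local_at_count: int = 5) -> List[int]:
    # Terminal case: small enough to sort directly.
    if len(numbers) <= run_local_at_count:
        return sort_locally(numbers)
    mid = len(numbers) // 2
    return merge(
        merge_sort(numbers[:mid], run_local_at_count),
        merge_sort(numbers[mid:], run_local_at_count),
    )


data = list(range(20, 0, -1))
result = merge_sort(data)
print(local_sort_sizes)  # [5, 5, 5, 5]
```

Any call to the local sort with 10 elements therefore does not come from the threshold logic itself.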

Samhita Alla

11/27/2023, 4:50 PM
`count` is still 20, right?

Guy Arad

11/27/2023, 6:22 PM
what do you mean? at which point?
btw, adding an exception to `sort_locally`:
```python
@task
def sort_locally(numbers: typing.List[int]) -> typing.List[int]:
    print(f'sorting locally {len(numbers)}')
    if len(numbers) > 5:
        raise RuntimeError('ahhhh too large')
    return sorted(numbers)
```
and submitting to the Flyte sandbox works, so I'm assuming this bug doesn't happen in the deployed service. Maybe it's related to running the workflow locally (which I believe works in a totally different way).

Samhita Alla

11/28/2023, 7:22 AM
it works as expected for me. what have you set `numbers` to?

Guy Arad

11/28/2023, 3:54 PM
@Samhita Alla 5 of course 🙂 I'm running the above code as-is (`python merge_sort.py`). I created brand-new environments for Python 3.11 and 3.10 and installed only the `flytekit` package. This is the output:
```
╰─$ python merge_sort.py
[3033, 6634, 6119, 4576, 5913, 6872, 2058, 9155, 1801, 9976, 6987, 2455, 6965, 4731, 715, 1633, 2342, 2882, 7712, 2433]
splitting 10/10
sorting locally 10
splitting 5/5
sorting locally 5
sorting locally 5
merging 5/5
sorting locally 10
splitting 5/5
sorting locally 5
sorting locally 5
merging 5/5
merging 10/10
Running Merge Sort Locally...[715, 1633, 1801, 2058, 2342, 2433, 2455, 2882, 3033, 4576, 4731, 5913, 6119, 6634, 6872, 6965, 6987, 7712, 9155, 9976]
```
As you can see, there are "sorting locally 10" lines. It occurred to me that it might be related to the M2 Pro chip I'm using, so I tried running inside a Docker container for the `linux/amd64` platform, but got a similar result. I installed the latest `flytekit` (1.10.1).
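
In the output, each "sorting locally 10" line is immediately followed by "splitting 5/5", which suggests that both branches of the conditional ran for the 10-element lists while only the else-branch result was used. One possible explanation, not confirmed anywhere in this thread, is ordinary Python argument evaluation: in `.then(sort_locally(numbers=numbers))` the call is an argument expression, so unless the framework intercepts it, it runs before `.then` ever sees it. The sketch below illustrates only that Python behaviour; `ToyConditional` is a hypothetical stand-in and is not flytekit's implementation.

```python
from typing import Any, List

calls: List[int] = []  # sizes of the lists that actually reached sort_locally


def sort_locally(numbers: List[int]) -> List[int]:
    calls.append(len(numbers))
    return sorted(numbers)


class ToyConditional:
    """Hypothetical fluent conditional; NOT how flytekit implements `conditional`."""

    def __init__(self) -> None:
        self._active = False
        self._done = False
        self.result: Any = None

    def if_(self, condition: bool) -> "ToyConditional":
        self._active = bool(condition)
        return self

    def else_(self) -> "ToyConditional":
        self._active = not self._done
        return self

    def then(self, value: Any) -> "ToyConditional":
        # Only the selected branch stores a result; a callable is invoked lazily.
        if self._active and not self._done:
            self.result = value() if callable(value) else value
            self._done = True
        return self


numbers = list(range(10))

# Eager: sort_locally(numbers) is evaluated as an argument, whichever branch wins.
eager = ToyConditional().if_(len(numbers) <= 5).then(sort_locally(numbers)).else_().then("recurse")
eager_calls = list(calls)

# Lazy: wrapping the call in a lambda defers it until the branch is selected.
calls.clear()
lazy = ToyConditional().if_(len(numbers) <= 5).then(lambda: sort_locally(numbers)).else_().then("recurse")

print(eager.result, eager_calls)  # recurse [10]
print(lazy.result, calls)         # recurse []
```

Whether this is what happens inside a locally executed dynamic workflow would need to be checked against the flytekit source.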

Samhita Alla

11/28/2023, 4:51 PM
would you mind sharing with me the pyflyte run command?

Guy Arad

11/28/2023, 7:01 PM
I wasn't running `pyflyte`. On the sandbox cluster it works fine. I was just running it locally using `python merge_sort.py`.

Samhita Alla

11/29/2023, 7:13 AM
oh, in that case you need to modify
```python
if __name__ == "__main__":
    count = 20
    x = generate_inputs(count)
    print(x)
    print(f"Running Merge Sort Locally...{merge_sort(numbers=x, numbers_count=count)}")
```
to
```python
if __name__ == "__main__":
    count = 5
    x = generate_inputs(count)
    print(x)
    print(f"Running Merge Sort Locally...{merge_sort(numbers=x, numbers_count=count)}")
```

Guy Arad

11/30/2023, 11:40 AM
@Samhita Alla created an issue as discussed