Guy Arad
11/26/2023, 11:58 AM
`run_local_at_count` to 5, and adding `print(f'sorting locally {len(numbers)}')` to the `sort_locally` function. However, I still see it being executed on a list of length 10! Maybe something is wrong with the condition in local execution? How can I debug this?
2. The UI doesn't show all the levels. It gives me the "Maximum call stack size exceeded" error. I couldn't find this string in the repositories to understand where it is coming from.
3. Using `dynamic` here is interesting. The issue is that you need to stop the recursive compilation of the workflow at registration time and make sure it only happens at runtime. That's worth noting in the tutorial. Essentially, when doing the merge sort, the dynamic workflow is compiled as we go down the recursion, one level at a time.
Samhita Alla
`count` variable to 5?
2. Seems like a UI issue: cc @Eduardo Apolinario (eapolinario)
3. Dynamic workflows are always compiled at run time.
Guy Arad
11/27/2023, 10:58 AM
Guy Arad
11/27/2023, 12:04 PM
Samhita Alla
Guy Arad
11/27/2023, 4:16 PM
```python
import json
import typing
from datetime import datetime
from random import random, seed
from typing import Tuple

from flytekit import conditional, dynamic, task, workflow

# seed random number generator
seed(datetime.now().microsecond)


@task
def split(numbers: typing.List[int]) -> Tuple[typing.List[int], typing.List[int], int, int]:
    left_len = int(len(numbers) / 2)
    right_len = int(len(numbers)) - left_len
    print(f'splitting {left_len}/{right_len}')
    return (
        numbers[0: left_len],
        numbers[left_len:],
        left_len,
        right_len,
    )


@task
def merge(sorted_list1: typing.List[int], sorted_list2: typing.List[int]) -> typing.List[int]:
    result = []
    left_len = len(sorted_list1)
    right_len = len(sorted_list2)
    print(f'merging {left_len}/{right_len}')
    while len(sorted_list1) > 0 and len(sorted_list2) > 0:
        # Take the smaller of the two head elements; pop(0) removes it from its list,
        # so the next comparison looks at the following element
        if sorted_list1[0] < sorted_list2[0]:
            result.append(sorted_list1.pop(0))
        else:
            result.append(sorted_list2.pop(0))
    # One list is now empty; append whatever remains of the other
    result.extend(sorted_list1)
    result.extend(sorted_list2)
    return result


@task
def debug_print(desc: str, args: dict[str, typing.Union[int, str]]) -> int:
    print(f'{desc}:')
    print(json.dumps(args, indent=2))
    return 42


@task
def sort_locally(numbers: typing.List[int]) -> typing.List[int]:
    print(f'sorting locally {len(numbers)}')
    # if len(numbers) > 5:
    #     raise RuntimeError('ahhhh too large')
    return sorted(numbers)


# debug_print(desc='merge_sort_remotely args', args={'numbers_count': len(numbers), 'run_local': run_local_at_count})
@dynamic
def merge_sort_remotely(numbers: typing.List[int], run_local_at_count: int) -> typing.List[int]:
    split1, split2, new_count1, new_count2 = split(numbers=numbers)
    sorted1 = merge_sort(numbers=split1, numbers_count=new_count1, run_local_at_count=run_local_at_count)
    sorted2 = merge_sort(numbers=split2, numbers_count=new_count2, run_local_at_count=run_local_at_count)
    return merge(sorted_list1=sorted1, sorted_list2=sorted2)


@workflow
def merge_sort(numbers: typing.List[int], numbers_count: int, run_local_at_count: int = 5) -> typing.List[int]:
    return (
        conditional("terminal_case")
        .if_(numbers_count <= run_local_at_count)
        .then(sort_locally(numbers=numbers))
        .else_()
        .then(merge_sort_remotely(numbers=numbers, run_local_at_count=run_local_at_count))
    )


def generate_inputs(numbers_count: int) -> typing.List[int]:
    generated_list = []
    # generate random integers between 0 and 9999
    for _ in range(numbers_count):
        value = int(random() * 10000)
        generated_list.append(value)
    return generated_list


if __name__ == "__main__":
    count = 20
    x = generate_inputs(count)
    print(x)
    print(f"Running Merge Sort Locally...{merge_sort(numbers=x, numbers_count=count)}")
```
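To have something to diff a local run against, here is a plain-Python sketch of the same recursion with no flytekit involved. `reference_merge_sort` and `merge_sorted` are hypothetical helpers, not part of the code above; the log messages copy the `print` calls in `split`, `sort_locally`, and `merge`, and the threshold check mirrors `numbers_count <= run_local_at_count`. Assuming the tasks execute sequentially in depth-first order during a local run, a correct run with 20 numbers and a threshold of 5 should only ever log `sorting locally 5`.

```python
from typing import List


def merge_sorted(left: List[int], right: List[int]) -> List[int]:
    # Two-pointer merge of two already-sorted lists; the inputs are not modified.
    result: List[int] = []
    i = j = 0
    while i < len(left) and j < len(right):
        if left[i] < right[j]:
            result.append(left[i])
            i += 1
        else:
            result.append(right[j])
            j += 1
    result.extend(left[i:])
    result.extend(right[j:])
    return result


def reference_merge_sort(numbers: List[int], run_local_at_count: int, log: List[str]) -> List[int]:
    # Terminal case: mirrors `numbers_count <= run_local_at_count` -> sort_locally.
    if len(numbers) <= run_local_at_count:
        log.append(f"sorting locally {len(numbers)}")
        return sorted(numbers)
    # Recursive case: mirrors split -> merge_sort (twice) -> merge.
    left_len = len(numbers) // 2
    right_len = len(numbers) - left_len
    log.append(f"splitting {left_len}/{right_len}")
    left = reference_merge_sort(numbers[:left_len], run_local_at_count, log)
    right = reference_merge_sort(numbers[left_len:], run_local_at_count, log)
    log.append(f"merging {len(left)}/{len(right)}")
    return merge_sorted(left, right)


if __name__ == "__main__":
    log: List[str] = []
    result = reference_merge_sort(list(range(20, 0, -1)), run_local_at_count=5, log=log)
    print("\n".join(log))
    # 10 lines in total; every "sorting locally" line reports 5, never 10
```

Because every list that reaches the terminal case has at most `run_local_at_count` elements by construction, a `sorting locally N` line with N above the threshold points at how the conditional is evaluated rather than at the split or merge logic.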
Guy Arad
11/27/2023, 4:17 PM
Samhita Alla
`count` is still 20, right?
Guy Arad
11/27/2023, 6:22 PM
Guy Arad
11/27/2023, 6:24 PM
```python
@task
def sort_locally(numbers: typing.List[int]) -> typing.List[int]:
    print(f'sorting locally {len(numbers)}')
    if len(numbers) > 5:
        raise RuntimeError('ahhhh too large')
    return sorted(numbers)
```
and submitting it to the Flyte sandbox works, so I'm assuming this bug doesn't happen in the deployed service.
Maybe it's related to running the workflow locally (which I believe is totally different).
Samhita Alla
`numbers` to?
Guy Arad
11/28/2023, 3:54 PM
`python merge_sort.py`).
I created brand-new environments for Python 3.11 and 3.10 and installed only the `flytekit` package. This is the output:
```
╰─$ python merge_sort.py
[3033, 6634, 6119, 4576, 5913, 6872, 2058, 9155, 1801, 9976, 6987, 2455, 6965, 4731, 715, 1633, 2342, 2882, 7712, 2433]
splitting 10/10
sorting locally 10
splitting 5/5
sorting locally 5
sorting locally 5
merging 5/5
sorting locally 10
splitting 5/5
sorting locally 5
sorting locally 5
merging 5/5
merging 10/10
Running Merge Sort Locally...[715, 1633, 1801, 2058, 2342, 2433, 2455, 2882, 3033, 4576, 4731, 5913, 6119, 6634, 6872, 6965, 6987, 7712, 9155, 9976]
```
As you can see, there are "sorting locally 10" lines. It suddenly occurred to me that this might be related to the M2 Pro chip I'm using, so I tried running inside Docker on the `linux/amd64` platform, but got a similar result. I installed the latest `flytekit` (1.10.1).
Samhita Alla
Guy Arad
11/28/2023, 7:01 PM
`pyflyte`. On the sandbox cluster it works fine. I was just running it locally using `python merge_sort.py`.
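A hedged note on why a branch that should be skipped can still print something in a local run (this describes Python evaluation order, not flytekit internals): in `.then(sort_locally(numbers=numbers))`, Python evaluates the `sort_locally(...)` call before `.then(...)` receives its argument, so during local execution flytekit presumably has to suppress calls made in the branch that was not selected. The "sorting locally 10" line followed by "splitting 5/5" in the local output would be consistent with that suppression not taking effect for the nested call made from the `@dynamic` function, but that is an assumption, not a diagnosis. The toy sketch below uses hypothetical helpers (`choose_eager`, `choose_lazy`, `sort_recursively`; none of them are flytekit APIs) to show the difference between evaluating both branch arguments eagerly and deferring them with callables.

```python
from typing import Callable, List, Tuple

# Records which "branch" functions actually ran, and with how many elements.
calls: List[Tuple[str, int]] = []


def sort_locally(numbers: List[int]) -> List[int]:
    # Toy stand-in for the terminal (then) branch.
    calls.append(("sort_locally", len(numbers)))
    return sorted(numbers)


def sort_recursively(numbers: List[int]) -> List[int]:
    # Hypothetical stand-in for the recursive (else) branch.
    calls.append(("sort_recursively", len(numbers)))
    return sorted(numbers)


def choose_eager(cond: bool, then_value: List[int], else_value: List[int]) -> List[int]:
    # By the time this runs, the caller has already evaluated BOTH arguments.
    return then_value if cond else else_value


def choose_lazy(cond: bool, then_fn: Callable[[], List[int]], else_fn: Callable[[], List[int]]) -> List[int]:
    # Only the selected branch is ever invoked.
    return then_fn() if cond else else_fn()


numbers = list(range(10, 0, -1))
threshold = 5

choose_eager(len(numbers) <= threshold, sort_locally(numbers), sort_recursively(numbers))
print(calls)  # [('sort_locally', 10), ('sort_recursively', 10)]

calls.clear()
choose_lazy(len(numbers) <= threshold, lambda: sort_locally(numbers), lambda: sort_recursively(numbers))
print(calls)  # [('sort_recursively', 10)]
```

In the eager version both branches leave a trace even though only one result is used, which is the same shape as a 10-element list being both "sorted locally" and split.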
Samhita Alla
```python
if __name__ == "__main__":
    count = 20
    x = generate_inputs(count)
    print(x)
    print(f"Running Merge Sort Locally...{merge_sort(numbers=x, numbers_count=count)}")
```
to
```python
if __name__ == "__main__":
    count = 5
    x = generate_inputs(count)
    print(x)
    print(f"Running Merge Sort Locally...{merge_sort(numbers=x, numbers_count=count)}")
```
Guy Arad
11/30/2023, 11:40 AM