Excellent, thank you very much. So now we can start with the talk. Rafael is going to talk about effortless distributed computing in Python. Take it away.

Can you hear me well? Fine. Okay. So my name is Rafael, I'm a software engineer, and for the past few months I contributed to a few new open-source libraries that I would like to showcase. All of the libraries are related to distributed computing.

The first one we named Scaler. It is a kind of distributed task system for Python: you send it Python tasks, it dispatches them to workers on different computers, does the computation, and returns the result. On top of that we made Parfun, a very small library that you can use to do map-reduce; I will explain what map-reduce is. And the last one is Pargraph, another library that we developed to do distributed graph computation; a graph is what you have when there are dependencies between tasks.

So we start with Scaler. This is really the core of what we did, and it looks a bit like Dask, for those who know it. Since Python 3.2 you can use the ProcessPoolExecutor to submit tasks to different Python processes on your machine and do some kind of parallel computing. In this example you start four workers; I'm not going to use all of them. You submit two calls to the square-root function with different parameters, and the executor does not return the result immediately but a future. A future is just an object that will at some point contain the result. Then you can print and use the result, but Python will block if the result is not ready yet, that is, if the computation happening in the other process has not finished. So it's an easy way to do parallelization in Python.

What we wanted is to be able to send tasks not only to different cores but also to different machines. So we created an infrastructure: instead of using the process pool, you use our client that connects to a cluster, and you can submit tasks the same way. It is totally transparent: it serializes the function and the arguments, and then fetches back the result. It is similar to Dask; I can talk about the differences after the talk if anyone is interested.

The architecture looks like this. You've got the scheduler, which is the central point of our system, and then you can have as many workers as you want on different machines. The scheduler is responsible for accepting functions and tasks from the client and redirecting them to the workers that have resources available.
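As a rough sketch of the two snippets just described: the local version below uses only the standard library, and the Scaler version is kept as a comment because the exact client API (import path, address format) is an assumption here rather than something confirmed in the talk.

from concurrent.futures import ProcessPoolExecutor
import math

if __name__ == "__main__":
    # Start four local worker processes.
    with ProcessPoolExecutor(max_workers=4) as executor:
        # submit() returns immediately with a future; the work happens
        # in other processes.
        future_a = executor.submit(math.sqrt, 16)
        future_b = executor.submit(math.sqrt, 2)

        # result() blocks until the computation in the other process is done.
        print(future_a.result(), future_b.result())

    # Hypothetical Scaler equivalent, as described in the talk (submit tasks
    # the same way, against a cluster instead of local processes):
    # from scaler import Client
    # with Client(address="tcp://scheduler-host:2345") as client:
    #     future = client.submit(math.sqrt, 16)
    #     print(future.result())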
I would like to show a small example: a parallel implementation of the Monte Carlo approximation of pi. The Monte Carlo approximation of pi works like this: you take a square, you generate random points on that square, and you count the points that fall inside the circle inscribed in the square. In the example on the left, if you count the points in green, the ones inside the circle, divide that by the total number of points and multiply by four, you get an approximation of pi. Here 3.2 is already pretty close, and the more points you add, the closer to pi you get. This is the example I'm going to use in the next slides.

If you want to do that in Python, first you have to define when a point is inside the circle: you take the equation of the circle and check whether the point satisfies it. It doesn't matter much, it's just for the example. Then you do the Monte Carlo estimation: you generate random coordinates, the x's and y's, filter the points that are inside the circle, count them, multiply by four and divide by the number of points, and you get an estimate of pi. This is the run function; the more points you add, the closer to pi you get. This is the sequential version, and if you run it with one hundred thousand points it already takes maybe a few seconds, but it gets close to pi.

So what if we would like to scale that further, first on multiple cores and then on multiple machines? We are going to make another function that uses the first one. What it does is split the number of points that you want to generate, in this case into one hundred, and for each of these subsets of points it starts a task; that is the submit call here. It starts one hundred tasks that each get one hundredth of the number of points, and then you average the results; this sum is doing the averaging.

Now compare this to the sequential version. The sequential one, if you run it on one billion points, takes about seven minutes, so it's quite slow. If you use the Python built-in executor with eight cores, so on this laptop, it takes about fifty-seven seconds, which is already way faster. And if you use our Scaler client on a big cluster, we have one hundred and thirty-five cores on that one, you see that the computation takes twelve seconds, so it's thirty-five times faster than the sequential one. So this is basically how you do parallel computation with Scaler.
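Here is a minimal sketch of that example, assuming nothing beyond the standard library; per the talk, the ProcessPoolExecutor would simply be replaced by the Scaler client connected to the cluster in the distributed case.

import random
from concurrent.futures import ProcessPoolExecutor

def is_inside_circle(x: float, y: float) -> bool:
    # Unit circle centred at the origin: x^2 + y^2 <= 1.
    return x * x + y * y <= 1.0

def estimate_pi(n_points: int) -> float:
    # Draw random points in the square [-1, 1] x [-1, 1] and count how many
    # fall inside the inscribed circle; the ratio times four approximates pi.
    inside = sum(
        is_inside_circle(random.uniform(-1, 1), random.uniform(-1, 1))
        for _ in range(n_points)
    )
    return 4.0 * inside / n_points

def parallel_estimate_pi(executor, n_points: int, n_tasks: int = 100) -> float:
    # Split the work into n_tasks sub-tasks, submit them, and average the results.
    futures = [executor.submit(estimate_pi, n_points // n_tasks) for _ in range(n_tasks)]
    return sum(f.result() for f in futures) / n_tasks

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=8) as executor:
        print(parallel_estimate_pi(executor, 10_000_000))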
We also have an additional tool; this one is the equivalent of htop. You can see the status of the cluster: all the cores, how much CPU each worker uses, how many clients are connected to the cluster. You also have the status of the scheduler, which is really the single point of failure of our infrastructure. We have failure recovery, which means that if a worker dies while it is computing a task, the scheduler will immediately reroute all the tasks that worker was responsible for to the available workers. We also have dynamic load balancing: if one worker has too many tasks and just cannot make progress, which may happen if some of your tasks take much longer to compute than others, the scheduler will redistribute the tasks that worker accumulated to the available workers. So that's it for Scaler.

On top of that we made a library that you can use to do map-reduce: Parfun. Let me explain what map-reduce is with another example. You want to count the words inside a text, so you have this count_words function. It accepts a list of lines, every sentence being an item in the list, and it just goes through all the lines and all the words and accumulates the counts. When you run it on a small file you get, obviously, that the most common words are "the" and then "of". But what if we want to do this on a very large data set, something that is gigabytes, and we want the result to be fast? You can use the map-reduce pattern: you take the input, split it into different batches or partitions, process all these partitions separately, and then you have a function that combines the results.

Parfun just helps you do that. It's a decorator: you take the original function, you don't change anything in the function itself, you just say, okay, I've got this function, I want to parallelize it, and I want to chunk the input data into batches. In this case we have predefined partitioning functions for lists, so the first argument of the decorator says that I want the input data to be split, whatever the batch size is, into batches of lines; then the function is executed on each batch; and finally the results are combined with a sum. You can sum Counters, and you get the total word count. By doing this, you can call the function totally transparently, like here with a much larger file, and it will actually run on multiple computers if you are using the Scaler backend.
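As an illustration of that pattern, here is a hand-rolled word-count map-reduce using only the standard library rather than Parfun's decorator (whose exact signature is not reproduced here): the lines are split into batches, each batch is counted on its own, and the partial Counters are combined with a sum. Note that the batch size is fixed by hand in this sketch, which is exactly the part Parfun automates.

from collections import Counter
from concurrent.futures import ProcessPoolExecutor

def count_words(lines: list[str]) -> Counter:
    # Map step: count the words in one batch of lines.
    counts: Counter = Counter()
    for line in lines:
        counts.update(line.split())
    return counts

def count_words_parallel(lines: list[str], batch_size: int = 10_000) -> Counter:
    # Split the input into batches, count each batch separately, then
    # combine the partial results (the reduce step) by summing the Counters.
    batches = [lines[i:i + batch_size] for i in range(0, len(lines), batch_size)]
    with ProcessPoolExecutor() as executor:
        return sum(executor.map(count_words, batches), Counter())

if __name__ == "__main__":
    with open("some_text_file.txt") as f:  # any text file works here
        print(count_words_parallel(f.readlines()).most_common(5))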
Something that is very important with Parfun is that you don't specify the size of the batches. You don't specify how the input is going to be split; you don't specify the size of the sub-tasks, so you don't know how many tasks will be created. That is because finding the optimal batch size for a computation is quite hard. If it is too small, so if you create too many tasks, you will spend a lot of time just transferring data, communicating and doing synchronization work, maybe IPC if you are running on a single machine. And if your sub-tasks are too large, if you split the data into chunks that are too big, you will not get much parallelization. So you want something that is smart about that, and we use machine learning for it. This is the debugging information from the decorator: it automatically deduces the optimal size of the partitions, of the batches. In this case it deduced, after some tries, that computing the function in batches of 1,660,600 lines is actually quite efficient. By doing that you only spend about 5% of the time doing communication and IPC, and you get a nice speed-up. And it is totally transparent; that is probably the main feature of our map-reduce library.

Lastly, we have Pargraph. It is, again, another library that you can use on top of Scaler, and this one is really designed for graph computation. I've got a third example: you want to generate a report. The report opens a file, reads it, and pre-processes it; but it also goes to a database table and extracts email addresses; and then, finally, it sends the report to all these email addresses. If you are into distributed computing, you will notice that you can actually run these two blocks of code concurrently, in a parallelized way, because they do not depend on each other; they only depend on each other here, at the end.

So what we have is two additional decorators that allow you to define graphs in a declarative way in Python. With the first one, the delayed one, you define the leaves of your graph, the smallest functions that you don't want to parallelize further, and we just add the delayed decorator to all these sub-tasks. Remember, we had read data file, read the database table, extract emails, all these functions, the ones in green and orange: you just say that these are the leaves of your graph. And then for the parent function, the one that is calling the other functions, you say that it is a graph.
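Here is a plain-Python sketch of that report example with stubbed-out bodies, just to make the dependency structure visible; the comments mark where the delayed and graph decorators described in the talk would go. The decorator names follow the talk, but the exact Pargraph imports and signatures are not shown, so treat this as an illustration rather than the library's API.

# Leaf functions: these are the ones that would each get the delayed decorator.
def read_data_file(path: str) -> str:
    ...

def preprocess(raw: str) -> str:
    ...

def read_database_table(db_url: str) -> list:
    ...

def extract_emails(rows: list) -> list:
    ...

def send_report(report: str, recipients: list) -> None:
    ...

# Parent function: the one that would get the graph decorator.
# The file branch and the database branch are independent of each other,
# so a graph scheduler can run them in parallel; they only meet at send_report.
def generate_report(path: str, db_url: str) -> None:
    report = preprocess(read_data_file(path))                  # branch 1
    recipients = extract_emails(read_database_table(db_url))   # branch 2
    send_report(report, recipients)                            # join point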
By doing that, when you request the graph representation of this computation, it will not actually run the functions; it inspects them and is able to generate a graph that represents the computation. Here we see that there are two paths we can take to do the computation in a distributed way. Once you have this graph, you can export it and run it on Scaler; we have a special function for that. You can also run it on Dask, it is the same format. And that is basically the idea. I am a little bit early, so I have a lot of time for questions. Thank you.

Thank you so much.

Just before the questions: we released all of this a few months ago under the Apache 2.0 license, and we welcome contributors and issues; it is on GitHub.

Q: Thanks for the presentation. For the graph representation, can you represent more advanced graph structures? This one is a very simple one; can you represent more complicated structures, nested to however many levels you want?

A: What the graph library is doing is overriding all the Python operators. So when you run the function it doesn't actually run it; it just generates a kind of syntax tree, and we extract the graph from that. So unless an operator is not correctly handled, it should work. And the format in which we generate the graph is the same as Dask's, so you can run it on Dask and on Scaler. What was the second part, the format? It's pretty standard, it's a Python dictionary.
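As an aside, the dictionary-based format mentioned here can be illustrated with Dask itself (this snippet uses Dask's documented custom-graph interface, not Pargraph): a graph is a plain dict mapping keys to values or to tuples of a callable and its arguments.

from dask.threaded import get

def load(path):
    return "raw data from " + path

def clean(raw):
    return raw.upper()

# A task graph as a plain Python dictionary: keys name intermediate results,
# tuples are (callable, argument keys or literal values).
dsk = {
    "raw": (load, "input.txt"),
    "cleaned": (clean, "raw"),
}

print(get(dsk, "cleaned"))  # runs the graph with Dask's threaded scheduler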
Q: I'm just going to read one question that was written in the chat. It says: with Scaler, how does it handle Python dependencies? Say you need a PyPI package installed for your task; do you need to make sure that it is on your nodes in advance?

A: We are using cloudpickle, so it's basically the same constraints as cloudpickle: you will want to have the libraries installed. However, you can override the serializer, so if you want to do something more complex you could technically do it. But yes, that's one of the complexities: you have to install the libraries on all the machines.

Q: Hello, thank you for the talk. What is the relation between Scaler and something like Celery?

A: I never used Celery, but it's mostly a queue; I don't think it uses the executor interface, but I might be wrong. Sorry, I don't know much about Celery, so I can't really answer that.

Q: Hi, thanks for the interesting presentation. I was wondering about testing: if I add these decorators, can I disable them during testing?

A: Yes. For the graph, if you don't call the to-graph function it just looks like a Python function, so you can run it sequentially. And for Parfun we actually keep the original function under a renamed name: you just add a suffix, something like two underscores and "sequential", and you get access to the original function. You can also disable it system-wide by calling a function.

Q: And a second question, if I may: with Parfun there is no explicit connection to Scaler; is that something configured underneath?

A: Yes, you have to set this up. You can either create a context, you know, the with statement in Python, or you set up the backend for the whole process. Those are the two ways we handle it, so you can define the function once and call it using different backends at the same time.

Q: Thanks for the presentation, I have two questions. One: how do you set up the cluster? I think you mentioned cloudpickle; is that part of it?

A: It's just processes. The scheduler and the workers are just Python programs, so you start the programs and they all connect to some IP address, whatever your configuration is. It's just Python commands; all the configuration is in the parameters, in the arguments.

Q: Okay, thanks. And on the failure scenario: you showed how to recover from a worker failing. What if the scheduler dies?

A: Yeah, then you don't recover; the scheduler is the single point of failure.

Q: I have a question about the worst-case scenario. Do you mind going back to the slide where you run the pi example? I'm wondering, just because of the syntax: the scheduler runs all the jobs and at the end you sum them together to get the result for pi. In the worst case, let's say the cluster dies, or one of the nodes dies and one of the jobs never completes. What happens to that computation, to that expression where you are adding things up? Do you end up adding a None and then get a Python error?
A: I'm not sure I understand, but the workers are constantly talking to the scheduler, a kind of ping, so if the connection is lost or the latency becomes too big, say one minute, the worker will be considered dead and all its tasks will be rerouted.

Q: Yes, but if the worker is never able to answer, what does the scheduler return for that job if it never gets an answer?

A: Oh, if that worker dies? It depends; we have a parameter for that. If you don't store the tasks on the scheduler, it will obviously return an exception to the client. But by default we store the task object, so basically the arguments and the function code, and we can resubmit the task.

Q: Thank you. I had another question, about the graph execution: can the graph be modified while it is running, or do you have to compile it beforehand?

A: You could technically modify it yourself. As I said, it builds a kind of abstract tree of the computation as a Python dictionary, so you could technically access it and modify it, but the library doesn't do that.

Q: How does the system handle data exchange between workers? Like, the result of a function in the graph is returned and is needed by another function.

A: For the graph computation? It will not send a task until its inputs are ready. It's handled on the scheduler side, well, on the client side: we are not queuing a task to the workers until we have its arguments. The client waits for the data to come back, and then the next task that gets scheduled will receive that input.

Q: I'm not sure I understand. If you have functions with data dependencies between them, so one function takes as input the output of another function, how does that data flow from one worker to the other?

A: It might be scheduled anywhere, and it doesn't get back to the client unless you request it. Okay, so let's say the worker at the top wants to execute a function whose input was computed on another worker: the data goes back to the scheduler, and then it goes to the other worker that needs the input.

Q: Just a quick one, maybe I got distracted and missed that part: the batch size that you mentioned, is it something you need to do the bookkeeping for, or is it calculated automatically in the background?

A: It's calculated. It's basically machine learning, a regression: it looks at the function when you execute it, tries different batch sizes, and after a few tries it deduces that the optimal one is whatever it is, and it will continue to learn. So if you change the parameters, for example, and suddenly the function doesn't behave the same, it will start to learn again.
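To illustrate the idea in that answer, and only the idea: a hand-rolled search that times the same workload at a few candidate batch sizes and keeps the fastest one. Parfun's actual estimator is described as a regression that keeps learning across calls, so this is just a simplified sketch under those assumptions.

import time
from collections import Counter
from concurrent.futures import ProcessPoolExecutor

def count_words(lines):
    counts = Counter()
    for line in lines:
        counts.update(line.split())
    return counts

def pick_batch_size(executor, sample, candidates=(1_000, 10_000, 100_000)):
    # Process the same sample with each candidate batch size and keep the
    # fastest: batches that are too small pay dispatch/IPC overhead per task,
    # batches that are too large leave workers idle.
    best_size, best_time = None, float("inf")
    for size in candidates:
        batches = [sample[i:i + size] for i in range(0, len(sample), size)]
        start = time.perf_counter()
        sum(executor.map(count_words, batches), Counter())
        elapsed = time.perf_counter() - start
        if elapsed < best_time:
            best_size, best_time = size, elapsed
    return best_size

if __name__ == "__main__":
    sample = ["the quick brown fox jumps over the lazy dog"] * 200_000
    with ProcessPoolExecutor() as executor:
        print(pick_batch_size(executor, sample))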
Okay, thank you. So I think that's it in terms of questions. Thank you very much. Ah, there's one more, sorry; we've got 58 seconds.

Q: Coming from Kubernetes, this looks like a similar design. The big problems in Kubernetes are CPU limits, memory limits, even I/O limits. How do you deal with that?

A: We've got a feature I'm working on, it will be released next week, that we call tags. Each worker can have specific tags, which are just symbolic strings, and you can say, for example, that worker one has a lot of memory; then when you submit a task it will only be redirected to the workers that have that specific tag. So that's how we handle that kind of thing. And if you want to limit the CPU usage inside the process, you have to run it in a cgroup, things like that; you have to deal with that at the operating-system level.

Okay, so thank you, thank you very much.