.. _pipelines_tutorial:

# How to write basic pipelines

:ref:`pipelines` explains in more detail what pipelines are, whereas this page will look at some practical examples.

## Before we start

This tutorial is aimed at people who have used RiskScape models a lot, and now want to start creating more customized pipelines of their own.
We expect that you:

- Have been through the :ref:`expression_tutorial` tutorial already.
- Are comfortable writing basic RiskScape expressions.
- Are comfortable using RiskScape commands from the terminal prompt.

The aim of this tutorial is to give you enough knowledge to take a pipeline generated by the wizard and start adding extra pipeline steps of your own.

.. tip::
    To learn more about creating bookmarks, writing functions, or defining types, please refer to the :ref:`project-tutorial` tutorial.
    Functions and bookmarks are separate concepts that apply regardless of whether you build your model in the wizard or write a pipeline from scratch.

## Setup

Click [here](../pipeline-basics.zip) to download the input data we will use in these examples.
Unzip the file into a new directory, e.g. `C:\RiskScape_Projects\`.

Open a command prompt and `cd` to the directory where you unzipped the files, e.g.

```none
cd C:\RiskScape_Projects\pipeline-basics
```

You will use this command prompt to run the RiskScape commands in this tutorial.

The unzipped directory contains an empty file called `pipeline.txt`.
Open this file in a text editor, such as Notepad or `gedit`.
This is where you will paste the pipeline examples.

The `project.ini` is pretty simple - it loads the `pipeline.txt` contents as a pipeline model called `tutorial`.

Open the `buildings.csv` file and familiarize yourself with the data.
This is the building input data we will use in our pipeline.

.. note::
    This tutorial will focus on general pipeline concepts that make up part of a RiskScape model.
    For simplicity, the buildings do not have any geospatial data associated with them.
    We will focus on simply transforming the input data, rather than constructing an actual risk model from scratch.

## The basics

A RiskScape pipeline is simply a series of data processing *steps*.
Each step in the pipeline has one particular job to do, which is reflected in the step's name.

The output data from one pipeline step becomes the input data for the next pipeline step.
By *chaining* steps together, we can achieve complex data processing tasks, such as a risk analysis model.

For example, previous tutorials have looked at how you can use aggregation to collate or *group* your results.

.. image:: ../diagrams/aggregation.png
    :alt: Four rows of building data are aggregated down to two rows, by grouping on the building construction type.

This aggregation operation is actually achieved by a `group` pipeline step, which we will learn more about soon.

The aggregation operation transforms the rows of input data, or *tuples*.
In this case, the output data from the pipeline step looks quite different to the step's input data.

## Input data

Let's start off with the simplest possible case: loading your data into RiskScape.

Every pipeline starts off with an `input` step that specifies the bookmark data to use.

Add the following to your `pipeline.txt` file and save it.

```none
input('buildings.csv')
```

Now try running the pipeline using the following command.

```none
riskscape model run tutorial
```

The command should produce an output file called `input.csv` in a time-stamped `output/tutorial/` sub-directory.

.. note::
    At the end of the pipeline, the pipeline data is always saved to file.
    By default, the filename will correspond to the name of the last pipeline step, which is ``input`` in this case.

Open the `input.csv` file that was produced and check its contents.

.. tip::
    Entering ``more ""`` into the command prompt is a quick way to check a file's contents.

You should see that the output file has the exact same content as the `buildings.csv` input file.
This is because we loaded the data into the pipeline, but we did nothing with it.
Don't worry, we'll start doing more complicated things with the input data soon.

### Pipeline parameters

You may notice that the `input('buildings.csv')` syntax looks a lot like a function call.
The bits inside the brackets are called the pipeline step _parameters_.
The parameters specify how the step processes the pipeline data it receives.

Each pipeline step comes with some basic documentation that will help you understand how to use it.
You can read this help documentation using `riskscape pipeline step info STEP_NAME`.
You can see a list of supported pipeline steps using `riskscape pipeline step list`.

Try reading about the parameters that the `input` step takes.

```none
riskscape pipeline step info input
```

Similar to function arguments, the pipeline step parameters have keyword names, e.g.

```none
input(relation: 'buildings.csv')
```

If a pipeline step has many optional parameters, then you will _have_ to use the parameter keywords.

.. note::
    You will notice that the ``relation`` parameter in the ``input`` step can be a bookmark *or* filename/URL.
    We are using the filename here because we haven't defined any bookmarks at all.
    In this case, the file path is *relative* to the directory that our ``project.ini`` file is in.

## Adding a filter step

Now we will add in a second pipeline step to actually do something with the input data.
Let's say we want to exclude any Outbuildings from our data.

Replace the contents of your `pipeline.txt` file with the following.

```none
input('buildings.csv')
 -> filter(Use_Cat != 'Outbuilding')
```

The `filter` step takes a `boolean` (true or false) conditional expression.
When the condition evaluates to `true` for a row of data, that row will be passed onto the next pipeline step.
When the condition evaluates to `false`, that row of data will be excluded from the pipeline.

The `->` is used to chain two pipeline steps together.
Think of it as representing the input data flowing between the two steps.

Try running this pipeline using the following command:

```none
riskscape model run tutorial
```

This should produce a `filter.csv` output file.
Notice that the name of the output file has changed to match the last step in our pipeline.

Open the `filter.csv` output file and check its contents.
The pipeline has now transformed the input data so there are no longer any Outbuildings included.

## Transforming data with select

We have seen that the `filter` step can remove rows of input data.
Now let's try manipulating each row of input data a little.

The `select` pipeline step will be the main way you transform the attributes in your data.

The `select` step is a lot like the `SELECT` SQL statement (without the `WHERE` clause).
It tells RiskScape which attributes (columns) to pull out of the rows of input data.

Let's try a really simple select step.
Replace the contents of your `pipeline.txt` file with the following.

```none
input('buildings.csv')
 -> filter(Use_Cat != 'Outbuilding')
 -> select( { ID, Use_Cat, Cons_Frame as construction, REGION } )
```

Save `pipeline.txt` and enter the following command to run your latest pipeline.

```none
riskscape model run tutorial
```

This time it will produce a `select.csv` output file.
Open this file and examine its contents.

The select step actually did several things:

- It reordered the `Use_Cat` and `Cons_Frame` columns.
- It renamed the `Cons_Frame` attribute to `construction`.
- It _removed_ the `storeys` and `area` attributes from the input data, because these were not specified in the select at all.

You may have noticed that this time the pipeline code contains `{ }`s.
This is because the select step actually takes a `Struct` expression as its parameter.

.. note::
    Each row of data, or tuple, in the pipeline is actually an instance of a ``Struct``.
    The ``select`` step is simply building the ``Struct`` that the next pipeline step will receive.
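If you know a little Python, it can help to picture the `filter` and `select` steps as operations on a list of rows.
The sketch below is only an analogy, not RiskScape code: the sample rows, their values, and the `filter_step` and `select_step` helper names are all hypothetical, and each row is modelled as a Python dictionary rather than a real `Struct`.

```python
# Hypothetical sample rows, standing in for a few lines of buildings.csv.
# All values are text, just as they would be when read from a CSV file.
rows = [
    {"ID": "1", "Cons_Frame": "Timber", "Use_Cat": "Residential",
     "area": "120.5", "storeys": "1", "REGION": "Lepa"},
    {"ID": "2", "Cons_Frame": "Masonry", "Use_Cat": "Outbuilding",
     "area": "30.0", "storeys": "1", "REGION": "Siumu"},
    {"ID": "3", "Cons_Frame": "Masonry", "Use_Cat": "Commercial",
     "area": "410.0", "storeys": "2", "REGION": "Safata"},
]


def filter_step(rows):
    """Keep rows where the condition is true, like filter(Use_Cat != 'Outbuilding')."""
    return [row for row in rows if row["Use_Cat"] != "Outbuilding"]


def select_step(rows):
    """Build a new row for each input row, like
    select({ ID, Use_Cat, Cons_Frame as construction, REGION })."""
    return [
        {
            "ID": row["ID"],
            "Use_Cat": row["Use_Cat"],
            "construction": row["Cons_Frame"],  # renamed attribute
            "REGION": row["REGION"],
            # 'area' and 'storeys' are not listed, so they are dropped
        }
        for row in rows
    ]


# Chaining the two steps: the output of one becomes the input of the next.
result = select_step(filter_step(rows))
for row in result:
    print(row)
```

The Outbuilding row is dropped by the filter, and every remaining row comes out with only the four attributes that the select listed, in the order they were listed.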
### Adding new attributes

Now we will try another simple example where we create a _new_ attribute and add it to our pipeline data.

Let's say we want to approximate the total floor-space in the building by taking the area of the building footprint and multiplying it by the number of storeys it has.

Replace the contents of your `pipeline.txt` file with the following.
Note that we have split the `Struct` declaration in the `select` step over several lines here, so that it is easier to read.

```none
input('buildings.csv')
 -> filter(Use_Cat != 'Outbuilding')
 -> select({
        Use_Cat,
        Cons_Frame as construction,
        REGION,
        storeys * area as floorspace
    })
```

Now try running the pipeline model again using the following command:

```none
riskscape model run tutorial
```

Uh-oh, something went wrong this time.

```none
Failed to validate model for execution
  - Failed to validate 'select({Use_Cat, Cons_Frame as construction, REGION, storeys * area as floorspace})' step for execution
    - Failed to validate expression '{Use_Cat, Cons_Frame as construction, REGION, storeys * area as floorspace}' against input type {ID=>Text, Cons_Frame=>Text, Use_Cat=>Text, area=>Text, storeys=>Text, REGION=>Text}
      - Operator '*' is not supported for types '[Text, Text]'
```

Remember that RiskScape uses indentation to show how the errors are related - they drill-down in more and more specific detail about the actual problem.

The first error tells us the problem was with the `select` step.
The next line tells us there was something wrong with our `Struct` expression.
Finally the last error tells us the problem was with the multiplication part of the expression - we are trying to multiply two `Text` strings.

You may recall that everything in RiskScape has a :ref:`type`, and CSV data is always `Text` type.
So here we need to tell RiskScape that these attributes are actually numeric values.
Normally you would do this in your bookmark, but because we are not using bookmarks at all, we can call _casting_ functions instead to turn the `Text` strings into `Integer` and `Floating` values.

Update your `pipeline.txt` file to contain the following:

```none
input('buildings.csv')
 -> filter(Use_Cat != 'Outbuilding')
 -> select({
        Use_Cat,
        Cons_Frame as construction,
        REGION,
        int(storeys) * float(area) as floorspace
    })
```

Save the file and run the pipeline model again using the following command:

```none
riskscape model run tutorial
```

Check that the command works now, and a new `floorspace` attribute is included in the new output file.

.. tip::
    The ``int()`` and ``float()`` used here are function calls.
    Remember that you can call a function from *any* RiskScape expression, and calling functions allows you to do much more complicated things in your pipeline.
    The ``select`` step is a great place to call functions, although you can call functions from any pipeline step that accepts expression parameters.

### Naming steps

Each step in the pipeline has a name.
So far we have been relying on RiskScape to use a default name for us.

You specify the pipeline step name by adding `as YOUR_STEP_NAME` after the closing `)` bracket.
Let's try naming the `select` step now.

```none
input('buildings.csv')
 -> filter(Use_Cat != 'Outbuilding')
 -> select({
        Use_Cat,
        Cons_Frame as construction,
        REGION,
        int(storeys) * float(area) as floorspace
    }) as calc_floorspace
```

If you run the pipeline again, you will notice that the output filename now matches the name of the last step in our pipeline.

.. tip::
    You don't have to worry too much about naming your pipeline steps unless you want to 'fork' a pipeline chain into two (i.e. to create multiple output files), or combine two separate pipeline chains together (i.e. to combine input data from different files).
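Looking back at the casting fix used in the `calc_floorspace` step, the same idea can be sketched in Python, which also refuses to multiply two text values.
This is only an analogy: Python's `int()` and `float()` are not the RiskScape casting functions, and the sample row and its values are hypothetical.

```python
# Hypothetical row: every value read from a CSV file starts out as text.
row = {"storeys": "2", "area": "150.5"}

# Multiplying two text values fails, much like RiskScape's
# "Operator '*' is not supported for types '[Text, Text]'" error.
try:
    row["storeys"] * row["area"]
    text_multiply_failed = False
except TypeError:
    text_multiply_failed = True

# Casting the text to numbers first makes the multiplication valid.
floorspace = int(row["storeys"]) * float(row["area"])

print(text_multiply_failed)
print(floorspace)
```

Casting as close as possible to where the data is read keeps later expressions simple, which is why the tutorial suggests you would normally do this in your bookmark.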
### Select all attributes

It can get a bit cumbersome in `select` steps to always specify every single attribute that you want to retain.
Luckily, there is an easier way to do this when you are only adding _new_ attributes to the pipeline data.

When used on its own in a `Struct` expression, the `*` operator acts as a wildcard that means _include all attributes_.

Let's say we also want to convert the floor-space (in square metres) to square feet.
Try running the following pipeline.

```none
input('buildings.csv')
 -> filter(Use_Cat != 'Outbuilding')
 -> select({
        Use_Cat,
        Cons_Frame as construction,
        REGION,
        int(storeys) * float(area) as floorspace
    }) as calc_floorspace
 -> select({ *, round(floorspace * 10.76) as square_footage })
```

Check the output file that gets produced.
All the attributes produced by the previous `calc_floorspace` step should still be there, as well as the new `square_footage` attribute.

.. note::
    We also used the ``round()`` function here to round the ``square_footage`` value to the nearest whole number.

.. _pipelines_aggregation:

## Aggregation

Aggregation lets you combine many rows of data into a single result, or group of results, such as the _total_ floor space across all buildings.
Aggregation works a lot like a `GROUP BY` clause in SQL.

Now let's try aggregating the buildings by their construction type to calculate some statistics about our buildings.
To aggregate the rows of data into a combined result, we use the `group` step.

The `select` step transforms each row of data one at a time, whereas the `group` aggregation works across _all_ rows of data.

The `group` pipeline step takes two parameters:

- `by`: the attribute(s) that we are grouping _by_ (i.e. the construction type).
- `select`: a `Struct` expression that specifies what aggregation to include in the output.

Try running the following pipeline and check the output results.
```none
input('buildings.csv')
 -> filter(Use_Cat != 'Outbuilding')
 -> select({
        Use_Cat,
        Cons_Frame as construction,
        REGION,
        int(storeys) * float(area) as floorspace
    }) as calc_floorspace
 -> select({ *, round(floorspace * 10.76) as square_footage })
 -> group(by: construction,
          select: {
              construction as "Building type",
              sum(square_footage) as total_square_footage,
              count(*) as Num_buildings
          })
```

This should now produce a `group.csv` output file that looks like the following:

```none
Building type,total_square_footage,Num_buildings
Reinforced Concrete,85340,13
Timber,236298,320
Masonry,4574663,2211
Steel,15949,7
Reinforced_Concrete,128368,10
```

Instead of having a row of data for each building, we now have one row of data for each building *type*.

The `select` parameter looks very similar to the `select` pipeline step.
However, the difference here is that the `Struct` contains *aggregation* expressions.

.. tip::
    Aggregation expressions always apply across *all* rows.
    You can think of the ``sum(square_footage)`` expression in RiskScape as like using the ``SUM()`` spreadsheet formula across the whole ``square_footage`` column.

.. note::
    You may have also noticed that we used double-quotes around ``"Building type"``.
    When an attribute identifier contains non-alphanumeric characters, such as spaces or ``-`` dashes, it needs to be enclosed in double-quotes.

.. _group_multiple_attributes:

### Aggregating by multiple attributes

Often you will want to aggregate your results by _multiple_ attributes, e.g. by construction-type _and_ by region.

To do this, all we need to do is specify the `by` parameter as a `Struct` expression containing all the attributes we are interested in.

For example, let's try aggregating by construction type _and_ the region that the building is in.
Try running the following pipeline and check the results.
```none
input('buildings.csv')
 -> filter(Use_Cat != 'Outbuilding')
 -> select({
        Use_Cat,
        Cons_Frame as construction,
        REGION,
        int(storeys) * float(area) as floorspace
    }) as calc_floorspace
 -> select({ *, round(floorspace * 10.76) as square_footage })
 -> group(by: { construction, REGION },
          select: {
              REGION as Region,
              construction as "Building type",
              sum(square_footage) as total_square_footage,
              count(*) as Num_buildings
          })
```

You should now see a summary of the buildings by region *and* building type that looks *something* like the following (the output is unsorted, so the rows may appear in a different order).

```none
Region,Building type,total_square_footage,Num_buildings
Aleipata Itupa i Lalo,Masonry,647375,319
Aleipata Itupa i Lalo,Reinforced_Concrete,5624,1
Aleipata Itupa i Lalo,Timber,27759,40
Aleipata Itupa i Luga,Masonry,363064,190
Aleipata Itupa i Luga,Reinforced_Concrete,9451,1
Aleipata Itupa i Luga,Timber,38973,65
Anoamaa East,Masonry,46667,38
Falealili,Masonry,1033851,528
Falealili,Reinforced Concrete,17635,3
Falealili,Reinforced_Concrete,39863,5
Falealili,Timber,52325,71
Lepa,Masonry,278470,138
Lepa,Timber,36337,84
Lotofaga,Masonry,194496,98
Lotofaga,Reinforced_Concrete,73430,3
Lotofaga,Timber,7391,9
Safata,Masonry,748685,348
Safata,Reinforced Concrete,48262,6
Safata,Timber,5674,7
Siumu,Masonry,970636,355
Siumu,Reinforced Concrete,16259,3
Siumu,Timber,59698,39
Vaa o Fonoti,Masonry,291419,197
Vaa o Fonoti,Reinforced Concrete,3184,1
,Steel,15949,7
,Timber,8141,5
```

.. note::
    You can also omit the ``by`` parameter completely.
    The aggregation will then produce a single result across all rows.
    This can be useful for producing a *national* total loss result.

## Saving results

We have been relying on RiskScape to save our results to file for us so far.
As we have seen, the output from the last step in a pipeline chain will be written to file, and by default the filename will match the name of the last pipeline step.
We can explicitly specify the filename and file format to save results in by using the `save` pipeline step.

Try running the following pipeline.

```none
input('buildings.csv')
 -> filter(Use_Cat != 'Outbuilding')
 -> select({
        Use_Cat,
        Cons_Frame as construction,
        REGION,
        int(storeys) * float(area) as floorspace
    }) as calc_floorspace
 -> select({ *, round(floorspace * 10.76) as square_footage })
 -> group(by: { construction, REGION },
          select: {
              REGION as Region,
              construction as "Building type",
              sum(square_footage) as total_square_footage,
              count(*) as Num_buildings
          })
 -> save('aggregated-results.csv', format: 'csv')
```

Notice that RiskScape now uses the filename we gave it (`aggregated-results.csv`) to write the results to.

## Forking into multiple branches

So far our pipeline has been a single, linear chain of steps.
But the pipeline can 'fork' into multiple pipeline branches.

Branching allows us to save multiple results files from the same pipeline.
This can be useful when you have a computationally intensive piece of analysis, but then want to present the same results in several different ways, e.g. aggregating by region, aggregating nationally, and the raw results.

Let's try branching from the earlier `calc_floorspace` step.
We _reference_ the step by name, add a `->`, and then specify the pipeline step to send the data to.
For now, let's just save the `calc_floorspace` results to a separate file.

Try running the following pipeline.
It should now produce *two* different output files.
```none
input('buildings.csv')
 -> filter(Use_Cat != 'Outbuilding')
 -> select({
        Use_Cat,
        Cons_Frame as construction,
        REGION,
        int(storeys) * float(area) as floorspace
    }) as calc_floorspace
 -> select({ *, round(floorspace * 10.76) as square_footage })
 -> group(by: { construction, REGION },
          select: {
              REGION as Region,
              construction as "Building type",
              sum(square_footage) as total_square_footage,
              count(*) as Num_buildings
          })
 -> save('aggregated-results.csv', format: 'csv')

# send a copy of the earlier 'calc_floorspace' step's output to file
calc_floorspace -> save('raw_floorspace.csv')
```

This example also uses a comment to annotate the pipeline.
Use `#` to start a comment line, just like you would in an INI file.

.. note::
    The ``->`` in pipelines is really important as it tells RiskScape whether two steps should be chained together or not.
    Notice that there is **no** ``->`` after the ``save('aggregated-results.csv')`` step because we are starting a separate pipeline chain.

.. tip::
    Saving the results from an intermediary pipeline step can be a handy way to debug whether your pipeline is working correctly.
    Be careful if you have a large dataset though, as the resulting output files could get very big.

## Recap

We have covered a lot of pipeline basics here, so let's recap some of the key points.

- A pipeline is just a series of data processing steps.
- The `input` step reads your data at the _start_ of a pipeline chain.
- We use `->` to chain two steps together, i.e. the output data from one step becomes the input for the next.
- The `select` step _transforms_ your data. It's a good place to make function calls.
- The `group` step _aggregates_ your data to produce a combined result, e.g. using `sum()`, `count()`, or `mean()`.
- The `Struct` expression in a `select` step applies to _each_ row, whereas the `Struct` expression in a `group` step applies across _all_ rows.
- There are other steps that perform other actions. E.g.
  `filter` steps can exclude or include rows of data, and the `save` step can write the output data to file.
- We can use `->` to chain the same step to multiple downstream steps. This 'forks' the pipeline into multiple branches.

So far our pipeline has only been using a single dataset as input.
When modelling real-world hazards, you will almost always want to _join_ multiple geospatial layers together.
The next :ref:`advanced_pipelines` tutorial will walk you through writing a simple risk model pipeline from scratch.

## Extra for experts

If you want to explore the basic pipeline concepts a little further, you could try the following exercises out on your own.

- Try using the `sort` step to sort the `aggregated-results.csv` results by region.
  _Tip 1_: You can use `riskscape pipeline step info sort` to find out more about the sort step.
  _Tip 2_: You will want to add the sort step between the `group` and the `save` steps.
- Add the average `floorspace` for each building to the `aggregated-results.csv` results file.
  To calculate the average, you will need to use the `mean()` aggregation function.
- Add an extra `filter` step to exclude the buildings without any `Region` information from the results.
  In these cases, the `Region` attribute is an empty text string that is equal to `''`.
- Try to figure out the difference between the mean and median `floorspace` across the entire building dataset, i.e.
  - Fork a new pipeline chain from the `calc_floorspace` step.
  - Use the `mean()` and `median()` aggregation functions in a `group` step.
    To aggregate across everything, you omit the `by` parameter from the group step.
  - _Tip_: It can be easiest to copy snippets of pipeline code that do *almost* what you want, and then adjust them to suit your needs.
- Try to add an additional output file that records how big the largest masonry building is in each region.
  You could do this by:
  - Forking a new pipeline chain from the `calc_floorspace` step.
  - Filtering the data so that only 'Masonry' buildings are included.
  - Using the `max()` aggregation function in a `group` step, where you are grouping by `REGION`.
- You can also use the `riskscape pipeline evaluate` command to run pipeline code.
  Try running `riskscape pipeline evaluate "input('buildings.csv', limit: 10)"`.
  Look at the output file that gets produced and see if you can work out what the `limit: 10` did.
  The `limit` parameter can be a handy way to debug a pipeline, particularly if it is slow to run over *all* your input data.