--- title: "Recursive self-modification" output: rmarkdown::html_vignette: toc: true toc_depth: 4 description: > Shows how you can setup pipelines to modify themselves at runtime, which, for example, allows for changing pipeline parameters based on intermediate results or even dynamically modify the pipeline's own structure during a pipeline run. vignette: > %\VignetteIndexEntry{Recursive self-modification} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- ```{r knitr-setup, include = FALSE} require(pipeflow) knitr::opts_chunk$set( comment = "#", prompt = FALSE, tidy = FALSE, cache = FALSE, collapse = TRUE ) old <- options(width = 100L) ``` The {pipeflow} package aims to offer a lean and intuitive interface that enables new users to get started quickly without having to learn a lot of new concepts and functions. At the same time, it was also designed to provide easy access to the underlying data structures to allow advanced users to modify the pipeline basically in any way they want. As such pipelines not only can be modified before but also during execution, respectively. This opens up a wide range of possibilities, for example, to change parameters based on intermediate results or even to modify the pipeline structure itself during a pipeline run. In the following, we will show some examples of how this can be done. ### The pipeline object Let's first define a pipeline, which fits a linear model, checks it's residuals for normality using the Shapiro-Wilk test, and plots the residuals. ```{r} library(pipeflow) pip <- pip_new("my-pipeline") |> pip_add( "data", function(data = NULL) data ) |> pip_add( "fit", function( data = ~data, xVar = "x", yVar = "y" ) { lm(paste(yVar, "~", xVar), data = data) } ) |> pip_add( "residual_shapiro_p_value", function(fit = ~fit) { residuals <- residuals(fit) shapiro.test(residuals)$p.value } ) |> pip_add( "plot", function( fit = ~fit, pointColor = "black" ) { require(ggplot2, quietly = TRUE) data <- data.frame( fitted = predict(fit), residuals = residuals(fit) ) ggplot(data, aes(x = fitted, y = residuals)) + geom_point(shape = 21, color = pointColor) + geom_hline(yintercept = 0, linetype = "dashed") + theme_minimal() } ) ``` If you have followed the previous vignettes, you by now are used to the pipeline overview. ```{r} pip ``` To inspect the internal structure, let's start with the class. ```{r} class(pip) ``` As we can see, the pipeline object is stored in an environment. ```{r} ls(pip) ``` There is the "name" of the pipeline, which is just a character string. The most interesting object is the "pipeline", object, which under the hood is a `data.table` object containing all the information about the steps, their dependencies, meta information, and so on. ```{r} data.class(pip$pipeline) pip$pipeline ``` ### Changing pipeline parameters at runtime First, we set some data and parameters and run the pipeline as usual. ```{r, fig.alt = "residual-plot"} pip |> pip_set_params(list(data = airquality, xVar = "Ozone", yVar = "Temp")) pip_run(pip) pip[["plot", "out"]] ``` Now let's imagine, we want to change the color of the points in the plot depending on the Shapiro-Wilk test result. The obvious way to do this would be to change the `plot` step by passing the test result to the `plot` step function and change the color there. However, here we are interested in another way that would keep the `plot` function unchanged. For example, we could run the pipeline a second time as follows: ```{r, fig.alt = "residual-plot"} if (pip[["residual_shapiro_p_value", "out"]] < 0.05) { pip |> pip_set_params(list(pointColor = "red")) |> pip_run() } pip[["plot", "out"]] ``` As was mentioned in another vignette, this solution is not ideal, as it requires to run additional code around the pipeline. We rather want to set the parameter from within the pipeline during execution. Luckily, the pipeline by default assigns itself to the `.self` parameter to potentially be used in any step functions. With this in mind, we update the `residual_shapiro_p_value` step as follows: ```{r} pip |> pip_replace( "residual_shapiro_p_value", function( fit = ~fit, .self = NULL ) { residuals <- residuals(fit) p <- shapiro.test(residuals)$p.value if (p < 0.05) { .self |> pip_set_params(list(pointColor = "blue")) } p } ) ``` Now we just have to make sure to set the `.self` parameter. ```{r, fig.alt = "residual-plot2"} pip_run(pip) pip[["plot", "out"]] ``` This simple "trick" opens up a wide range of possibilities for pipeline modifications at runtime. As we will show in the next section, this is not limited to changing parameters but can also be used to modify the very own pipeline structure. ### Changing pipeline structure at runtime Subsequently, the pipeline steps will be comprised only of very basic functions in order to keep matters simple. The focus here is on the pipeline structure and how it can be modified at runtime. ```{r} pip <- pip_new("my-pipeline") |> pip_add("init", function(xInit = 0) xInit) |> pip_add("f1", function(x = ~init) x + 1) |> pip_add("f2", function(x = ~f1) x + 2) |> pip_add("f3", function(x = ~f2) x + 3) ``` This pipeline just adds 1, 2, and 3 to the initial value, respectively. ```{r} pip_run(pip) pip ``` The `out` column in the table shows the output of each step. Now let's modify step `f2` that in turn will modify `f3` at runtime based on the interim result passed into `f2`. #### Modify steps ```{r} pip |> pip_replace( "f2", function( x = ~f1, .self = NULL ) { if (x > 10) { .self |> pip_replace("f3", function(x = ~f1) x * 3) return(x / 2) } x + 2 } ) ``` Basically, step `f2` now checks if the input is greater than 10, and if so, it replaces step `f3` with a new step now referencing `f1` that multiplies the input passed from `f1` by 3 and returns half of the input. To see this, let's try it with an input of 15. ```{r} pip |> pip_set_params(list(xInit = 15)) |> pip_run() pip ``` We see that both the output of the pipeline and the dependencies of the last step have changed. Let's confirm by inspecting the function of the last step. ```{r} pip[["f3", "fun"]] ``` #### Insert and remove steps For our last example, we get even more hacky an dinstead of just replacing, we will go a bit further to insert and remove steps. The pipeline definition is as follows: ```{r} pip <- pip_new("my-pipeline") |> pip_add("init", function(xInit = 0) xInit) |> pip_add("f1", function(x = ~init) x + 1) |> pip_add( "f2", function( x = ~f1, .self = NULL ) { if (x > 10) { .self |> pip_add( "f2a", function(x = ~f1) x + 21, after = "f1", ) |> pip_add( "f2b", function(x = ~f2a) x + 22, after = "f2a" ) |> pip_replace( "f3", function(x = ~f2b) { x + 30 } ) |> pip_remove("f2") } x + 2 } ) |> pip_add("f3", function(x = ~f2) x + 3) ``` Basically, if the input is greater than 10, we insert two new steps `f2a` and `f2b` after `f1`, remove `f2`, and replace `f3` with a new step that adds 30 to the input. Let's first run with the initial value of 0 to see the original output. ```{r} pip_run(pip) pip ``` Next, we set the initial value to 11 to trigger the changes. ```{r} pip |> pip_set_params(list(xInit = 11)) |> pip_run() pip ``` While the structure has changed as expected, some steps were not yet run. In fact, since originally step `f3`came after `f2`, and in contrast to what the log is showing, instead of step `f3`, actually the new step `f2b` was run, albeit with x = NULL as input. So to have the true results, we need to re-init the parameter and need to re-run the pipeline. ```{r} pip |> pip_set_params(list(xInit = 11)) |> pip_run() pip ``` Now the output of all steps is as expected. If we want to use {pipeflow} in production, obviously, having to re-run the pipeline and temporarily showing a wrong log is not ideal. Luckily, {pipeflow} provides a built-in solution for this. First, we have to make sure that any step modifying the pipeline structure returns the modified pipeline object itself, so let's redefine the pipeline as follows: ```{r} pip <- pip_new("my-pipeline") |> pip_add("init", function(xInit = 0) xInit) |> pip_add("f1", function(x = ~init) x + 1) |> pip_add( "f2", function( x = ~f1, .self = NULL ) { if (x > 10) { .self |> pip_add( "f2a", function(x = ~f1) x + 21, after = "f1", ) |> pip_add( "f2b", function(x = ~f2a) x + 22, after = "f2a" ) |> pip_replace( "f3", function(x = ~f2b) { x + 30 } ) |> pip_remove("f2") return(.self) # <-- return modified pipeline } x + 2 } ) |> pip_add("f3", function(x = ~f2) x + 3) ``` Then let's run the pipeline again while also setting the `recursive` argument to `TRUE` and have a closer look at the log. ```{r} pip |> pip_set_params(list(xInit = 11)) |> pip_run(recursive = TRUE) ``` As you can see, the run is now automatically aborted right after the pipeline was modified and, since we have set `recursive = TRUE`, the pipeline is also restarted automatically based on the new structure. As a result, the log now is fully aligned with the performed pipeline run. Looking at the final pipeline overview, we see that the output matches the expected output of the modified pipeline. ```{r} pip ``` In summary, this was just a silly example to show some possibilities and I leave it to the user to come up with more sensible and complex use cases. Lastly note that since you have full access to the pipeline object, of course, you can get even more hacky, but be aware that some additional operations are done under the hood when steps are added or removed. It is therefore not recommended to "manually" manipulate the internal data.table object in terms of removing or adding rows, or changing important columns such as `depends` or `.nodeId` as this immediately would invalidate the internal consistency of the dependency graph. On the other hand, changing entries in columns such as `tags`, `time`, `state` or `output` is generally not critical. If in doubt, just try and see what works.