3

I'm working with dbplyr and DuckDB to process very large Parquet files using limited system resources. To make my workflow more efficient, I want to create a custom function that can be seamlessly integrated into a dbplyr pipeline.

Because I couldn’t achieve the desired logic using only built-in dbplyr verbs, I implemented the core of this function using raw SQL. Now, I want to wrap this SQL logic into a function that can be used as a regular dbplyr step — meaning I can chain it between other dbplyr operations (before and after it) without breaking the pipeline.

What I’m struggling with is how to integrate this custom SQL-based function properly into the dbplyr workflow, so it behaves like any other dplyr-style function.

library(tidyverse)
library(duckdb)
library(dplyr)
library(dbplyr)
library(glue)
library(purrr)


# In-memory DuckDB database holding a small example table: 100 rows in two
# crossed groups, where the last 10 `value`s (drawn from [50, 100]) act as
# outliers next to values drawn around 10.
con <- dbConnect(duckdb(), dbdir = ":memory:")
# Fix the random seed so the example data, and therefore every result derived
# from it, is reproducible.
set.seed(42)
my_data <- tibble(
  id = 1:100,
  group1 = rep(c("A", "B"), 50),
  group2 = rep(c("C", "D"), each = 50),
  value = c(rnorm(90, 10), runif(10, 50, 100))
)
# Expose the data frame to DuckDB as `df_tmp` and build a lazy table on it.
duckdb_register(con, "df_tmp", my_data)
tbl <- tbl(con, "df_tmp")

# First attempt (from the question): meant to cap or drop outliers of `var`
# per group with raw DuckDB SQL and return something usable in a dbplyr chain.
#
# NOTE(review): this version cannot run as written:
#   * the glue template uses `.by`, but the argument is named `by`;
#   * `out` is never created, so `return(out)` fails;
#   * `sql_render(.data)` is spliced in as a bare FROM source; a rendered
#     SELECT needs surrounding parentheses to be a valid subquery;
#   * the JOIN condition hard-codes group1/group2 instead of using `by`.
remove_outliers <- function(.data, var, by, threshold, ratio, remove = TRUE) {
  # NOTE(review): require() only warns (returns FALSE) if dbplyr is missing;
  # library() or plain dbplyr:: calls are the usual choice.
  require(dbplyr)
  con <- dbplyr::remote_con(.data)
  
  # Query: a CTE with one percentile-based cap per group, joined back to the
  # input rows; DuckDB's `SELECT t.* REPLACE (...)` swaps in the new `var`.
  # `remove` picks NULL (drop) or the cap itself as the replacement.
  sql_query <- glue("WITH winsor_limits AS (
        SELECT {paste0(.by, collapse = ', ')},
                PERCENTILE_DISC({threshold}) WITHIN GROUP (ORDER BY {var}) * {ratio}  AS threshold
        FROM {sql_render(.data)} as tbl
        GROUP BY {paste0(.by, collapse = ', ')}
     ) 
     SELECT t.* REPLACE (
                  CASE
                    WHEN {var} < threshold then {var}
                    ELSE {ifelse(remove, 'NULL', 'threshold')}
                  END AS {var} 
                  )
     FROM {sql_render(.data)} as t
     JOIN winsor_limits as l
     ON t.group1 = l.group1 AND t.group2 = l.group2")
  
  # This is where I don't know how to turn this SQL query into a step of the
  # dbplyr pipeline.
  return(out)
}

# Intended usage: the custom step chained like any other dbplyr verb.
dt <- tbl |>
  remove_outliers(
    var = "value",
    by = c("group1", "group2"),
    threshold = 0.8,
    ratio = 1,
    remove = TRUE
  )

If there is a better way to implement what I'm trying to do, please tell me!

1 Answer 1

3

For a minimal change, you just need to return `tbl(con, sql(sql_query))`
(and also wrap the rendered subqueries in parentheses and use `by` instead of the undefined `.by`):

library(dplyr, warn.conflicts = FALSE)
library(dbplyr, warn.conflicts = FALSE)
library(duckdb)
#> Loading required package: DBI
library(glue)

# Show up to 15 significant digits when printing tibbles.
options(pillar.max_dec_width = 20, pillar.sigfig = 15)
con <- DBI::dbConnect(duckdb::duckdb(), dbdir = ":memory:")

# Reproducible toy data: 90 values drawn around 10, then 10 values drawn
# from [50, 100] that serve as outliers.
set.seed(42)
my_data <- tibble(
  id = seq_len(100),
  group1 = rep(c("A", "B"), times = 50),
  group2 = rep(c("C", "D"), each = 50),
  value = c(rnorm(90, mean = 10), runif(10, min = 50, max = 100))
)
duckdb::duckdb_register(con, "df_tmp", my_data)
tbl <- dplyr::tbl(con, "df_tmp")

# Cap (winsorise) or drop outliers of one column within groups, using raw
# DuckDB SQL wrapped back into a lazy dbplyr table so the step can sit
# anywhere in a dbplyr pipeline.
#
# For every combination of `by`, the cap is
#   PERCENTILE_DISC(threshold) WITHIN GROUP (ORDER BY var) * ratio
# Values strictly below the cap, and missing values, are kept unchanged; all
# other values become NULL (`remove = TRUE`) or the cap itself
# (`remove = FALSE`).
#
# Arguments:
#   .data      lazy dbplyr table (e.g. from tbl()), possibly the result of
#              earlier verbs; its rendered SQL is used as a subquery.
#   var        name of the column to process (single string).
#   by         character vector of grouping columns; character() gives one
#              cap computed over all rows.
#   threshold  percentile in [0, 1] that defines the cap.
#   ratio      multiplier applied to that percentile.
#   remove     TRUE to replace outliers with NULL, FALSE to replace them with
#              the cap.
#
# Returns a lazy table, tbl(con, sql(...)), with the same columns as `.data`.
remove_outliers <- function(.data, var, by, threshold, ratio, remove = TRUE) {
  stopifnot(
    is.character(var), length(var) == 1,
    is.character(by),
    is.numeric(threshold), length(threshold) == 1,
    threshold >= 0, threshold <= 1,
    is.numeric(ratio), length(ratio) == 1, !is.na(ratio),
    is.logical(remove), length(remove) == 1, !is.na(remove)
  )

  con <- remote_con(.data)
  has_groups <- length(by) > 0

  # Quote identifiers so any column name is valid inside the SQL text.
  quote_id <- function(x) as.character(DBI::dbQuoteIdentifier(con, x))
  var_q <- quote_id(var)
  by_q <- if (has_groups) quote_id(by) else character()

  # Render the upstream pipeline once; it is used twice as a subquery.
  src <- sql_render(.data)

  group_cols <- paste(by_q, collapse = ", ")
  limits_keys <- if (has_groups) paste0(group_cols, ", ") else ""
  limits_group_by <- if (has_groups) paste("GROUP BY", group_cols) else ""
  # Join on the requested grouping columns; with no groups there is a single
  # cap row, so a cross join attaches it to every input row.
  join_clause <- if (has_groups) {
    paste(
      "JOIN winsor_limits AS l ON",
      paste0("t.", by_q, " = l.", by_q, collapse = " AND ")
    )
  } else {
    "CROSS JOIN winsor_limits AS l"
  }
  replacement <- if (remove) "NULL" else "l.threshold"

  # References are table-qualified so an input column named `threshold`
  # cannot be confused with the computed cap, and NULLs are kept as NULL
  # instead of being replaced by the cap.
  sql_query <- glue("
    WITH winsor_limits AS (
      SELECT {limits_keys}PERCENTILE_DISC({threshold}) WITHIN GROUP (ORDER BY {var_q}) * {ratio} AS threshold
      FROM ( {src} ) AS s
      {limits_group_by}
    )
    SELECT t.* REPLACE (
      CASE
        WHEN t.{var_q} IS NULL OR t.{var_q} < l.threshold THEN t.{var_q}
        ELSE {replacement}
      END AS {var_q}
    )
    FROM ( {src} ) AS t
    {join_clause}
  ")

  tbl(con, sql(sql_query))
}

# Use the SQL-based step inside an ordinary pipe and print the lazy result.
tbl_no_outl_1 <- tbl |>
  remove_outliers(
    var = "value",
    by = c("group1", "group2"),
    threshold = 0.8,
    ratio = 1,
    remove = TRUE
  ) |>
  print()
#> # Source:   SQL [?? x 4]
#> # Database: DuckDB 1.4.0 [margus@Windows 10 x64:R 4.5.1/:memory:]
#>       id group1 group2             value
#>    <int> <chr>  <chr>              <dbl>
#>  1     1 A      C      NA               
#>  2     2 B      C       9.43530182860391
#>  3     3 A      C      10.3631284113373 
#>  4     4 B      C      10.6328626049610 
#>  5     5 A      C      10.404268323141  
#>  6     6 B      C       9.89387548390852
#>  7     7 A      C      NA               
#>  8     8 B      C       9.9053409615869 
#>  9     9 A      C      NA               
#> 10    10 B      C       9.93728590094758
#> # ℹ more rows

That said, you can also get a type 1 quantile (`quantile(x, pos, type = 1)` in R) with `quantile_disc(x, pos)` in DuckDB:

# grouped quantile(..., type = 1):
quantile_t1 <- my_data |>
  summarise(
    threshold = quantile(value, probs = 0.8, type = 1),
    .by = c(group1, group2)
  ) |>
  arrange(group1, group2) |>
  # quantile() returns a named value; strip names so the result can be
  # compared with the DuckDB output
  mutate(across(everything(), unname)) |>
  print()
#> # A tibble: 4 × 3
#>   group1 group2        threshold
#>   <chr>  <chr>             <dbl>
#> 1 A      C      11.0351035219699
#> 2 A      D      11.5757275197920
#> 3 B      C      10.6556478834022
#> 4 B      D      11.3997368272927

# grouped quantile_disc():
# The same per-group 80th percentile, computed directly in DuckDB from the
# registered df_tmp table.
# NOTE(review): `sql_query()` here is assumed to be duckdb's helper that runs
# SQL on `conn` and returns a data frame; confirm. It is unrelated to the
# local `sql_query` string built inside remove_outliers().
dd_quantile_disc <- 
  sql_query('SELECT group1, group2, quantile_disc("value", 0.8) AS threshold
             FROM df_tmp GROUP BY group1, group2', conn = con) |> 
  arrange(group1, group2) |> 
  as_tibble() |>
  print()
#> # A tibble: 4 × 3
#>   group1 group2        threshold
#>   <chr>  <chr>             <dbl>
#> 1 A      C      11.0351035219699
#> 2 A      D      11.5757275197920
#> 3 B      C      10.6556478834022
#> 4 B      D      11.3997368272927

# R's type 1 quantile and DuckDB's quantile_disc() agree exactly here.
identical(quantile_t1, dd_quantile_disc)
#> [1] TRUE

dbplyr passes functions it doesn't recognise through to the database unchanged, so one option is to build a dplyr pipeline that simply calls `quantile_disc()` in `summarise()` and let dbplyr handle the SQL translation:

# Cap (winsorise) or drop outliers of one column within groups using only
# dplyr verbs. dbplyr passes the DuckDB function quantile_disc() through to
# the database unchanged, so the whole pipeline is translated to SQL and the
# result stays lazy.
#
# Per `by` group the cap is quantile_disc(var, threshold) * ratio. Values
# strictly below the cap, and missing values, are kept; all other values
# become NA (`remove = TRUE`) or the cap (`remove = FALSE`).
#
# Arguments:
#   .data      lazy dbplyr table (or any upstream dbplyr pipeline).
#   var        name of the column to process (single string).
#   by         character vector of grouping columns; character() gives one
#              cap computed over all rows.
#   threshold  percentile in [0, 1] that defines the cap.
#   ratio      multiplier applied to that percentile.
#   remove     TRUE to replace outliers with NA, FALSE to replace them with
#              the cap.
#
# Returns a lazy table with the same columns as `.data`.
remove_outliers_2 <- function(.data, var, by, threshold, ratio, remove = TRUE) {
  stopifnot(
    is.character(var), length(var) == 1,
    is.character(by),
    is.numeric(threshold), length(threshold) == 1,
    threshold >= 0, threshold <= 1,
    is.numeric(ratio), length(ratio) == 1, !is.na(ratio),
    is.logical(remove), length(remove) == 1, !is.na(remove)
  )

  # Per-group cap. `.env$ratio` and `!!threshold` always use the function
  # arguments, even if `.data` has columns called `ratio` or `threshold`.
  limits <- .data |>
    summarise(
      threshold = .env$ratio * quantile_disc(.data[[var]], !!threshold),
      .by = all_of(by)
    )

  # With no grouping columns there is a single cap row to attach to all rows.
  joined <- if (length(by) == 0) {
    cross_join(.data, limits)
  } else {
    inner_join(.data, limits, by = by)
  }

  # When a value is not below the cap, replace it with NA if this condition
  # holds, otherwise with the cap. With `remove` it is a literal TRUE, which
  # dbplyr renders as a plain ELSE branch. Without it, only values that are
  # already missing match, so NAs are never turned into the cap.
  to_na <- if (remove) TRUE else bquote(is.na(.data[[.(var)]]))

  joined |>
    mutate(
      "{var}" := case_when(
        .data[[var]] < .data$threshold ~ .data[[var]],
        !!to_na ~ NA,
        .default = .data$threshold
      )
    ) |>
    select(-all_of("threshold"))
}

# Same call as before, now with the pure dplyr implementation.
tbl_no_outl_2 <- tbl |>
  remove_outliers_2(
    var = "value",
    by = c("group1", "group2"),
    threshold = 0.8,
    ratio = 1,
    remove = TRUE
  ) |>
  print()
#> # Source:   SQL [?? x 4]
#> # Database: DuckDB 1.4.0 [margus@Windows 10 x64:R 4.5.1/:memory:]
#>       id group1 group2             value
#>    <int> <chr>  <chr>              <dbl>
#>  1     1 A      C      NA               
#>  2     2 B      C       9.43530182860391
#>  3     3 A      C      10.3631284113373 
#>  4     4 B      C      10.6328626049610 
#>  5     5 A      C      10.404268323141  
#>  6     6 B      C       9.89387548390852
#>  7     7 A      C      NA               
#>  8     8 B      C       9.9053409615869 
#>  9     9 A      C      NA               
#> 10    10 B      C       9.93728590094758
#> # ℹ more rows


# identical collected results:
tbl_no_outl_1 |>
  collect() |>
  identical(collect(tbl_no_outl_2))
#> [1] TRUE

# also identical with remove = FALSE:
identical(
  tbl |>
    remove_outliers(var = "value", by = c("group1", "group2"),
                    threshold = 0.8, ratio = 1, remove = FALSE) |>
    collect(),
  tbl |>
    remove_outliers_2(var = "value", by = c("group1", "group2"),
                      threshold = 0.8, ratio = 1, remove = FALSE) |>
    collect()
)
#> [1] TRUE

Generated query:

tbl_no_outl_2 |> show_query()
<SQL>
SELECT
  id,
  group1,
  group2,
  CASE WHEN ("value" < threshold) THEN "value" ELSE NULL END AS "value"
FROM (
  SELECT df_tmp.*, threshold
  FROM df_tmp
  INNER JOIN (
    SELECT group1, group2, 1.0 * quantile_disc("value", 0.8) AS threshold
    FROM df_tmp
    GROUP BY group1, group2
  ) RHS
    ON (df_tmp.group1 = RHS.group1 AND df_tmp.group2 = RHS.group2)
) q01

Or with CTEs, if you prefer (you can also pass `cte = TRUE` when calling `collect()` and friends):

tbl_no_outl_2 |> show_query(cte = TRUE)
<SQL>
WITH q01 AS (
  SELECT group1, group2, 1.0 * quantile_disc("value", 0.8) AS threshold
  FROM df_tmp
  GROUP BY group1, group2
),
q02 AS (
  SELECT df_tmp.*, threshold
  FROM df_tmp
  INNER JOIN q01 RHS
    ON (df_tmp.group1 = RHS.group1 AND df_tmp.group2 = RHS.group2)
)
SELECT
  id,
  group1,
  group2,
  CASE WHEN ("value" < threshold) THEN "value" ELSE NULL END AS "value"
FROM q02 q01

If you’d rather stick with your original `PERCENTILE_DISC` approach, you can still use it in `summarise()` by passing it as literal SQL with `sql()`, e.g. something like this:

# Sketch only, not runnable on its own. It presumably belongs inside a
# function like the ones above, where `.data`, `var`, `by`, `threshold` and
# `ratio` are the function's arguments. sql() marks the string as literal SQL
# so dbplyr inserts it unchanged. The trailing `...` stands for the remaining
# pipeline steps.
prcntl_disc <- glue("PERCENTILE_DISC({threshold}) WITHIN GROUP (ORDER BY {var}) * {ratio}")
.data |>
  summarise(threshold = sql(prcntl_disc), .by = all_of(by)) |>
  ...
Sign up to request clarification or add additional context in comments.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.