PySpark: dynamically create complex casewhen statements


I wrote the following code to dynamically create simple case/when statements in PySpark. Each mapping list provides the output value (first element) along with the keywords that must be present (second element) or absent (third element, in contains_and_doesnt_contain) for that output to be assigned.

from functools import reduce

from pyspark.sql import functions as psf

contains_col = "text_colname"  # column to search
new_col = "mapped_output"      # name of the result column

# [output, searched_keywords]
contains = [
            ['a', 'apple'],
            ['b', 'boomerang|boom'],
            ['c', 'carrot'],
        ]

# [output, keyword1, keyword2]
contains_and_doesnt_contain = [
        ['d', 'doughnut', 'doom'],
        ['doom', 'doom', 'doughnut'],
        ['e', 'ele|element', 'something|more|sophisticated']
    ]

contains_tuple = [(k, psf.col(contains_col).rlike(v)) for k, v in contains]

contains_and_doesnt_contain_tuple = [
    (k, psf.col(contains_col).rlike(v1) & ~psf.col(contains_col).rlike(v2))
    for k, v1, v2 in contains_and_doesnt_contain
]

all_conditions_tuple = contains_tuple + contains_and_doesnt_contain_tuple

case_when_statements = reduce(
    lambda acc, x: acc.when(x[1], x[0]), all_conditions_tuple, psf
).otherwise("")

output_sdf = df.select("*", case_when_statements.alias(new_col))
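As a side note on why the reduce trick works: psf.when(...) returns a Column that itself exposes .when(...), so the left fold keeps chaining clauses onto one expression. The same pattern can be illustrated without Spark (Builder below is a hypothetical stand-in for that chaining API, playing the role the psf module plays as the fold's seed):

```python
from functools import reduce

# Hypothetical stand-in for the when(...)/otherwise(...) chaining API,
# used only to show how the left fold accumulates WHEN clauses.
class Builder:
    def __init__(self, parts=()):
        self.parts = tuple(parts)

    def when(self, condition, value):
        # each step returns a new builder with one more WHEN clause appended
        return Builder(self.parts + ((condition, value),))

    def otherwise(self, default):
        clauses = " ".join(f"WHEN {c} THEN {v!r}" for c, v in self.parts)
        return f"CASE {clauses} ELSE {default!r} END"

conditions = [("a", "x RLIKE 'apple'"), ("b", "x RLIKE 'boom'")]
expr = reduce(lambda acc, kv: acc.when(kv[1], kv[0]), conditions, Builder()).otherwise("")
print(expr)
# -> CASE WHEN x RLIKE 'apple' THEN 'a' WHEN x RLIKE 'boom' THEN 'b' ELSE '' END
```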

This works well as long as each condition fits in a single regex. However, I'd like to create more complex conditions consisting of multiple AND clauses, for example:

from functools import reduce

from pyspark.sql import functions as psf

# output, contains_keywords, doesn't contain keywords
exclude_list = ['word4', 'word5', 'word6']
complex_example = ['fancy output', 'word1|word2|word3', exclude_list]

colname = 'text_colname'
output = complex_example[0]
contains_patterns = complex_example[1]
no_contains_patterns = complex_example[2]


final_contains_patterns = None
final_not_contains_patterns = None

# contains AND contains logic
if isinstance(contains_patterns, list) and len(contains_patterns) > 1:
    contains_rlikes = [psf.col(colname).rlike(pattern) for pattern in contains_patterns]
    nested_and = lambda u, v: u & v
    final_contains_patterns = reduce(nested_and, contains_rlikes)
else:
    final_contains_patterns = psf.col(colname).rlike(contains_patterns)

# doesn't contain AND doesn't contain logic
if isinstance(no_contains_patterns, list) and len(no_contains_patterns) > 1:
    not_contains_rlikes = [~psf.col(colname).rlike(pattern) for pattern in no_contains_patterns]
    nested_and = lambda u, v: u & v
    final_not_contains_patterns = reduce(nested_and, not_contains_rlikes)
else:
    final_not_contains_patterns = ~psf.col(colname).rlike(no_contains_patterns)

final_element = [output, final_contains_patterns, final_not_contains_patterns]

However, this creates the wrong nesting of the AND patterns. How can I make this work without resorting to UDFs?
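For what it's worth, since `&` is associative, the left-nested grouping produced by reduce should be semantically equivalent to any other grouping. One way to see the expression a single combined rule produces, without a running Spark session, is the sketch below. FakeCol is a hypothetical stand-in for pyspark.sql.Column that only records the SQL-like expression; real Columns overload the same `&`, `~`, and .rlike() operations:

```python
import operator
from functools import reduce

def build_condition(col, contains_patterns, not_contains_patterns):
    """Build one combined condition: the column must match every
    'contains' pattern and none of the 'not contains' patterns.
    `col` must expose .rlike() and overload & and ~, as PySpark
    Columns do."""
    conds = [col.rlike(p) for p in contains_patterns]
    conds += [~col.rlike(p) for p in not_contains_patterns]
    return reduce(operator.and_, conds)

# Hypothetical stand-in for pyspark.sql.Column, used only to show
# the expression that the fold produces.
class FakeCol:
    def __init__(self, expr):
        self.expr = expr
    def rlike(self, pattern):
        return FakeCol(f"{self.expr} RLIKE '{pattern}'")
    def __and__(self, other):
        return FakeCol(f"({self.expr} AND {other.expr})")
    def __invert__(self):
        return FakeCol(f"NOT ({self.expr})")

cond = build_condition(FakeCol("text_colname"), ["word1|word2|word3"], ["word4", "word5"])
print(cond.expr)
```

The printed expression is left-nested, but because every clause is joined with AND, the grouping does not change which rows match.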
