I wrote the following code to dynamically create simple case/when statements in PySpark. In mapping lists, I provide the output value (first element) as well as mapped keywords that should be either present (second element) or absent (third element in contains_and_doesnt_contain) to map the specified output.
from functools import reduce  # required: reduce is no longer a builtin in Python 3

from pyspark.sql import functions as psf

# Free names assumed bound in the surrounding context (as in the original):
#   df           - the input DataFrame
#   contains_col - name of the searched text column
#   new_col      - name of the derived output column

# [output, searched_keywords]  — output when the regex matches
contains = [
    ['a', 'apple'],
    ['b', 'boomerang|boom'],
    ['c', 'carrot'],
]
# [output, must_contain_regex, must_not_contain_regex]
contains_and_doesnt_contain = [
    ['d', 'doughnut', 'doom'],
    ['doom', 'doom', 'doughnut'],
    ['e', 'ele|element', 'something|more|sophisticated'],
]

# (output, condition) pairs for the plain "contains" rules.
contains_tuple = [(k, psf.col(contains_col).rlike(v)) for k, v in contains]
# (output, condition) pairs for the "contains A and not B" rules.
contains_and_doesnt_contain_tuple = [
    (k, psf.col(contains_col).rlike(v1) & ~psf.col(contains_col).rlike(v2))
    for k, v1, v2 in contains_and_doesnt_contain
]
all_conditions_tuple = contains_tuple + contains_and_doesnt_contain_tuple

# Fold the pairs into one chained CASE WHEN expression.  Seeding the fold
# with the `psf` module works because `psf.when(...)` starts the chain and
# every subsequent accumulator is a Column exposing `.when(...)` itself.
case_when_statements = reduce(
    lambda acc, pair: acc.when(pair[1], pair[0]),
    all_conditions_tuple,
    psf,
).otherwise("")
output_sdf = df.select("*", case_when_statements.alias(new_col))
This works well if conditions are simple and can be contained in a single regex expression. However, I'd like to create more complex conditions consisting of multiple AND statements, for example:
from functools import reduce  # required: reduce is no longer a builtin in Python 3

from pyspark.sql import functions as psf

# output, contains_keywords, doesn't-contain keywords
exclude_list = ['word4', 'word5', 'word6']
complex_example = ['fancy output', 'word1|word2|word3', exclude_list]
colname = 'text_colname'

output = complex_example[0]
contains_patters = complex_example[1]
no_contains_patterns = complex_example[2]


def _as_list(patterns):
    """Normalize a single regex string or a list of them to a list.

    The original `len(...) > 1` test sent single-element lists down the
    scalar branch, where `.rlike(list)` fails at runtime.
    """
    return patterns if isinstance(patterns, list) else [patterns]


def _and_all(conditions):
    """AND a non-empty list of Column conditions into one Column.

    reduce builds a left-nested chain (((c1 & c2) & c3) ...), which is the
    flat conjunction we want — no manual nesting required.
    """
    return reduce(lambda u, v: u & v, conditions)


# "contains" half: every pattern must match somewhere in the column.
final_contains_patterns = _and_all(
    [psf.col(colname).rlike(p) for p in _as_list(contains_patters)]
)
# "doesn't contain" half: every pattern must be absent.
final_not_contains_patterns = _and_all(
    [~psf.col(colname).rlike(p) for p in _as_list(no_contains_patterns)]
)

# The usable WHEN condition is the conjunction of both halves; keeping them
# as separate list elements (as before) is what produced the wrong nesting.
final_condition = final_contains_patterns & final_not_contains_patterns

final_element = [output, final_contains_patterns, final_not_contains_patterns]
However, this will create a wrong nesting of any 'and' patterns. How can I make it work without having to work with UDFs?