databricks-connect cannot load module in UDF


I'm trying to use PyNaCl inside a PySpark UDF, with databricks-connect running on Windows.

from nacl import bindings as c
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def verify_signature(msg, keys):
    c.crypto_sign_ed25519ph_update(...)
    ...

verify_signature_udf = udf(lambda x: verify_signature(x, public_keys), BooleanType())

data_signed = data.withColumn(
    "is_signature_valid", verify_signature_udf("state_values")
)

PyNaCl is installed locally (where databricks-connect runs), but as far as I understand it is not installed on the executors, so I get this:

File "/databricks/spark/python/pyspark/cloudpickle/cloudpickle.py", line 679, in subimport
    __import__(name)
ModuleNotFoundError: No module named 'nacl'
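One way to confirm what the executors actually see is to run a small probe there. The sketch below is a hypothetical helper (`probe_module` is an illustrative name, not a Spark or Databricks API) that reports whether a module can be located by whichever interpreter runs it, plus that interpreter's platform and path.

```python
import importlib.util
import platform
import sys


def probe_module(name):
    """Report whether `name` can be found by the interpreter running this code.

    For a top-level name, find_spec only locates the module without executing
    it; for a dotted name it imports the parent packages first. So "importable"
    here means "found", not "imports cleanly".
    """
    try:
        spec = importlib.util.find_spec(name)
    except (ImportError, ValueError):
        # ModuleNotFoundError (a subclass of ImportError) is raised when the
        # parent package of a dotted name is missing.
        spec = None
    return {
        "module": name,
        "importable": spec is not None,
        "python": sys.version.split()[0],
        "platform": platform.platform(),
        "executable": sys.executable,
    }
```

Assuming the legacy databricks-connect client, which exposes `spark.sparkContext` (as the `addPyFile` call in this question suggests), it could be run on the cluster with something like `spark.sparkContext.parallelize(range(2), 2).map(lambda _: probe_module("nacl")).collect()`, which returns one report per task instead of the local client's view.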

As described in Python Packaging, I'm trying to ship it like this:

import os
from pyspark.sql import SparkSession

os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"
spark = SparkSession.builder.config(
    "spark.archives",
    "pyspark_venv.tar.gz#environment").getOrCreate()
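The `spark.archives` route relies on the archive unpacking on each executor into a directory (named `environment` by the `#environment` suffix) that contains a `bin/python` the executor can run. A venv created on Windows puts its interpreter at `Scripts\python.exe` and holds Windows builds of compiled packages, while the paths in the traceback (`/databricks/...`, `/local_disk0/...`) point to Linux executors. Below is a hypothetical sanity check (`venv_layout` is an illustrative name) that classifies the archive by its member names only; it does not prove the binaries match the executors' platform.

```python
import tarfile


def _normalize(name):
    # Some packers prefix member names with "./"; drop it before comparing.
    while name.startswith("./"):
        name = name[2:]
    return name


def venv_layout(archive_path):
    """Classify a packed virtualenv archive by where its interpreter lives.

    Returns "posix" if the archive has bin/python at its root (what
    PYSPARK_PYTHON="./environment/bin/python" expects), "windows" if it has
    Scripts/python.exe instead, and "unknown" otherwise. Only member names
    are checked; the binaries themselves are not inspected.
    """
    with tarfile.open(archive_path, "r:*") as tar:
        names = {_normalize(member.name) for member in tar.getmembers()}
    if "bin/python" in names:
        return "posix"
    if "Scripts/python.exe" in names:
        return "windows"
    return "unknown"
```

If `venv_layout("pyspark_venv.tar.gz")` reports `"windows"`, the archive could not serve Linux executors even if the `spark.archives` setting were picked up.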

No change; I get the same message. If I instead extract the nacl package from the tar.gz, store it as a zip file, and load it like this:

spark.sparkContext.addPyFile(path="nacl.zip")

it gets loaded, but now I get this error:

File "/local_disk0/spark-xxx8db3a-5436-4ce8-8ff5-19eaeb4397b4/executor-xxxb7a74-4e1b-40bf-aae2-fc3553155f91/spark-xxx70cb9-482d-42a9-901a-c36f66a42a19/isolatedSparkFiles/0e10cb02-db69-4d63-b7ea-6c2b415fb5d9/nacl.zip/nacl/bindings/crypto_aead.py", line 17, in <module>
    from nacl._sodium import ffi, lib
ModuleNotFoundError: No module named 'nacl._sodium'

Any ideas? Would it work with dbx? Alternatively, is there a way to achieve this without a UDF?

Edit: The zip file contains the following sodium components; the tar.gz contains no sodium-related files beyond these:

./nacl/bindings/sodium_core.py
./nacl/bindings/__pycache__/sodium_core.cpython-39.pyc
./nacl/_sodium.pyd
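The listing shows `_sodium.pyd`, which is a Windows extension module (the compiled libsodium binding). Two things plausibly block it on the executors: Linux CPython only loads extension modules with `.so` suffixes, and `zipimport`, which is what makes a zip added with `addPyFile` importable, does not load compiled extension modules from a zip at all. The hypothetical helper below (`find_extension_modules` is an illustrative name) flags such members before a zip is shipped; the Linux suffix list used with it is an assumption about the executors' Python 3.9 x86-64 build.

```python
import importlib.machinery
import zipfile

# File endings that mark compiled extension modules on common platforms.
_EXTENSION_MARKERS = (".pyd", ".so")


def find_extension_modules(zip_path, target_suffixes=None):
    """Return (member, suffix_matches_target) for each compiled extension in a zip.

    `target_suffixes` defaults to the extension suffixes of the interpreter
    running this code; pass a list such as
    [".cpython-39-x86_64-linux-gnu.so", ".abi3.so", ".so"] to check against a
    Linux executor instead. Even a matching suffix cannot be imported from
    inside a zip, because zipimport does not load extension modules.
    """
    if target_suffixes is None:
        target_suffixes = importlib.machinery.EXTENSION_SUFFIXES
    results = []
    with zipfile.ZipFile(zip_path) as zf:
        for member in zf.namelist():
            if member.endswith(_EXTENSION_MARKERS):
                matches = any(member.endswith(s) for s in target_suffixes)
                results.append((member, matches))
    return results
```

For the zip described above, checking against Linux suffixes would report `("nacl/_sodium.pyd", False)`, consistent with the `No module named 'nacl._sodium'` error.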

Edit 2: When I move the import inside the for loop, databricks-connect runs without a local error, but the error is then raised when the UDF executes on the executors, so this does not work either (my earlier assumption was wrong):


from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def verify_signature(msg, keys):
    for key in keys:
        # importing here prevents databricks-connect from raising a local error
        from nacl import bindings as c
        c.crypto_sign_ed25519ph_update(...)
        ...

verify_signature_udf = udf(lambda x: verify_signature(x, public_keys), BooleanType())

data_signed = data.withColumn(
    "is_signature_valid", verify_signature_udf("state_values")
)
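Deferring the import only moves the failure from the client to the executors; it does not make the package available there. If the import stays inside the UDF, a hypothetical helper like the one below (`import_on_worker` is an illustrative name) can run once per call before the key loop and make the executor-side error state where it failed. It is a diagnostic aid under these assumptions, not a fix for the missing package.

```python
import importlib
import platform
import sys


def import_on_worker(module_name):
    """Import `module_name` at call time rather than at definition time.

    Called inside a UDF body, the import happens in the interpreter that runs
    the UDF. On failure, the re-raised error names that interpreter's platform,
    version, and path so executor logs show where the import failed.
    """
    try:
        # import_module returns the cached module from sys.modules on repeat calls.
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(
            f"cannot import {module_name!r} on {platform.platform()} "
            f"(Python {sys.version.split()[0]}, {sys.executable})"
        ) from exc
```

Inside `verify_signature`, this would replace the per-key import with a single `c = import_on_worker("nacl.bindings")` before the `for` loop.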