Pass parameters/arguments to HDInsight/Spark Activity in Azure Data Factory


I have an on-demand HDInsight cluster that is launched from a Spark activity in Azure Data Factory and runs Spark 3.1 (PySpark). To test my code, I normally launch a Jupyter Notebook from the created HDInsight cluster's page.

Now, I would like to pass some parameters to that Spark activity and retrieve them from within the Jupyter notebook code. I've tried doing so in two ways, but neither worked for me; rough sketches of both attempts are shown below:

Method A: passing them as Arguments and then trying to retrieve them using sys.argv.

Method B: passing them as Spark configuration entries and then trying to retrieve them using sc.getConf().getAll().
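
Roughly, this is what I ran in a notebook cell to read the values back (a minimal sketch; sc is the SparkContext that the PySpark kernel pre-creates):

import sys

# Method A: expected the values from the activity's "Arguments" field to show up here
print(sys.argv)

# Method B: expected the custom entries from "Spark configuration" to show up here
print(sc.getConf().getAll())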

I suspect that either:

  • I am not specifying the parameters correctly,
  • or I am using the wrong way to retrieve them in the Jupyter Notebook code,
  • or the parameters are only valid for Python *.py scripts specified in the "File path" field, but not for Jupyter notebooks.

Any pointers on how to pass parameters into HDInsight Spark activity within Azure Data Factory would be much appreciated.


1 Answer

Answered by Saideep Arikontham:

The issue is with the entry file path (entryFilePath). In the HDInsight Spark activity, the "File path" must point to either a .jar or a .py file. When you do this, the arguments are passed successfully and can be read with sys.argv.
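
As a minimal sketch (the script and variable names below are just illustrative), the entry .py file can read whatever you put under Arguments like this:

import sys

# sys.argv[0] is the submitted script itself; the pipeline's Arguments start at index 1
table_name = sys.argv[1]
print("Table name received from the pipeline:", table_name)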

  • The following is an example of how you can pass arguments to a Python script.

[Screenshot: the Spark activity with the table name supplied under Arguments and the .py script set as the File path]

  • The code inside nb1.py (sample) is shown below:
from pyspark import SparkContext
from pyspark.sql import *
import sys

sc = SparkContext()
sqlContext = HiveContext(sc)

# Create an RDD from the sample data that is already available on the cluster
hvacText = sc.textFile("wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")

# Define a schema for our data
Entry = Row('Date', 'Time', 'TargetTemp', 'ActualTemp', 'BuildingID')

# Parse the CSV rows, skipping the header line
hvacParts = hvacText.map(lambda s: s.split(',')).filter(lambda s: s[0] != 'Date')
hvac = hvacParts.map(lambda p: Entry(str(p[0]), str(p[1]), int(p[2]), int(p[3]), int(p[6])))

# Infer the schema and create a table
hvacTable = sqlContext.createDataFrame(hvac)
hvacTable.registerTempTable('hvactemptable')
dfw = DataFrameWriter(hvacTable)

# Use the argument passed from the pipeline as the table name
dfw.saveAsTable(sys.argv[1])
  • When the pipeline is triggered, it runs successfully and the required table is created (the name of this table is passed as an argument from the pipeline's Spark activity). We can then query this table in the HDInsight cluster's Jupyter notebook using the following query:
select * from new_hvac

[Screenshot: results of the query in the Jupyter notebook]
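
For instance, the same check can also be run from a notebook code cell through the pre-created spark session (assuming new_hvac was the argument passed to the activity):

# Query the table created by the pipeline run; its name was supplied as the pipeline argument
spark.sql("SELECT * FROM new_hvac").show()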

NOTE:

So, please ensure that you are passing the arguments to a Python script (.py file), not to a Jupyter notebook.