I am trying to parallelize my optimization problem with pathos ParallelPool, and I keep getting the error below. I'm testing this code on my local PC, but my end goal is to move it over to a high-performance computing (HPC) cluster, and from what I've learnt, ParallelPool is a better option than ProcessPool for that:
An error has occured during the function import
Traceback (most recent call last):
File "/Users/admin/opt/anaconda3/lib/python3.9/site-packages/ppft/__main__.py", line 88, in run
exec(__fobj)
File "<string>", line 1, in <module>
NameError: name 'ComponentMap' is not defined
A fatal error has occured during the function execution
Traceback (most recent call last):
File "/Users/admin/opt/anaconda3/lib/python3.9/site-packages/ppft/__main__.py", line 97, in run
__f = locals()[ppc.str_(__fname)]
KeyError: 'create_sub_problem'
An error has occured during the function import
Traceback (most recent call last):
File "/Users/admin/opt/anaconda3/lib/python3.9/site-packages/ppft/__main__.py", line 88, in run
exec(__fobj)
File "<string>", line 1, in <module>
NameError: name 'ComponentMap' is not defined
A fatal error has occured during the function execution
Traceback (most recent call last):
File "/Users/admin/opt/anaconda3/lib/python3.9/site-packages/ppft/__main__.py", line 97, in run
__f = locals()[ppc.str_(__fname)]
KeyError: 'create_sub_problem'
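As far as I can tell from the traceback, ParallelPool (which is built on ppft) sends the function to each worker as source code and re-executes it in a fresh interpreter, so every name the function uses has to be resolvable from inside the function itself. A toy example of what I mean (hypothetical functions, not my real code):

from math import pi                      # module-level import: not shipped to the ppft workers
from pathos.pools import ParallelPool

def uses_module_global(x):
    return pi * x                        # relies on the module-level import above

def self_contained(x):
    from math import pi                  # everything the function needs is imported inside it
    return pi * x

if __name__ == '__main__':
    pool = ParallelPool()
    # pool.map(uses_module_global, [1, 2])   # reportedly fails in the worker with a NameError on 'pi'
    print(pool.map(self_contained, [1, 2]))  # fine: the function carries its own imports

That is why all the imports in create_sub_problem below sit inside the function, and yet the error above still appears.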
Below is the code I'm trying to run (it's quite lengthy):
def create_sub_problem(*args, **kwds):
import pyomo.environ as pyo
from pyomo.environ import (Var, Param, Constraint, Objective, Suffix,
                           NonNegativeReals, Binary, Reals, minimize, value) #modelling names used below, imported inside the function so workers can resolve them
from pyomo.opt import SolverFactory
from math import pi
from math import inf
from time import sleep
import numpy as np
import pandas as pd
import dask
from dask import delayed
from math import isnan
from pathos.pools import ParallelPool
#-------Data --------------------#
#Note: Data in the file must be stored so that every value tallies with the corresponding element number. If an element does not have a specific value, that cell should be left blank--#
#-------Read the majority content of data file-------#
df_main = pd.read_csv('G&TEP data.csv')
df_main.index +=1 #Shift index by 1 so it begins from 1 instead of 0
#----------Read the data for uncertain demand-------#
df_dem = pd.read_csv('G&TEP data.csv')
df_dem['PDOmax_index'] = list(zip(df_dem.Demand, df_dem.Operating,df_dem.Scenario)) #create new PDOmax_index as tuple of 3 variables
df_dem= df_dem.set_index('PDOmax_index') #Set specified column as index
#------Read the data for network------#
#For sending and receiving line
df_srl = pd.read_csv('G&TEP data.csv') #list comprehension and itertuples would be used later on
#For existing generators to nodes
df_nE = pd.read_csv('G&TEP data.csv')
df_nE= df_nE.set_index('E') #Set specified column as index
#For candidate to nodes
df_nC = pd.read_csv('G&TEP data.csv')
df_nC= df_nC.set_index('C') #Set specified column as index
#For demand to nodes
df_nD = pd.read_csv('G&TEP data.csv')
df_nD= df_nD.set_index('D') #Set specified column as index
#Existing Generator Max power
PEmax = df_main['PEmax']
PEmax= PEmax.to_dict() #convert df to dict
PEmax = {k:PEmax[k] for k in PEmax if not isnan(PEmax[k])} # remove na values
#Candidate Generator Max size
PCmaxS = df_main['PCmaxS']
PCmaxS= PCmaxS.to_dict() #convert df to dict
PCmaxS = {k:PCmaxS[k] for k in PCmaxS if not isnan(PCmaxS[k])} # remove na values
#annualized investment cost of candidate generator
ICa = df_main['ICa']
ICa= ICa.to_dict() #convert df to dict
ICa = {k:ICa[k] for k in ICa if not isnan(ICa[k])} # remove na values
#annualized investment cost of line
ILa = df_main['ILa']
ILa= ILa.to_dict() #convert df to dict
ILa = {k:ILa[k] for k in ILa if not isnan(ILa[k])} # remove na values
#investment cost of candidate generator
IC = df_main['IC']
IC= IC.to_dict() #convert df to dict
IC = {k:IC[k] for k in IC if not isnan(IC[k])} # remove na values
#investment cost of line
IL = df_main['IL']
IL= IL.to_dict() #convert df to dict
IL = {k:IL[k] for k in IL if not isnan(IL[k])} # remove na values
#Load shedding costs
CLS = df_main['CLS']
CLS= CLS.to_dict() #convert df to dict
CLS = {k:CLS[k] for k in CLS if not isnan(CLS[k])} # remove na values
#Susceptance
B = df_main['B']
B= B.to_dict() #convert df to dict
B = {k:B[k] for k in B if not isnan(B[k])} # remove na values
#Max line flow limits
Fmax = df_main['Fmax']
Fmax= Fmax.to_dict() #convert df to dict
Fmax = {k:Fmax[k] for k in Fmax if not isnan(Fmax[k])} # remove na values
#weight of operating conditions
p = df_main['p']
p= p.to_dict() #convert df to dict
p = {k:p[k] for k in p if not isnan(p[k])} # remove na values
#probability of scenarios
phi = df_main['phi']
phi= phi.to_dict() #convert df to dict
phi = {k:phi[k] for k in phi if not isnan(phi[k])} # remove na values
#investment budget of capacity generators
IBC = df_main['IBC']
IBC= IBC.to_dict() #convert df to dict
IBC = {k:IBC[k] for k in IBC if not isnan(IBC[k])} # remove na values
#investment budget of lines
IBL = df_main['IBL']
IBL= IBL.to_dict() #convert df to dict
IBL = {k:IBL[k] for k in IBL if not isnan(IBL[k])} # remove na values
#Get existing generator cost
CE = df_main['CE']
CE= CE.to_dict() #convert df to dict
CE = {k:CE[k] for k in CE if not isnan(CE[k])} # remove na values
#Get candidate generator cost
CC = df_main['CC']
CC= CC.to_dict() #convert df to dict
CC = {k:CC[k] for k in CC if not isnan(CC[k])} # remove na values
#Demand at operating conditions with uncertainty
PDOmax = df_dem['PDOmax']
PDOmax= PDOmax.to_dict() #convert df to dict
PDOmax = {k:PDOmax[k] for k in PDOmax if not isnan(PDOmax[k])} # remove na values
# # lines:(sending node,receiving node) using list comprehension and itertuples
srl = dict([(t.Line, (t.Sending_node, t.Receiving_node)) for t in df_srl.itertuples()])
srl = {k: srl[k] for k in srl if not isnan(k)} #remove nan
#Existing generator to node
nE = df_main['nE']
nE= nE.to_dict() #convert df to dict
nE = {k:nE[k] for k in nE if not isnan(nE[k])} # remove na values
#Candidate generator to node
nC = df_main['nC']
nC= nC.to_dict() #convert df to dict
nC = {k:nC[k] for k in nC if not isnan(nC[k])} # remove na values
#Demand to node
nD = df_main['nD']
nD= nD.to_dict() #convert df to dict
nD = {k:nD[k] for k in nD if not isnan(nD[k])} # remove na values
# #---------------sets and indices----------------#
# Ref = [1] #Reference node
# GenUnitsE = [1]
# GenUnitsC = [1]
# Txlines = [1,2] #transmission lines in the system (existing+candidate)
# TxlinesE = [1] # existing transmission lines
# TxlinesC = [2] # candidate transmission lines
# OperCon = [1,2] #operating conditions (cannot be defined as a list of range)
# Nodes = [1,2] # nodes
# Time = [1,2] #Time steps (cannot be defined as a list of range)
# Demand = [1] #Demand in the system
# Scenarios = [1,2] #scenarios
Ref = [1] #Reference node
GenUnitsE =list(range(1, len(PEmax.keys())+1))
GenUnitsC = list(range(1, len(PCmaxS.keys())+1))
Txlines = list(range(1, len(srl.keys())+1)) #transmission lines in the system (existing+candidate)
TxlinesE =list(range(1, len(srl.keys()) - len(IL.keys())+1)) # existing transmission lines
TxlinesC = list(IL.keys())
OperCon = sorted(df_dem.Operating.unique()) #operating conditions
Nodes =sorted( pd.unique(df_srl[['Sending_node', 'Receiving_node']].values.ravel()))
Nodes = [x for x in Nodes if str(x) != 'nan'] #number of nodes
#Time = [1,2] #Time steps (cannot be defined as a list of range)
Demand = sorted(df_dem.Demand.unique()) #Demand in the system
Scenarios = sorted(df_dem.Scenario.unique()) #scenarios
#------------------------#
(Scenario,PCmax1_fixed, XL2_fixed) = args
print("currently inside subproblem")
sub = pyo.ConcreteModel()
#Define domain of variables
sub.PEO = Var(GenUnitsE, OperCon, Scenarios, within = NonNegativeReals) #Power produced by existing generating unit g in operating condition o, scenario w
sub.PCO = Var(GenUnitsC, OperCon, Scenarios, within = NonNegativeReals) #Power produced by candidate generating unit c in operating condition o, scenario w
sub.PLO = Var(Txlines, OperCon, Scenarios, within = NonNegativeReals) #Power flow through tx line l in operating condition o, scenario w
sub.PCmax = Var(GenUnitsC, within = NonNegativeReals) #Size of candidate unit c to be built
sub.PLSO = Var(Demand, OperCon, Scenarios, within = NonNegativeReals) #Load shed for demand d in operating condition o, scenario w
sub.XL = Var(TxlinesC, within = Binary) #Binary decision to build candidate tx line l
sub.theta = Var(Nodes, OperCon, Scenarios, within = Reals) #Voltage angle at node n (sending/receiving ends of tx lines) in operating condition o, scenario w
sub.M = Param(default = 9000) #Large enough number for linearization
sub.PCmax1_fixed = Param(default = PCmax1_fixed) #complicating variable fixed as a parameter
sub.XL2_fixed = Param(default = XL2_fixed) #complicating variable fixed as a parameter
#Fix the reference-node (node 1) voltage angle to zero in every operating condition and scenario
for o in OperCon:
    for s in Scenarios:
        sub.theta[1,o,s].fix(0)
#subproblem 1
w=Scenario
#Objective function of subproblem 1
def sub_objective(sub):
return phi[w]* (sum(p[o] *(sum(CE[g]*sub.PEO[g,o,w] for g in GenUnitsE ) +
sum(CC[c]*sub.PCO[c,o,w] for c in GenUnitsC)
+ sum(CLS[d]*sub.PLSO[d,o,w] for d in Demand))for o in OperCon))
sub.objective= Objective (rule = sub_objective, sense=minimize)
def GenDem (sub,n,o):
return sum(sub.PEO[g,o,w] for g in GenUnitsE if nE[g]==n) + sum(sub.PCO[c,o,w] for c in GenUnitsC if nC[c]==n) - sum(sub.PLO[l,o,w] for l in Txlines if srl[l][0]==n) + sum(sub.PLO[l,o,w] for l in Txlines if srl[l][1]==n) == sum(PDOmax[d,o,w] - sub.PLSO[d,o,w] for d in Demand if nD[d]==n)
sub.constraint5 = Constraint (Nodes,OperCon, rule=GenDem)
# #Power flows for existing tx lines
def PL (sub,l,o):
return sub.PLO[l,o,w] == B[l] * (sub.theta[srl[l][0],o,w] - sub.theta[srl[l][1],o,w])
sub.constraint6 = Constraint (TxlinesE,OperCon, rule=PL)
# #Linear formulation--Power flow for candidate lines
def Pflowa (sub,l,o):
return -1*sub.XL[l] *Fmax[l] <= sub.PLO[l,o,w]
sub.constraint7a = Constraint (TxlinesC,OperCon,rule=Pflowa)
def Pflowb (sub,l,o):
return sub.PLO[l,o,w] <= sub.XL[l] *Fmax[l]
sub.constraint7b = Constraint (TxlinesC,OperCon, rule=Pflowb)
def Pflow2a (sub,l,o):
return sub.M*-1*(1- sub.XL[l]) <= sub.PLO[l,o,w] - B[l] * (sub.theta[srl[l][0],o,w] - sub.theta[srl[l][1],o,w])
sub.constraint8a = Constraint (TxlinesC,OperCon, rule=Pflow2a)
def Pflow2b (sub,l,o):
return sub.PLO[l,o,w] - B[l] * (sub.theta[srl[l][0],o,w] - sub.theta[srl[l][1],o,w]) <= sub.M*(1- sub.XL[l])
sub.constraint8b = Constraint (TxlinesC,OperCon, rule=Pflow2b)
#Line flow limits
def LineFlowa (sub,l,o):
#return -1*Fmax[l] <= model.PLO[l,o,t] <= Fmax[l]
return -1*Fmax[l] <= sub.PLO[l,o,w]
sub.constraint9a = Constraint (Txlines,OperCon, rule= LineFlowa)
def LineFlowb (sub,l,o):
#return -1*Fmax[l] <= model.PLO[l,o,t] <= Fmax[l]
return sub.PLO[l,o,w] <= Fmax[l]
sub.constraint9b = Constraint (Txlines,OperCon, rule= LineFlowb)
#Power bounds for existing generator power
def PE (sub,e,o):
return sub.PEO[e,o,w] <= PEmax[e]
sub.constraint10 = Constraint(GenUnitsE,OperCon, rule=PE)
#Power bounds for candidate generator power Question: How to indicate Tau
def PC (sub,c,o):
return sub.PCO[c,o,w] <=sub.PCmax[c]
sub.constraint11 = Constraint(GenUnitsC,OperCon, rule=PC)
#Demand load shedding bounds
def LShed (sub,d,o):
return sub.PLSO[d,o,w] <= PDOmax[d,o,w]
sub.constraint12 = Constraint(Demand,OperCon, rule=LShed)
#Node angles
def thetana (sub,n,o):
return -pi <= sub.theta[n,o,w]
sub.constraint13a = Constraint(Nodes,OperCon,rule = thetana)
def thetanb (sub,n,o):
return sub.theta[n,o,w] <= pi
sub.constraint13b = Constraint(Nodes,OperCon, rule = thetanb)
#Reference node
def Reftheta (sub,r,o):
return sub.theta[r,o,w] == 0
sub.constraint14 = Constraint(Ref,OperCon, rule=Reftheta)
def fix_comp1(sub):
return sub.PCmax[1] == sub.PCmax1_fixed
sub.constraint22a = Constraint (rule = fix_comp1)
def fix_comp2(sub):
return sub.XL[2] == sub.XL2_fixed
sub.constraint23a = Constraint (rule = fix_comp2)
# Get dual variable of fixed constraint
sub.dual = Suffix(direction=Suffix.IMPORT)
#Call the gurobi solver to solve the model and save output to "results"
# opt = SolverFactory ("gurobi")
opt = SolverFactory ("ipopt")
results = opt.solve(sub)
dual_fixed_11 = sub.dual[sub.constraint22a] #dual variable of the first complicating variable (PCmax[1]) in this scenario
dual_fixed_12 = sub.dual[sub.constraint23a] #dual variable of the second complicating variable (XL[2]) in this scenario
return value(sub.objective),dual_fixed_11, dual_fixed_12
if __name__ == '__main__':
from pathos.pools import ParallelPool #imported here as well: the import inside create_sub_problem is local to that function
pool = ParallelPool()
a = pool.map(create_sub_problem, [1,2], [20.0,20.0], [1,1])
print(a)
I've tried doing the following:
(1) I've tried pathos ProcessPool and it works, so the function itself is fine; I confirmed this by also calling the function directly. The working ProcessPool call is sketched just below. I want to use ParallelPool because my end goal is to take the code over to a high-performance computing (HPC) cluster and parallelize across multiple cores there. I tried doing that with ProcessPool, but on the HPC it seems not to parallelize and runs on just one core, and from what I've learnt, ParallelPool is better in such instances.
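Roughly, this is the ProcessPool version that does work (the same create_sub_problem as above, only the pool type swapped):

from pathos.pools import ProcessPool

if __name__ == '__main__':
    pool = ProcessPool()
    a = pool.map(create_sub_problem, [1,2], [20.0,20.0], [1,1])
    print(a)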
(2) I read the pathos documentation, which says an error can come up when you use interactively defined or imported functions with ParallelPool. The suggested workaround is to wrap the function. I tried both wrapping the function (as in the posted code) and using it directly; neither worked.
(3) I tried to encapsulate my function by having all imports inside it. I also tried defining all the parameters directly inside it, rather than reading them from a file; still nothing worked. A condensed sketch of what I mean by encapsulating is below.
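To make the encapsulation concrete, here is a stripped-down sketch of the structure (the function name, the toy model and the numbers are made up for illustration; only the shape matches my real function):

def create_sub_problem_condensed(*args):
    # everything imported and defined inside the function, nothing taken from module scope
    import pyomo.environ as pyo
    from pyomo.opt import SolverFactory

    (scenario, pcmax1_fixed, xl2_fixed) = args   # xl2_fixed unused in this toy
    sub = pyo.ConcreteModel()
    sub.x = pyo.Var(within=pyo.NonNegativeReals)
    sub.fix_x = pyo.Constraint(expr=sub.x == pcmax1_fixed)
    sub.obj = pyo.Objective(expr=scenario * sub.x, sense=pyo.minimize)
    sub.dual = pyo.Suffix(direction=pyo.Suffix.IMPORT)
    SolverFactory("ipopt").solve(sub)
    return pyo.value(sub.obj), sub.dual[sub.fix_x]

if __name__ == '__main__':
    from pathos.pools import ParallelPool
    pool = ParallelPool()
    print(pool.map(create_sub_problem_condensed, [1, 2], [20.0, 20.0], [1, 1]))

My real function follows this pattern (imports and data handling inside the function), and it still produces the ComponentMap error above when run through ParallelPool.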
Can someone please help me out? Thank you!