How to resolve python KeyError while using pathos ParallelPool

I am trying to parallelize my optimization problem with pathos ParallelPool, and I keep getting the error below. I'm testing this code on my local PC, but my end goal is to move it to a high-performance computing cluster, and I learned that ParallelPool is a better option than ProcessPool for that:

An error has occured during the function import
Traceback (most recent call last):
  File "/Users/admin/opt/anaconda3/lib/python3.9/site-packages/ppft/__main__.py", line 88, in run
    exec(__fobj)
  File "<string>", line 1, in <module>
NameError: name 'ComponentMap' is not defined
A fatal error has occured during the function execution
Traceback (most recent call last):
  File "/Users/admin/opt/anaconda3/lib/python3.9/site-packages/ppft/__main__.py", line 97, in run
    __f = locals()[ppc.str_(__fname)]
KeyError: 'create_sub_problem'
 An error has occured during the function import
Traceback (most recent call last):
  File "/Users/admin/opt/anaconda3/lib/python3.9/site-packages/ppft/__main__.py", line 88, in run
    exec(__fobj)
  File "<string>", line 1, in <module>
NameError: name 'ComponentMap' is not defined
A fatal error has occured during the function execution
Traceback (most recent call last):
  File "/Users/admin/opt/anaconda3/lib/python3.9/site-packages/ppft/__main__.py", line 97, in run
    __f = locals()[ppc.str_(__fname)]
KeyError: 'create_sub_problem'

Below is the code I'm trying to run (it's quite lengthy):

def create_sub_problem(*args, **kwds):
    
    import pyomo.environ as pyo
    from pyomo.environ import (Var, Param, Constraint, Objective, Suffix,
                               NonNegativeReals, Reals, Binary, minimize, value)
    from pyomo.opt import SolverFactory
    from math import pi
    from math import inf
    from time import sleep

    import numpy as np
    import pandas as pd
    import dask
    from dask import delayed

    from math import isnan

    from pathos.pools import ParallelPool

    #-------Data --------------------#

    #Note: Data in the file must be stored so that every value lines up with its corresponding element number. If an element does not have a value, that cell should be left blank--#

    #-------Read the majority content of data file-------#
    df_main = pd.read_csv('G&TEP data.csv')  
    df_main.index +=1 #Shift index by 1 so it begins from 1 instead of 0

    #----------Read the data for uncertain demand-------#
    df_dem = pd.read_csv('G&TEP data.csv')  
    df_dem['PDOmax_index'] = list(zip(df_dem.Demand, df_dem.Operating,df_dem.Scenario)) #create new PDOmax_index as tuple of 3 variables
    df_dem= df_dem.set_index('PDOmax_index') #Set specified column as index

    #------Read the data for network------#
    #For sending and receiving line
    df_srl = pd.read_csv('G&TEP data.csv')   #list comprehension and itertuples would be used later on

    #For existing generators to nodes
    df_nE = pd.read_csv('G&TEP data.csv')  
    df_nE= df_nE.set_index('E') #Set specified column as index

    #For candidate to nodes
    df_nC = pd.read_csv('G&TEP data.csv')  
    df_nC= df_nC.set_index('C') #Set specified column as index

    #For demand to nodes
    df_nD = pd.read_csv('G&TEP data.csv')  
    df_nD= df_nD.set_index('D') #Set specified column as index

    #Existing Generator Max power
    PEmax = df_main['PEmax']
    PEmax= PEmax.to_dict() #convert df to dict
    PEmax = {k:PEmax[k] for k in PEmax if not isnan(PEmax[k])} # remove na values 

    #Candidate Generator Max size
    PCmaxS = df_main['PCmaxS']
    PCmaxS= PCmaxS.to_dict() #convert df to dict
    PCmaxS = {k:PCmaxS[k] for k in PCmaxS if not isnan(PCmaxS[k])} # remove na values 

    #annualized investment cost of candidate generator
    ICa = df_main['ICa']
    ICa= ICa.to_dict() #convert df to dict
    ICa = {k:ICa[k] for k in ICa if not isnan(ICa[k])} # remove na values 

    #annualized investment cost of line
    ILa = df_main['ILa']
    ILa= ILa.to_dict() #convert df to dict
    ILa = {k:ILa[k] for k in ILa if not isnan(ILa[k])} # remove na values 

    #investment cost of candidate generator
    IC = df_main['IC']
    IC= IC.to_dict() #convert df to dict
    IC = {k:IC[k] for k in IC if not isnan(IC[k])} # remove na values 

    #investment cost of line
    IL = df_main['IL']
    IL= IL.to_dict() #convert df to dict
    IL = {k:IL[k] for k in IL if not isnan(IL[k])} # remove na values 

    #Load shedding costs
    CLS = df_main['CLS']
    CLS= CLS.to_dict() #convert df to dict
    CLS = {k:CLS[k] for k in CLS if not isnan(CLS[k])} # remove na values 

    #Susceptance
    B = df_main['B']
    B= B.to_dict() #convert df to dict
    B = {k:B[k] for k in B if not isnan(B[k])} # remove na values 

    #Max line flow limits
    Fmax = df_main['Fmax']
    Fmax= Fmax.to_dict() #convert df to dict
    Fmax = {k:Fmax[k] for k in Fmax if not isnan(Fmax[k])} # remove na values 

    #weight of operating conditions
    p = df_main['p']
    p= p.to_dict() #convert df to dict
    p = {k:p[k] for k in p if not isnan(p[k])} # remove na values 

    #probability of scenarios
    phi = df_main['phi']
    phi= phi.to_dict() #convert df to dict
    phi = {k:phi[k] for k in phi if not isnan(phi[k])} # remove na values 

    #investment budget of capacity generators
    IBC = df_main['IBC']
    IBC= IBC.to_dict() #convert df to dict
    IBC = {k:IBC[k] for k in IBC if not isnan(IBC[k])} # remove na values 

    #investment budget of lines
    IBL = df_main['IBL']
    IBL= IBL.to_dict() #convert df to dict
    IBL = {k:IBL[k] for k in IBL if not isnan(IBL[k])} # remove na values

    #Get existing generator cost
    CE = df_main['CE']
    CE= CE.to_dict() #convert df to dict
    CE = {k:CE[k] for k in CE if not isnan(CE[k])} # remove na values 

    #Get candidate generator cost
    CC = df_main['CC']
    CC= CC.to_dict() #convert df to dict
    CC = {k:CC[k] for k in CC if not isnan(CC[k])} # remove na values 

    #Demand at operating conditions with uncertainty
    PDOmax = df_dem['PDOmax']
    PDOmax= PDOmax.to_dict() #convert df to dict
    PDOmax = {k:PDOmax[k] for k in PDOmax if not isnan(PDOmax[k])} # remove na values 

    # # lines:(sending node,receiving node) using list comprehension and itertuples
    srl = dict([(t.Line, (t.Sending_node, t.Receiving_node)) for t in df_srl.itertuples()])
    srl = {k: srl[k] for k in srl if not isnan(k)} #remove nan

    #Existing generator to node
    nE = df_main['nE']
    nE= nE.to_dict() #convert df to dict
    nE = {k:nE[k] for k in nE if not isnan(nE[k])} # remove na values 

    #Candidate generator to node
    nC = df_main['nC']
    nC= nC.to_dict() #convert df to dict
    nC = {k:nC[k] for k in nC if not isnan(nC[k])} # remove na values 

    #Demand to node
    nD = df_main['nD']
    nD= nD.to_dict() #convert df to dict
    nD = {k:nD[k] for k in nD if not isnan(nD[k])} # remove na values 



#     #---------------sets and indices----------------#
#     Ref = [1]  #Reference node
#     GenUnitsE = [1]
#     GenUnitsC = [1]
#     Txlines = [1,2] #transmission lines in the system (existing+candidate)
#     TxlinesE = [1] # existing transmission lines
#     TxlinesC = [2] # candidate transmission lines 
#     OperCon = [1,2] #operating conditions (cannot be defined as a list of range)
#     Nodes = [1,2] # nodes

#     Time = [1,2] #Time steps (cannot be defined as a list of range)
#     Demand = [1]  #Demand in the system
#     Scenarios = [1,2] #scenarios


    Ref = [1]  #Reference node
    GenUnitsE =list(range(1, len(PEmax.keys())+1))
    GenUnitsC = list(range(1, len(PCmaxS.keys())+1))
    Txlines = list(range(1, len(srl.keys())+1)) #transmission lines in the system (existing+candidate)
    TxlinesE =list(range(1, len(srl.keys()) - len(IL.keys())+1)) # existing transmission lines 
    TxlinesC = list(IL.keys())
    OperCon = sorted(df_dem.Operating.unique()) #operating conditions
    Nodes =sorted( pd.unique(df_srl[['Sending_node', 'Receiving_node']].values.ravel()))
    Nodes = [x for x in Nodes if str(x) != 'nan'] #number of nodes
    #Time = [1,2] #Time steps (cannot be defined as a list of range)
    Demand = sorted(df_dem.Demand.unique())  #Demand in the system
    Scenarios =  sorted(df_dem.Scenario.unique()) #scenarios
    
    #------------------------#
    
    
    (Scenario,PCmax1_fixed, XL2_fixed) = args
    
    print("currently inside subproblem")

    sub = pyo.ConcreteModel()
    
    #Define domain of variables
    sub.PEO = Var(GenUnitsE, OperCon, Scenarios, within = NonNegativeReals) #Power produced by existing generating unit i in operating condition i 

    sub.PCO = Var(GenUnitsC, OperCon,Scenarios, within = NonNegativeReals) #Power produced by candidate generating unit i in operating condition i

    sub.PLO = Var(Txlines, OperCon,Scenarios,  within = NonNegativeReals) #Power flow through tx line i in oper cond i 

    sub.PCmax = Var(GenUnitsC, within = NonNegativeReals) #Size of candidate unit i to be built at time period i

    sub.PLSO = Var(Demand, OperCon,Scenarios, within = NonNegativeReals) #Load shed in demand i operating condition i

    sub.XL = Var(TxlinesC, within = Binary) #Binary decision to build tx line i 

    sub.theta = Var(Nodes, OperCon, Scenarios,  within = Reals) #voltage angle at node i oper cond 1..not nodes but r and s tx lines

    sub.M = Param(default = 9000)   #Large enough number for linearization

    sub.PCmax1_fixed = Param(default = PCmax1_fixed)   #fix comp variable as parameter
    sub.XL2_fixed = Param(default = XL2_fixed)   #fix comp variable as parameter
    
    #Fix the reference-node voltage angle in each operating condition and scenario
    sub.theta[1,1,1].fix(0)
    sub.theta[1,2,1].fix(0)
    sub.theta[1,1,2].fix(0)
    sub.theta[1,2,2].fix(0)
    
    #subproblem 1
    w=Scenario
    
    #Objective function of subproblem 1
    def sub_objective(sub):
        return phi[w]* (sum(p[o] *(sum(CE[g]*sub.PEO[g,o,w] for g in GenUnitsE ) +
                                     sum(CC[c]*sub.PCO[c,o,w] for c in GenUnitsC)
                                     + sum(CLS[d]*sub.PLSO[d,o,w] for d in Demand))for o in OperCon))
                   
    sub.objective= Objective (rule = sub_objective, sense=minimize)

    
    def GenDem (sub,n,o):
        return sum(sub.PEO[g,o,w] for g in GenUnitsE if nE[g]==n) + sum(sub.PCO[c,o,w] for c in GenUnitsC if nC[c]==n) - sum(sub.PLO[l,o,w] for l in Txlines if srl[l][0]==n) + sum(sub.PLO[l,o,w] for l in Txlines if srl[l][1]==n) == sum(PDOmax[d,o,w] - sub.PLSO[d,o,w] for d in Demand if nD[d]==n)
    sub.constraint5 = Constraint (Nodes,OperCon, rule=GenDem)

    # #Power flows for existing tx lines
    def PL (sub,l,o):
        return sub.PLO[l,o,w] == B[l] * (sub.theta[srl[l][0],o,w] - sub.theta[srl[l][1],o,w])
    sub.constraint6 = Constraint (TxlinesE,OperCon, rule=PL)

    # #Linear formulation--Power flow for candidate lines
    def Pflowa (sub,l,o):
        return -1*sub.XL[l] *Fmax[l] <= sub.PLO[l,o,w,]
    sub.constraint7a = Constraint (TxlinesC,OperCon,rule=Pflowa)

    def Pflowb (sub,l,o):
        return sub.PLO[l,o,w] <= sub.XL[l] *Fmax[l]
    sub.constraint7b = Constraint (TxlinesC,OperCon, rule=Pflowb)

    def Pflow2a (sub,l,o):
        return  sub.M*-1*(1- sub.XL[l]) <= sub.PLO[l,o,w] - B[l] * (sub.theta[srl[l][0],o,w] - sub.theta[srl[l][1],o,w])
    sub.constraint8a = Constraint (TxlinesC,OperCon, rule=Pflow2a)

    def Pflow2b (sub,l,o):
        return sub.PLO[l,o,w] - B[l] * (sub.theta[srl[l][0],o,w] - sub.theta[srl[l][1],o,w]) <= sub.M*(1- sub.XL[l])
    sub.constraint8b = Constraint (TxlinesC,OperCon, rule=Pflow2b)


    #Line flow limits
    def LineFlowa (sub,l,o):
        #return -1*Fmax[l] <= model.PLO[l,o,t] <= Fmax[l]
        return -1*Fmax[l] <= sub.PLO[l,o,w]
    sub.constraint9a = Constraint (Txlines,OperCon, rule= LineFlowa)

    def LineFlowb (sub,l,o):
        #return -1*Fmax[l] <= model.PLO[l,o,t] <= Fmax[l]
        return sub.PLO[l,o,w]  <= Fmax[l]
    sub.constraint9b = Constraint (Txlines,OperCon, rule= LineFlowb)

    #Power bounds for existing generator power
    def PE (sub,e,o):
        return sub.PEO[e,o,w] <= PEmax[e]
    sub.constraint10 = Constraint(GenUnitsE,OperCon, rule=PE)

    #Power bounds for candidate generator power  Question: How to indicate Tau
    def PC (sub,c,o):
        return sub.PCO[c,o,w] <=sub.PCmax[c] 
    sub.constraint11 = Constraint(GenUnitsC,OperCon, rule=PC)

    #Demand load shedding bounds
    def LShed (sub,d,o):
        return sub.PLSO[d,o,w] <= PDOmax[d,o,w]  
    sub.constraint12 = Constraint(Demand,OperCon, rule=LShed)

    #Node angles

    def thetana (sub,n,o):
        return  -pi <= sub.theta[n,o,w]
    sub.constraint13a = Constraint(Nodes,OperCon,rule = thetana)

    def thetanb (sub,n,o):
        return sub.theta[n,o,w] <= pi
    sub.constraint13b = Constraint(Nodes,OperCon, rule = thetanb)


    #Reference node
    def Reftheta (sub,r,o):
        return sub.theta[r,o,w]  == 0   
    sub.constraint14 = Constraint(Ref,OperCon, rule=Reftheta)

    def fix_comp1(sub):
        return sub.PCmax[1] == sub.PCmax1_fixed
    sub.constraint22a = Constraint (rule = fix_comp1)
    
    def fix_comp2(sub):
        return sub.XL[2] == sub.XL2_fixed 
    sub.constraint23a = Constraint (rule = fix_comp2)
    
    # Get dual variable of fixed constraint
    sub.dual = Suffix(direction=Suffix.IMPORT)
    
    #Call the solver to solve the model and save the output to "results"
    #opt = SolverFactory("gurobi")
    opt = SolverFactory("ipopt")
    
    results = opt.solve(sub)
    
    dual_fixed_11 = sub.dual[sub.constraint22a] #dual variable of complicating variable 1 for this scenario
    dual_fixed_12 = sub.dual[sub.constraint23a] #dual variable of complicating variable 2 for this scenario
                              
    return value(sub.objective),dual_fixed_11, dual_fixed_12


if __name__ == '__main__':
    from pathos.pools import ParallelPool

    pool = ParallelPool()
    a = pool.map(create_sub_problem, [1, 2], [20.0, 20.0], [1, 1])
    print(a)

I've tried doing the following:

(1) I've tried using pathos ProcessPool and it works, so the function code itself is fine; I confirmed this by also calling the function directly.
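
A minimal sketch of the working ProcessPool call (same function and arguments as in the posted code):

if __name__ == '__main__':
    from pathos.pools import ProcessPool

    pool = ProcessPool()
    a = pool.map(create_sub_problem, [1, 2], [20.0, 20.0], [1, 1])  # runs without errors locally
    print(a)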

I want to use ParallelPool because my end goal is to take the code to a high-performance computing (HPC) cluster and parallelize across multiple cores there. I tried doing this with ProcessPool, but it does not seem to parallelize on the HPC and runs on just one core. From what I've read, ParallelPool is the better option in such cases.
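
To be concrete, this is the kind of setup I'm aiming for on the cluster. It's only a sketch: the hostnames and port are placeholders, and it assumes a pp/ppft server process is already listening on each compute node.

from pathos.pools import ParallelPool

# placeholder hostnames/port, not real machines; each node is assumed to be
# running a pp/ppft server that the pool can hand work to
pool = ParallelPool(ncpus=4, servers=('node1:35000', 'node2:35000'))
results = pool.map(create_sub_problem, [1, 2], [20.0, 20.0], [1, 1])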

(2) I read the pathos documentation, and it says an error can come up when you use interactively defined or imported functions with ParallelPool. The suggested workaround is to wrap the function. I tried both wrapping the function (as in the posted code) and using it directly; neither worked.
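
As I understand it, the wrapping workaround looks something like this (a minimal sketch; wrapped_sub_problem is just an illustrative name, not something from my original code):

# plain top-level wrapper that only forwards its arguments to
# create_sub_problem; the wrapper is what gets passed to pool.map
def wrapped_sub_problem(*args, **kwds):
    return create_sub_problem(*args, **kwds)

if __name__ == '__main__':
    from pathos.pools import ParallelPool

    pool = ParallelPool()
    a = pool.map(wrapped_sub_problem, [1, 2], [20.0, 20.0], [1, 1])
    print(a)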

(3) I tried to encapsulate my function by having all imports inside it. I also tried having all parameters defined directly inside it, without reading from a file; still nothing worked.
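
The file-free variant looked roughly like this (a toy sketch with placeholder values, not my real data):

def create_sub_problem(*args, **kwds):
    # everything the function needs is imported and defined inside it,
    # so nothing is read from 'G&TEP data.csv'
    import pyomo.environ as pyo
    from math import pi

    PEmax = {1: 400.0}   # placeholder value
    CE = {1: 35.0}       # placeholder value
    # ...and so on for the other dictionaries; the Pyomo model is then
    # built exactly as in the full function above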

Can someone please help me out? Thank you!
