作者:Flyingis
    本文欢迎友情转载,但请注明作者及原文链接,严禁用于商业目的
 
    让ArcGIS桌面软件进行GP脚本并行运算确实不是一件容易的事情,一方面是粗粒度AO对象的限制,另外一方面是Python本身的问题。
    Python是解释型的语言,使用GIL全局解释器锁在内部禁止并行运算,因此在相同时间内只能有一条指令被执行,为什么存在GIL?是因为Python解释器后台的不可见变量,比如为了进行垃圾回收而维护的引用计数,如果没有GIL,则可能出现由于线程切换导致的对同一对象释放两次的情况(,ArcGIS 9.4将支持这一版本。
    那么,Parallel Python不能解决所有问题,又必须得使用Python 2.5,如何有效利用多核计算(注意,是利用多核计算,并非并行计算)?通常情况下,我们会将多个任务分解成不同的脚本,开启多个Python解释器或ArcCatalog运行,每个程序占用一个核,多核可以同时利用起来,各自处理完成后,进行汇总,得到最终计算结果。ESRI Resource Center上提供了一种类似的方法,差别在于将所有任务都写在一个Python脚本中,实际上和前者的原理相同,作为参考
代码
  1 # Author: ESRI
  2 # Date:   August 2009
  3 # Purpose: This script demonstrates a methodology for parallel geoprocessing in ArcGIS Desktop.
  4 #          The input is split into parts which are processed by separate subprocesses,
  5 #          their outputs are appended into the final result.  In this specific case, the
  6 #          geoprocessing undertaken is address geocoding, but any feasible geoprocessing
  7 #          can be done by simple modification of this script.
  8 #          Look for the comment line "Begin code block for GP process of interest"
  9 try:
 10     import arcgisscripting, os, subprocess, sys, time, traceback
 11     gp = arcgisscripting.create(9.3)
 12     gp.overwriteoutput = True
 13     startTime = time.time()
 14     
 15     #Get the number of concurrent GP processes desired.
 16     #It is recommended this agree with the number of your available cores.
 17     pCount = gp.getparameter(0)
 18     inObject = gp.getparameterastext(1)
 19     result = gp.getcount_management(inObject)
 20     count = int(result.getoutput(0))
 21     gp.addmessage("Processing " + inObject + " in " + str(pCount) + " concurrent processes.")   
 22     #We will use a generic approach to splitting the input into parts, namely slicing it
 23     #with modulo arithmetic.  For example, if we want to run 4 concurrent processes
 24     #we create 4 parts from the input using "ObjectID modulo 4 = 0 (then 1,2,3)"
 25     
 26     #Build the splitting expressionsGeoprocessing scripts 利用多核进行计算(Note you will need to edit this code block if using input from SDE on Oracle)
 27     inDesc = gp.describe(inObject)
 28     delimitedOIDName = str(gp.addfielddelimiters(inObject,str(inDesc.oidfieldname)))
 29     wsType = str(gp.describe(str(inDesc.path)).workspacetype)
 30     queryList = []
 31     for q in range(pCount):
 32         if wsType == "RemoteDatabase"#SDE
 33             #Non-Oracle
 34             queryList.append(delimitedOIDName + " % " + str(pCount) + " = " + str(q))
 35             #Oracle - comment out the above line and uncomment the below line when using Oracle inputs
 36             #queryList.append("mod(\"" + delimitedOIDName + "\"," + str(pCount) + ") = " + str(q))
 37         elif wsType == "FileSystem"#DBase
 38             queryList.append("mod(\"" + delimitedOIDName + "\"," + str(pCount) + ") = " + str(q))
 39         elif wsType == "LocalDatabase" and inDesc.path.endswith(".mdb"): #Personal GDB
 40             queryList.append(delimitedOIDName + " mod " + str(pCount) + " = " + str(q))
 41         elif wsType == "LocalDatabase" and inDesc.path.endswith(".gdb"): #File GDB
 42             queryList.append("mod(\"" + delimitedOIDName + "\"," + str(pCount) + ") = " + str(q))
 43     #Create a seed name for the parts
 44     if inDesc.name.endswith((".dbf",".shp")):
 45         seedName = str(inDesc.name).split(".")[0]
 46     elif inDesc.name.count("."> 1#Fully qualified DBMS object name
 47         seedName = str(inDesc.name).split(".")[-1]
 48     else:
 49         seedName = str(inDesc.name)
 50         
 51     #Make file GDBs at a well known path (to the system); edit this to use fast disk if you have it.
 52     #The child scripts will write into these separate file GDBs so as to avoid lock conflicts.
 53     #import tempfile
 54     #wkPath = tempfile.gettempdir()
 55     wkPath = r"C:\Temp"
 56     gdbNameList = []
 57     j = 0
 58     for i in range(pCount):
 59         gdbName = wkPath+"\parallel"+str(i)+".gdb"
 60         while gp.exists(gdbName): #Do not clobber other parallel processes
 61             j+=1
 62             gdbName = wkPath+"\parallel"+str(j)+".gdb"
 63         gdbNameList.append(gdbName)
 64         result = gp.createfilegdb_management(wkPath,os.path.basename(gdbName))
 65     gp.workspace = "in_memory"
 66     
 67     #Create scripts that geoprocess the parts, write these into the file GDB directories created above
 68     scriptPathList = []
 69     for r in range(pCount):
 70         gdbName = gdbNameList[r]
 71         sName = gdbName+"/parallel.py"
 72         scriptPathList.append(sName)
 73         scriptLines = []
 74         scriptLines.append("import arcgisscripting\n")
 75         scriptLines.append("gp = arcgisscripting.create(9.3)\n")
 76         scriptLines.append("gp.overwriteoutput = True\n")
 77         if str(inDesc.datatype) == "Table" or str(inDesc.datatype) == "DbaseTable":
 78             scriptLines.append("result = gp.tabletotable_conversion("+\
 79                                "\""+inObject+"\","+\
 80                                "\""+gdbName+"\","+\
 81                                "\""+seedName+str(r)+"\","+\
 82                                "\""+queryList[r]+"\")\n")
 83         else:
 84             scriptlines.append("result = gp.featureclasstofeatureclass_conversion("+\
 85                                "\""+inObject+"\","+\
 86                                "\""+gdbName+"\","+\
 87                                "\""+seedName+str(r)+"\","+\
 88                                "\""+queryList[r]+"\")\n")
 89         scriptLines.append("part = result.getoutput(0)\n")
 90         #
 91         #Begin code block for GP process of interest:
 92         scriptLines.append("aLocator = r\"D:\Dev\Python\GP_multicore\TestData.gdb\NZ Locator\""+"\n")
 93         #We could use a service.  Note the \\a in \\arcgis below would be a BELL literal if not escaped as in \a (rcgis)
 94         #scriptLines.append("aLocator = r\"GIS Servers\\arcgis on bruceh\CA_Locator.GeocodeServer\""+"\n")
 95         scriptLines.append("fieldMap = \"Address Address;Suburb Suburb;Mailtown Mailtown;Postcode Postcode\"\n")
 96         scriptLines.append("outFC = "+"\""+gdbName+"\\"+"Result"+str(r)+"\"\n")
 97         scriptLines.append("result = gp.geocodeaddresses_geocoding(part,aLocator,fieldMap,outFC)\n")
 98         scriptLines.append("result = result.getoutput(0)\n")
 99         #End code block for GP process of interest:
100         #
101         #Make stdio aware of completion - this message will be returned to the GP stdio message pipe
102         scriptLines.append("print "+ "\"Finished Part" + str(r) + "\"\n")
103         scriptLines.append("del gp\n")
104         #Write script
105         f = open(sName,'w')
106         f.writelines(scriptLines)
107         f.flush
108         f.close()
109     #Launch the parallel processes
110     gp.addmessage("Launching parallel processesGeoprocessing scripts 利用多核进行计算\n")
111     processList = []
112     for r in range(pCount):
113         processList.append(subprocess.Popen([r"C:\Python25\python.exe",scriptPathList[r]],\
114                                             shell=True,\
115                                             stdout=subprocess.PIPE,\
116                                             stderr=subprocess.PIPE))
117         gp.addmessage("Launched process " + str(r)+ "\n")
118         time.sleep(2)
119     #
120     #Wait for completion
121     messageList = []
122     Done = False
123     while not Done:
124         Done = True
125         for p in processList:
126             if p.poll() is None:
127                 Done = False
128             else:
129                 messageList.append("Process " + str(processList.index(p)) + " complete at " + str(time.ctime()) + "\n")
130             stdOutLines = p.stdout.readlines()
131             for s in stdOutLines:
132                 messageList.append(s)
133             stdErrorLines = p.stderr.readlines()
134             for e in stdErrorLines:
135                 messageList.append(e)
136             if not Done: #Save 2 seconds if completeGeoprocessing scripts 利用多核进行计算
137                 time.sleep(2)
138     for m in messageList:
139         gp.addmessage(m)
140     #Merge the results
141     outFC = gp.getparameterastext(2)
142     fm = gp.createobject("fieldmappings")
143     vt = gp.createobject("valuetable")
144     for g in gdbNameList:
145         gp.workspace = g
146         if len(gp.listfeatureclasses("Result*")) > 0:
147             fcDesc = gp.describe(gp.listfeatureclasses("Result*")[0])
148             fm.addtable(fcDesc.catalogpath)
149             vt.addrow(fcDesc.catalogpath)
150     gp.workspace = "in_memory"
151     result = gp.merge_management(vt,outFC,fm)
152     #Clean up
153     for g in gdbNameList:
154         result = gp.delete_management(g)
155     #Done!
156     
157     endTime = time.time()
158     gp.addmessage("Geoprocessing duration was " + str(int(endTime - startTime)) + " seconds.")
159     gp.addmessage("Net performance was " + \
160                   str(int(1.0 * count / (endTime - startTime) * 3600.0)) + " records per hour.")
161     del gp
162     
163 except:
164     tb = sys.exc_info()[2]
165     tbinfo = traceback.format_tb(tb)[0]
166     pymsg = "PYTHON ERRORS:\nTraceback Info:\n" + tbinfo + "\nError Info:\n    " + \
167             str(sys.exc_type)+ "" + str(sys.exc_value) + "\n"
168     gp.AddError(pymsg)
169     
170     msgs = "GP ERRORS:\n" + gp.GetMessages(2+ "\n"
171     gp.AddError(msgs)

相关文章:

  • 2021-12-08
  • 2021-08-18
  • 2022-12-23
  • 2021-11-22
  • 2022-12-23
  • 2021-10-19
  • 2021-05-24
  • 2022-12-23
猜你喜欢
  • 2021-12-04
  • 2022-12-23
  • 2021-11-06
  • 2021-12-04
  • 2021-05-03
  • 2021-09-10
相关资源
相似解决方案