import arcpy
import os
import math
import uuid
def perpendicular_distance(point, start, end):
if start.equals(end):
return math.sqrt((point.X - start.X) ** 2 + (point.Y - start.Y) ** 2)
abx = end.X - start.X
aby = end.Y - start.Y
apx = point.X - start.X
apy = point.Y - start.Y
ab2 = abx * abx + aby * aby
if ab2 == 0:
return math.sqrt(apx * apx + apy * apy)
t = (apx * abx + apy * aby) / ab2
t = max(0, min(1, t))
proj_x = start.X + t * abx
proj_y = start.Y + t * aby
dx = point.X - proj_x
dy = point.Y - proj_y
return math.sqrt(dx * dx + dy * dy)
def douglas_peucker_keep(points, tolerance, keep_set):
if len(points) <= 2:
return points[:]
def point_key(pt):
return (round(pt.X, 9), round(pt.Y, 9))
has_keep = any(point_key(p) in keep_set for p in points[1:-1])
if not has_keep:
max_dist = 0
index = -1
for i in range(1, len(points) - 1):
dist = perpendicular_distance(points[i], points[0], points[-1])
if dist > max_dist:
max_dist = dist
index = i
if max_dist > tolerance:
left = douglas_peucker_keep(points[:index+1], tolerance, keep_set)
right = douglas_peucker_keep(points[index:], tolerance, keep_set)
return left[:-1] + right
else:
return [points[0], points[-1]]
result = [points[0]]
i = 0
n = len(points)
while i < n - 1:
j = i + 1
next_keep = None
while j < n:
if point_key(points[j]) in keep_set or j == n - 1:
next_keep = j
break
j += 1
if next_keep is None:
next_keep = n - 1
segment = points[i:next_keep+1]
if len(segment) <= 2:
if len(segment) == 2 and not segment[1].equals(result[-1]):
result.append(segment[1])
else:
simplified_seg = douglas_peucker_keep(segment, tolerance, keep_set)
if simplified_seg and not simplified_seg[0].equals(result[-1]):
result.extend(simplified_seg)
elif simplified_seg:
result.extend(simplified_seg[1:])
i = next_keep
cleaned = [result[0]]
for pt in result[1:]:
if not pt.equals(cleaned[-1]):
cleaned.append(pt)
return cleaned
def extract_rings_from_part(part):
rings = []
current_ring = []
for pt in part:
if pt is None:
if len(current_ring) >= 3:
if not current_ring[0].equals(current_ring[-1]):
current_ring.append(arcpy.Point(current_ring[0].X, current_ring[0].Y))
rings.append(current_ring)
current_ring = []
else:
current_ring.append(pt)
if len(current_ring) >= 3:
if not current_ring[0].equals(current_ring[-1]):
current_ring.append(arcpy.Point(current_ring[0].X, current_ring[0].Y))
rings.append(current_ring)
return rings
def is_clockwise(points):
if len(points) < 3:
return False
area = 0.0
n = len(points)
for i in range(n):
j = (i + 1) % n
area += points[i].X * points[j].Y
area -= points[j].X * points[i].Y
return area < 0
def rotate_ring_to_northwest_start(ring):
if len(ring) <= 1:
return ring
best_index = 0
best_y = ring[0].Y
best_x = ring[0].X
for i in range(1, len(ring)):
pt = ring[i]
if pt.Y > best_y or (pt.Y == best_y and pt.X < best_x):
best_y = pt.Y
best_x = pt.X
best_index = i
return ring[best_index:] + ring[:best_index]
class Toolbox(object):
def __init__(self):
self.label = "宗地边界处理工具(支持全数/抽稀输出 + 交接点保护)"
self.alias = "ParcelBoundaryWithJunctionPreserve"
self.tools = [GenerateJZDandJZX]
class GenerateJZDandJZX(object):
def __init__(self):
self.label = "生成界址点(JZD)与界址线(JZX)"
self.description = (
"功能:\n"
"- 全数输出:输出所有原始折点\n"
"- 抽稀输出:从原始边界中提取关键折点(不重建面),但地块交接点强制保留\n"
"- 外环为顺时针(CW),内环为逆时针(CCW)\n"
"- 界址点从西北角(最北且最西)开始编号\n"
"- 外环编号 J1,J2...(每宗地独立)\n"
"- 内环编号 K1,K2...(每宗地独立)\n"
"- 每个宗地内部自动删除空间重复界址点\n"
"- LOCAL_ID:宗地内编号(用于宗地图)\n"
"- GLOBAL_ID:全局唯一流水号 P0001...(用于拓扑)\n"
"- 界址线 START_ID/END_ID 使用 GLOBAL_ID"
)
self.category = "地籍管理"
def getParameterInfo(self):
input_parcel = arcpy.Parameter(
displayName="输入宗地面图层",
name="input_parcel",
datatype="GPFeatureLayer",
parameterType="Required",
direction="Input"
)
input_parcel.filter.list = ["Polygon"]
output_mode = arcpy.Parameter(
displayName="界址点输出模式",
name="output_mode",
datatype="GPString",
parameterType="Required",
direction="Input"
)
output_mode.filter.type = "ValueList"
output_mode.filter.list = ["全数输出", "抽稀输出"]
output_mode.value = "全数输出"
tolerance = arcpy.Parameter(
displayName="简化容差(米)",
name="tolerance",
datatype="GPDouble",
parameterType="Optional",
direction="Input"
)
tolerance.value = 5.0
tolerance.enabled = False
use_inner_prefix = arcpy.Parameter(
displayName="内环界址点使用独立前缀(如 K1, K2...)",
name="use_inner_prefix",
datatype="GPBoolean",
parameterType="Optional",
direction="Input"
)
use_inner_prefix.value = True
output_jzd = arcpy.Parameter(
displayName="输出界址点",
name="output_jzd",
datatype="DEFeatureClass",
parameterType="Required",
direction="Output"
)
output_jzx = arcpy.Parameter(
displayName="输出界址线",
name="output_jzx",
datatype="DEFeatureClass",
parameterType="Required",
direction="Output"
)
return [input_parcel, output_mode, tolerance, use_inner_prefix, output_jzd, output_jzx]
def updateParameters(self, parameters):
if parameters[1].value == "抽稀输出":
parameters[2].enabled = True
else:
parameters[2].enabled = False
return
def execute(self, parameters, messages):
input_parcel = parameters[0].valueAsText
output_mode = parameters[1].valueAsText
tolerance = float(parameters[2].value) if parameters[2].value else 5.0
use_inner_prefix = bool(parameters[3].value)
output_jzd = parameters[4].valueAsText
output_jzx = parameters[5].valueAsText
desc = arcpy.Describe(input_parcel)
sr = desc.spatialReference
field_names_upper = [f.name.upper() for f in arcpy.ListFields(input_parcel)]
if "ZDDM" not in field_names_upper:
arcpy.AddError("输入图层缺少必需字段:ZDDM")
return
ZDDM_field = [f.name for f in arcpy.ListFields(input_parcel) if f.name.upper() == "ZDDM"][0]
temp_ws = arcpy.env.scratchGDB
uniq = str(uuid.uuid4().hex)[:6]
total_parcels = int(arcpy.GetCount_management(input_parcel)[0])
arcpy.AddMessage(f"📊 输入宗地总数:{total_parcels} 个")
arcpy.SetProgressor("step", "正在复制并修复宗地几何...", 0, 6, 1)
raw_parcels = os.path.join(temp_ws, f"parcels_{uniq}")
arcpy.management.CopyFeatures(input_parcel, raw_parcels)
arcpy.management.RepairGeometry(raw_parcels, "DELETE_NULL")
arcpy.SetProgressorPosition()
arcpy.SetProgressorLabel("正在提取宗地边界线...")
original_boundary = os.path.join(temp_ws, f"original_boundary_{uniq}")
arcpy.management.PolygonToLine(raw_parcels, original_boundary, "IDENTIFY_NEIGHBORS")
arcpy.SetProgressorPosition()
arcpy.SetProgressorLabel("正在识别地块交接点...")
junction_points = set()
with arcpy.da.SearchCursor(original_boundary, ["SHAPE@", "LEFT_FID", "RIGHT_FID"]) as cur:
for line_geom, left_fid, right_fid in cur:
if line_geom and left_fid != -1 and right_fid != -1 and left_fid != right_fid:
start_key = (round(line_geom.firstPoint.X, 9), round(line_geom.firstPoint.Y, 9))
end_key = (round(line_geom.lastPoint.X, 9), round(line_geom.lastPoint.Y, 9))
junction_points.add(start_key)
junction_points.add(end_key)
arcpy.AddMessage(f"📍 检测到 {len(junction_points)} 个地块交接点(抽稀时将强制保留)")
arcpy.SetProgressorPosition()
if output_mode == "全数输出":
arcpy.AddMessage("📌 使用原始宗地边界(全数输出界址点)")
parcel_key_points = {}
all_key_points_set = set()
with arcpy.da.SearchCursor(raw_parcels, ["SHAPE@", ZDDM_field]) as s_cur:
for geom, ZDDM_val in s_cur:
if geom is None or geom.area <= 0:
continue
ZDDM_val = str(ZDDM_val).strip() if ZDDM_val else ""
key_pts_for_parcel = []
for part in geom:
rings = extract_rings_from_part(part)
if not rings:
continue
outer_ring = rings[0]
cleaned_outer = [p for i, p in enumerate(outer_ring[:-1]) if i == 0 or not p.equals(outer_ring[i-1])]
for pt in cleaned_outer:
key = (round(pt.X, 9), round(pt.Y, 9))
all_key_points_set.add(key)
key_pts_for_parcel.append((pt, 'outer', 0))
for idx, ring in enumerate(rings[1:], start=1):
cleaned_inner = [p for i, p in enumerate(ring[:-1]) if i == 0 or not p.equals(ring[i-1])]
for pt in cleaned_inner:
key = (round(pt.X, 9), round(pt.Y, 9))
all_key_points_set.add(key)
key_pts_for_parcel.append((pt, 'inner', idx))
parcel_key_points[ZDDM_val] = key_pts_for_parcel
else:
arcpy.AddMessage(f"🔍 正在从原始边界中提取关键折点(容差={tolerance}米,不重建面)...")
parcel_key_points = {}
all_key_points_set = set()
with arcpy.da.SearchCursor(raw_parcels, ["SHAPE@", ZDDM_field]) as s_cur:
for geom, ZDDM_val in s_cur:
if geom is None or geom.area <= 0:
continue
ZDDM_val = str(ZDDM_val).strip() if ZDDM_val else ""
key_pts_for_parcel = []
for part in geom:
rings = extract_rings_from_part(part)
if not rings:
continue
outer_ring = rings[0]
cleaned_outer = [p for i, p in enumerate(outer_ring[:-1]) if i == 0 or not p.equals(outer_ring[i-1])]
if len(cleaned_outer) >= 2:
simplified_outer = douglas_peucker_keep(cleaned_outer, tolerance, junction_points)
for pt in simplified_outer:
key = (round(pt.X, 9), round(pt.Y, 9))
all_key_points_set.add(key)
key_pts_for_parcel.append((pt, 'outer', 0))
for idx, ring in enumerate(rings[1:], start=1):
cleaned_inner = [p for i, p in enumerate(ring[:-1]) if i == 0 or not p.equals(ring[i-1])]
if len(cleaned_inner) >= 2:
simplified_inner = douglas_peucker_keep(cleaned_inner, tolerance, junction_points)
for pt in simplified_inner:
key = (round(pt.X, 9), round(pt.Y, 9))
all_key_points_set.add(key)
key_pts_for_parcel.append((pt, 'inner', idx))
parcel_key_points[ZDDM_val] = key_pts_for_parcel
arcpy.SetProgressorLabel("正在创建关键界址点图层...")
key_points_fc = os.path.join(temp_ws, f"key_points_{uniq}")
arcpy.management.CreateFeatureclass(temp_ws, f"key_points_{uniq}", "POINT", spatial_reference=sr)
with arcpy.da.InsertCursor(key_points_fc, ["SHAPE@"]) as cur:
for x, y in all_key_points_set:
cur.insertRow([arcpy.PointGeometry(arcpy.Point(x, y), sr)])
arcpy.SetProgressorPosition()
arcpy.SetProgressorLabel("正在分配界址点编号(LOCAL_ID + GLOBAL_ID)...")
coord_to_global = {}
global_counter = 1
all_coords = set()
for pts_info in parcel_key_points.values():
for pt, _, _ in pts_info:
key = (round(pt.X, 9), round(pt.Y, 9))
all_coords.add(key)
for key in sorted(all_coords):
coord_to_global[key] = f"P{global_counter:04d}"
global_counter += 1
coord_to_local = {}
for ZDDM_val, pts_info in parcel_key_points.items():
outer_pts = [pt for pt, typ, _ in pts_info if typ == 'outer']
inner_by_ring = {}
for pt, typ, ring_idx in pts_info:
if typ == 'inner':
inner_by_ring.setdefault(ring_idx, []).append(pt)
if outer_pts:
outer_rotated = rotate_ring_to_northwest_start(outer_pts)
if not is_clockwise(outer_rotated):
outer_rotated.reverse()
for i, pt in enumerate(outer_rotated):
key = (round(pt.X, 9), round(pt.Y, 9))
coord_to_local[(ZDDM_val, key)] = f"J{i+1}"
inner_counter = 0
for ring_idx in sorted(inner_by_ring.keys()):
ring_pts = inner_by_ring[ring_idx]
ring_rotated = rotate_ring_to_northwest_start(ring_pts)
if is_clockwise(ring_rotated):
ring_rotated.reverse()
for pt in ring_rotated:
inner_counter += 1
key = (round(pt.X, 9), round(pt.Y, 9))
coord_to_local[(ZDDM_val, key)] = f"K{inner_counter}"
jzd_temp = os.path.join(temp_ws, f"jzd_{uniq}")
arcpy.management.CreateFeatureclass(temp_ws, f"jzd_{uniq}", "POINT", spatial_reference=sr)
arcpy.management.AddField(jzd_temp, "LOCAL_ID", "TEXT", field_length=20)
arcpy.management.AddField(jzd_temp, "GLOBAL_ID", "TEXT", field_length=20)
arcpy.management.AddField(jzd_temp, "X", "DOUBLE")
arcpy.management.AddField(jzd_temp, "Y", "DOUBLE")
arcpy.management.AddField(jzd_temp, ZDDM_field, "TEXT", field_length=255)
inserted_check = set()
with arcpy.da.InsertCursor(jzd_temp, ["SHAPE@", "LOCAL_ID", "GLOBAL_ID", "X", "Y", ZDDM_field]) as icur:
for ZDDM_val, pts_info in parcel_key_points.items():
for pt, _, _ in pts_info:
coord_key = (round(pt.X, 9), round(pt.Y, 9))
local_key = (ZDDM_val, coord_key)
if local_key in inserted_check:
continue
if coord_key in coord_to_global and local_key in coord_to_local:
local_id = coord_to_local[local_key]
global_id = coord_to_global[coord_key]
icur.insertRow([
arcpy.PointGeometry(pt, sr),
local_id,
global_id,
pt.X,
pt.Y,
ZDDM_val
])
inserted_check.add(local_key)
jzd_total = int(arcpy.GetCount_management(jzd_temp)[0])
arcpy.AddMessage(f"✅ 共生成界址点:{jzd_total} 个(LOCAL_ID: J/K;GLOBAL_ID: P0001...)")
arcpy.SetProgressorPosition()
arcpy.SetProgressorLabel("正在生成界址线(关联 GLOBAL_ID)...")
jzx_split = os.path.join(temp_ws, f"jzx_split_{uniq}")
arcpy.management.SplitLineAtPoint(original_boundary, key_points_fc, jzx_split, "0.1 Meters")
fid_to_ZDDM = {}
with arcpy.da.SearchCursor(raw_parcels, ["OID@", ZDDM_field]) as cur:
for fid, val in cur:
fid_to_ZDDM[fid] = str(val).strip() if val else ""
jzx_final = os.path.join(temp_ws, f"jzx_final_{uniq}")
arcpy.management.CreateFeatureclass(temp_ws, f"jzx_final_{uniq}", "POLYLINE", spatial_reference=sr)
arcpy.management.AddField(jzx_final, "START_ID", "TEXT", field_length=20)
arcpy.management.AddField(jzx_final, "END_ID", "TEXT", field_length=20)
arcpy.management.AddField(jzx_final, ZDDM_field, "TEXT", field_length=255)
def get_global_id(point):
key = (round(point.X, 9), round(point.Y, 9))
return coord_to_global.get(key, "")
jzx_count = 0
with arcpy.da.InsertCursor(jzx_final, ["SHAPE@", "START_ID", "END_ID", ZDDM_field]) as i_cur:
with arcpy.da.SearchCursor(jzx_split, ["SHAPE@", "LEFT_FID", "RIGHT_FID"]) as l_cur:
for line_geom, left_fid, right_fid in l_cur:
if not line_geom or line_geom.length <= 0:
continue
start_gid = get_global_id(line_geom.firstPoint)
end_gid = get_global_id(line_geom.lastPoint)
if not (start_gid and end_gid):
continue
zddms = []
if left_fid != -1 and left_fid in fid_to_ZDDM:
z = fid_to_ZDDM[left_fid]
if z:
zddms.append(z)
if right_fid != -1 and right_fid in fid_to_ZDDM:
z = fid_to_ZDDM[right_fid]
if z and z not in zddms:
zddms.append(z)
final_zddm = "\\".join(zddms) if zddms else ""
i_cur.insertRow([line_geom, start_gid, end_gid, final_zddm])
jzx_count += 1
arcpy.AddMessage(f"✅ 共生成界址线:{jzx_count} 条(START_ID/END_ID 使用 GLOBAL_ID)")
arcpy.SetProgressorPosition()
arcpy.SetProgressorLabel("正在写入最终结果...")
arcpy.management.CopyFeatures(jzd_temp, output_jzd)
arcpy.management.CopyFeatures(jzx_final, output_jzx)
arcpy.ResetProgressor()
arcpy.AddMessage("🎉 界址点与界址线生成完成!")
arcpy.AddMessage(f" 📍 界址点:{output_jzd} (共 {jzd_total} 个)")
arcpy.AddMessage(f" 📏 界址线:{output_jzx} (共 {jzx_count} 条)")
if output_mode == "抽稀输出":
arcpy.AddMessage(f" ⚙️ 已启用抽稀(容差={tolerance}米),{len(junction_points)} 个交接点已强制保留。")
else:
arcpy.AddMessage(" 📌 全数输出原始界址点。")