Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from __future__ import annotations

from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional


@dataclass(frozen=True)
class VariableBounds:
mass_tonne: tuple[float, float] = (5.0, 15.0)
centre_of_mass_z_m: tuple[float, float] = (-9.0, 3.0)
total_length_m: tuple[float, float] = (5.0, 7.0)
unit_damping_force: tuple[float, float] = (0.0, 300.0)
unit_damping_moment: tuple[float, float] = (0.0, 300.0)


@dataclass(frozen=True)
class DampingConfig:
# force_mode / moment_mode:
# - "same": normal=axial=normal_gene_value
# - "fixed_axial": normal=gene, axial=<*_axial_fixed>
# - "separate": normal=normal_gene, axial=axial_gene
force_mode: str = "same"
moment_mode: str = "same"
force_axial_fixed: float = 0.0
moment_axial_fixed: float = 0.0


@dataclass(frozen=True)
class GAConfig:
num_generations: int = 15
sol_per_pop: int = 16
num_parents_mating: int = 6
mutation_percent_genes: int = 20
random_seed: int = 42


@dataclass(frozen=True)
class ProjectConfig:
model_path: Path = Path(r"D:\计算结果\base.dat")
turret_name: str = "Turret"
line_name: str = "rise"
stress_result_name: str = "Von Mises Stress"
stress_allow_mpa: float = 360.0
# Penalty weight in MPa-space when violating allow stress.
penalty_weight: float = 1000.0

# Turret geometry constraints.
cylinder_count: int = 3

# For OrcaFlex output hygiene.
results_dir: Path = Path("results")
case_dir: Path = Path("results/cases")

# Optional fallback/output unit control.
assume_stress_is_pa: bool = True

bounds: VariableBounds = field(default_factory=VariableBounds)
damping: DampingConfig = field(default_factory=DampingConfig)
ga: GAConfig = field(default_factory=GAConfig)


CFG = ProjectConfig()


def build_gene_space(cfg: ProjectConfig) -> tuple[list[dict[str, float]], list[str]]:
"""Return pygad gene_space and ordered gene names based on damping mode."""
b = cfg.bounds
gene_space: list[dict[str, float]] = [
{"low": b.mass_tonne[0], "high": b.mass_tonne[1]},
{"low": b.centre_of_mass_z_m[0], "high": b.centre_of_mass_z_m[1]},
{"low": b.total_length_m[0], "high": b.total_length_m[1]},
{"low": b.unit_damping_force[0], "high": b.unit_damping_force[1]},
{"low": b.unit_damping_moment[0], "high": b.unit_damping_moment[1]},
]
gene_names = [
"mass_tonne",
"centre_of_mass_z_m",
"total_length_m",
"unit_damping_force_normal",
"unit_damping_moment_normal",
]

if cfg.damping.force_mode == "separate":
gene_space.append({"low": b.unit_damping_force[0], "high": b.unit_damping_force[1]})
gene_names.append("unit_damping_force_axial")

if cfg.damping.moment_mode == "separate":
gene_space.append({"low": b.unit_damping_moment[0], "high": b.unit_damping_moment[1]})
gene_names.append("unit_damping_moment_axial")

return gene_space, gene_names


def decode_solution(solution: list[float], gene_names: list[str], cfg: Optional[ProjectConfig] = None) -> dict[str, float]:
params = dict(zip(gene_names, map(float, solution)))
if cfg is None:
return params

force_normal = params["unit_damping_force_normal"]
if cfg.damping.force_mode == "same":
params["unit_damping_force_axial"] = force_normal
elif cfg.damping.force_mode == "fixed_axial":
params["unit_damping_force_axial"] = cfg.damping.force_axial_fixed

moment_normal = params["unit_damping_moment_normal"]
if cfg.damping.moment_mode == "same":
params["unit_damping_moment_axial"] = moment_normal
elif cfg.damping.moment_mode == "fixed_axial":
params["unit_damping_moment_axial"] = cfg.damping.moment_axial_fixed

return params
44 changes: 44 additions & 0 deletions metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from __future__ import annotations

from typing import Any


def _to_mpa(max_stress: float, assume_stress_is_pa: bool) -> float:
return max_stress / 1e6 if assume_stress_is_pa else max_stress


def extract_von_mises_max_mpa(
model: Any,
line_name: str,
result_name: str,
assume_stress_is_pa: bool = True,
) -> float:
"""Extract full-time-domain maximum Von Mises stress from an OrcaFlex line."""
line_obj = model[line_name]

# OrcaFlex result extraction signatures vary by object/result type,
# so we try common forms in a robust order.
history = None
extraction_errors: list[str] = []

candidate_calls = [
lambda: line_obj.TimeHistory(result_name),
lambda: line_obj.TimeHistory(result_name, "End A"),
lambda: line_obj.TimeHistory(result_name, "End B"),
]

for call in candidate_calls:
try:
history = call()
if history is not None:
break
except Exception as exc: # pragma: no cover - depends on OrcFxAPI runtime
extraction_errors.append(str(exc))

if history is None:
details = " | ".join(extraction_errors) if extraction_errors else "unknown failure"
raise RuntimeError(f"Unable to extract line time history for '{result_name}': {details}")

# history is typically a sequence-like object.
stress_max_raw = max(float(v) for v in history)
return _to_mpa(stress_max_raw, assume_stress_is_pa)
115 changes: 115 additions & 0 deletions orcaflex_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Any

from config import ProjectConfig
from metrics import extract_von_mises_max_mpa


@dataclass
class EvalResult:
stress_max_mpa: float
status: str
run_dir: Path


class OrcaFlexRunner:
def __init__(self, cfg: ProjectConfig) -> None:
self.cfg = cfg
self._orcfxapi = self._import_orcfxapi()

def _import_orcfxapi(self):
import OrcFxAPI # type: ignore

return OrcFxAPI

@staticmethod
def _set_attr(obj: Any, attr_names: list[str], value: Any) -> None:
for name in attr_names:
if hasattr(obj, name):
setattr(obj, name, value)
return
raise AttributeError(f"None of attributes {attr_names} exist on object type {type(obj).__name__}")

def _set_turret_geometry_equal_cylinders(self, turret: Any, total_length_m: float) -> None:
segment_length = total_length_m / self.cfg.cylinder_count

if not hasattr(turret, "Cylinders"):
raise AttributeError("Turret object has no 'Cylinders' member; check OrcaFlex object type/data schema.")

cylinders = turret.Cylinders

# Keep count fixed to configured value when possible.
if hasattr(cylinders, "Count"):
cylinders.Count = self.cfg.cylinder_count

for i in range(self.cfg.cylinder_count):
cyl = cylinders[i]
self._set_attr(cyl, ["Length", "CylinderLength"], segment_length)

def _set_turret_damping(self, turret: Any, params: dict[str, float]) -> None:
force_normal = params["unit_damping_force_normal"]
force_axial = params["unit_damping_force_axial"]
moment_normal = params["unit_damping_moment_normal"]
moment_axial = params["unit_damping_moment_axial"]

self._set_attr(
turret,
["UnitDampingForceNormal", "AddedMassAndDampingUnitDampingForceNormal"],
force_normal,
)
self._set_attr(
turret,
["UnitDampingForceAxial", "AddedMassAndDampingUnitDampingForceAxial"],
force_axial,
)
self._set_attr(
turret,
["UnitDampingMomentNormal", "AddedMassAndDampingUnitDampingMomentNormal"],
moment_normal,
)
self._set_attr(
turret,
["UnitDampingMomentAxial", "AddedMassAndDampingUnitDampingMomentAxial"],
moment_axial,
)

def evaluate(self, params: dict[str, float], run_dir: Path) -> EvalResult:
run_dir.mkdir(parents=True, exist_ok=True)
model = self._orcfxapi.Model()

try:
model.LoadData(str(self.cfg.model_path))

turret = model[self.cfg.turret_name]
self._set_attr(turret, ["Mass"], params["mass_tonne"])
self._set_attr(turret, ["CentreOfMassX"], 0.0)
self._set_attr(turret, ["CentreOfMassY"], 0.0)
self._set_attr(turret, ["CentreOfMassZ"], params["centre_of_mass_z_m"])

self._set_turret_geometry_equal_cylinders(turret, params["total_length_m"])
self._set_turret_damping(turret, params)

model.CalculateStatics()
model.RunSimulation()

stress_max_mpa = extract_von_mises_max_mpa(
model=model,
line_name=self.cfg.line_name,
result_name=self.cfg.stress_result_name,
assume_stress_is_pa=self.cfg.assume_stress_is_pa,
)

# Keep a snapshot per run.
if hasattr(model, "SaveSimulation"):
sim_path = run_dir / "case.sim"
model.SaveSimulation(str(sim_path))

return EvalResult(stress_max_mpa=stress_max_mpa, status="ok", run_dir=run_dir)
except Exception as exc:
return EvalResult(stress_max_mpa=float("inf"), status=f"error: {exc}", run_dir=run_dir)
finally:
# Best effort release.
del model
122 changes: 122 additions & 0 deletions run_ga.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
from __future__ import annotations

import csv
import json
import math
from datetime import datetime
from pathlib import Path
from typing import Any

import pygad

from config import CFG, build_gene_space, decode_solution
from orcaflex_runner import OrcaFlexRunner


def penalty(stress_mpa: float, allow_mpa: float, penalty_weight: float) -> float:
if not math.isfinite(stress_mpa):
return 1e12
return penalty_weight * max(0.0, stress_mpa - allow_mpa)


def make_run_dir(base: Path, eval_idx: int) -> Path:
stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
return base / f"eval_{eval_idx:05d}_{stamp}"


def main() -> None:
CFG.results_dir.mkdir(parents=True, exist_ok=True)
CFG.case_dir.mkdir(parents=True, exist_ok=True)

runner = OrcaFlexRunner(CFG)
gene_space, gene_names = build_gene_space(CFG)

eval_rows: list[dict[str, Any]] = []

def fitness_func(ga_instance: pygad.GA, solution: list[float], solution_idx: int) -> float:
params = decode_solution(solution, gene_names, CFG)
eval_idx = len(eval_rows)
run_dir = make_run_dir(CFG.case_dir, eval_idx)

result = runner.evaluate(params=params, run_dir=run_dir)
p = penalty(result.stress_max_mpa, CFG.stress_allow_mpa, CFG.penalty_weight)

# minimize stress => maximize inverse penalized score
if math.isfinite(result.stress_max_mpa):
fitness = 1.0 / (result.stress_max_mpa + p + 1e-9)
else:
fitness = 1e-12

row = {
"eval_idx": eval_idx,
"solution_idx": solution_idx,
**params,
"stress_max_mpa": result.stress_max_mpa,
"penalty": p,
"fitness": fitness,
"status": result.status,
"run_dir": str(result.run_dir),
}
eval_rows.append(row)
print(
f"[EVAL {eval_idx:04d}] stress={result.stress_max_mpa:.3f} MPa, "
f"penalty={p:.3f}, fitness={fitness:.6e}, status={result.status}"
)
return fitness

def on_generation(ga_instance: pygad.GA) -> None:
best_solution, best_fitness, _ = ga_instance.best_solution()
params = decode_solution(best_solution, gene_names, CFG)
# Inverse mapping to report approximate stress objective from fitness
approx_objective = (1.0 / best_fitness) if best_fitness > 0 else float("inf")
print(
f"[GEN {ga_instance.generations_completed:03d}] "
f"best_fitness={best_fitness:.6e}, approx_obj={approx_objective:.3f}, params={params}"
)

ga = pygad.GA(
num_generations=CFG.ga.num_generations,
sol_per_pop=CFG.ga.sol_per_pop,
num_parents_mating=CFG.ga.num_parents_mating,
num_genes=len(gene_space),
gene_space=gene_space,
mutation_percent_genes=CFG.ga.mutation_percent_genes,
fitness_func=fitness_func,
on_generation=on_generation,
random_seed=CFG.ga.random_seed,
suppress_warnings=True,
)

ga.run()

best_solution, best_fitness, _ = ga.best_solution()
best_params = decode_solution(best_solution, gene_names, CFG)

best_row = max(eval_rows, key=lambda r: r["fitness"]) if eval_rows else None

best_payload = {
"best_solution_vector": [float(x) for x in best_solution],
"best_params": best_params,
"best_fitness": float(best_fitness),
"best_eval_record": best_row,
"gene_names": gene_names,
}

with (CFG.results_dir / "best_solution.json").open("w", encoding="utf-8") as f:
json.dump(best_payload, f, ensure_ascii=False, indent=2)

if eval_rows:
fieldnames = sorted({k for row in eval_rows for k in row.keys()})
with (CFG.results_dir / "eval_log.csv").open("w", newline="", encoding="utf-8") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(eval_rows)

final_stress = best_row["stress_max_mpa"] if best_row else float("inf")
print("\n=== Optimization Finished ===")
print(f"Best parameters: {best_params}")
print(f"Best stress_max_MPa: {final_stress}")


if __name__ == "__main__":
main()