diff --git a/config.py b/config.py new file mode 100644 index 0000000..105b5b7 --- /dev/null +++ b/config.py @@ -0,0 +1,112 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + + +@dataclass(frozen=True) +class VariableBounds: + mass_tonne: tuple[float, float] = (5.0, 15.0) + centre_of_mass_z_m: tuple[float, float] = (-9.0, 3.0) + total_length_m: tuple[float, float] = (5.0, 7.0) + unit_damping_force: tuple[float, float] = (0.0, 300.0) + unit_damping_moment: tuple[float, float] = (0.0, 300.0) + + +@dataclass(frozen=True) +class DampingConfig: + # force_mode / moment_mode: + # - "same": normal=axial=normal_gene_value + # - "fixed_axial": normal=gene, axial=<*_axial_fixed> + # - "separate": normal=normal_gene, axial=axial_gene + force_mode: str = "same" + moment_mode: str = "same" + force_axial_fixed: float = 0.0 + moment_axial_fixed: float = 0.0 + + +@dataclass(frozen=True) +class GAConfig: + num_generations: int = 15 + sol_per_pop: int = 16 + num_parents_mating: int = 6 + mutation_percent_genes: int = 20 + random_seed: int = 42 + + +@dataclass(frozen=True) +class ProjectConfig: + model_path: Path = Path(r"D:\计算结果\base.dat") + turret_name: str = "Turret" + line_name: str = "rise" + stress_result_name: str = "Von Mises Stress" + stress_allow_mpa: float = 360.0 + # Penalty weight in MPa-space when violating allow stress. + penalty_weight: float = 1000.0 + + # Turret geometry constraints. + cylinder_count: int = 3 + + # For OrcaFlex output hygiene. + results_dir: Path = Path("results") + case_dir: Path = Path("results/cases") + + # Optional fallback/output unit control. + assume_stress_is_pa: bool = True + + bounds: VariableBounds = field(default_factory=VariableBounds) + damping: DampingConfig = field(default_factory=DampingConfig) + ga: GAConfig = field(default_factory=GAConfig) + + +CFG = ProjectConfig() + + +def build_gene_space(cfg: ProjectConfig) -> tuple[list[dict[str, float]], list[str]]: + """Return pygad gene_space and ordered gene names based on damping mode.""" + b = cfg.bounds + gene_space: list[dict[str, float]] = [ + {"low": b.mass_tonne[0], "high": b.mass_tonne[1]}, + {"low": b.centre_of_mass_z_m[0], "high": b.centre_of_mass_z_m[1]}, + {"low": b.total_length_m[0], "high": b.total_length_m[1]}, + {"low": b.unit_damping_force[0], "high": b.unit_damping_force[1]}, + {"low": b.unit_damping_moment[0], "high": b.unit_damping_moment[1]}, + ] + gene_names = [ + "mass_tonne", + "centre_of_mass_z_m", + "total_length_m", + "unit_damping_force_normal", + "unit_damping_moment_normal", + ] + + if cfg.damping.force_mode == "separate": + gene_space.append({"low": b.unit_damping_force[0], "high": b.unit_damping_force[1]}) + gene_names.append("unit_damping_force_axial") + + if cfg.damping.moment_mode == "separate": + gene_space.append({"low": b.unit_damping_moment[0], "high": b.unit_damping_moment[1]}) + gene_names.append("unit_damping_moment_axial") + + return gene_space, gene_names + + +def decode_solution(solution: list[float], gene_names: list[str], cfg: Optional[ProjectConfig] = None) -> dict[str, float]: + params = dict(zip(gene_names, map(float, solution))) + if cfg is None: + return params + + force_normal = params["unit_damping_force_normal"] + if cfg.damping.force_mode == "same": + params["unit_damping_force_axial"] = force_normal + elif cfg.damping.force_mode == "fixed_axial": + params["unit_damping_force_axial"] = cfg.damping.force_axial_fixed + + moment_normal = params["unit_damping_moment_normal"] + if cfg.damping.moment_mode == "same": + params["unit_damping_moment_axial"] = moment_normal + elif cfg.damping.moment_mode == "fixed_axial": + params["unit_damping_moment_axial"] = cfg.damping.moment_axial_fixed + + return params diff --git a/metrics.py b/metrics.py new file mode 100644 index 0000000..3d7415b --- /dev/null +++ b/metrics.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from typing import Any + + +def _to_mpa(max_stress: float, assume_stress_is_pa: bool) -> float: + return max_stress / 1e6 if assume_stress_is_pa else max_stress + + +def extract_von_mises_max_mpa( + model: Any, + line_name: str, + result_name: str, + assume_stress_is_pa: bool = True, +) -> float: + """Extract full-time-domain maximum Von Mises stress from an OrcaFlex line.""" + line_obj = model[line_name] + + # OrcaFlex result extraction signatures vary by object/result type, + # so we try common forms in a robust order. + history = None + extraction_errors: list[str] = [] + + candidate_calls = [ + lambda: line_obj.TimeHistory(result_name), + lambda: line_obj.TimeHistory(result_name, "End A"), + lambda: line_obj.TimeHistory(result_name, "End B"), + ] + + for call in candidate_calls: + try: + history = call() + if history is not None: + break + except Exception as exc: # pragma: no cover - depends on OrcFxAPI runtime + extraction_errors.append(str(exc)) + + if history is None: + details = " | ".join(extraction_errors) if extraction_errors else "unknown failure" + raise RuntimeError(f"Unable to extract line time history for '{result_name}': {details}") + + # history is typically a sequence-like object. + stress_max_raw = max(float(v) for v in history) + return _to_mpa(stress_max_raw, assume_stress_is_pa) diff --git a/orcaflex_runner.py b/orcaflex_runner.py new file mode 100644 index 0000000..bfed506 --- /dev/null +++ b/orcaflex_runner.py @@ -0,0 +1,115 @@ +from __future__ import annotations + +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +from config import ProjectConfig +from metrics import extract_von_mises_max_mpa + + +@dataclass +class EvalResult: + stress_max_mpa: float + status: str + run_dir: Path + + +class OrcaFlexRunner: + def __init__(self, cfg: ProjectConfig) -> None: + self.cfg = cfg + self._orcfxapi = self._import_orcfxapi() + + def _import_orcfxapi(self): + import OrcFxAPI # type: ignore + + return OrcFxAPI + + @staticmethod + def _set_attr(obj: Any, attr_names: list[str], value: Any) -> None: + for name in attr_names: + if hasattr(obj, name): + setattr(obj, name, value) + return + raise AttributeError(f"None of attributes {attr_names} exist on object type {type(obj).__name__}") + + def _set_turret_geometry_equal_cylinders(self, turret: Any, total_length_m: float) -> None: + segment_length = total_length_m / self.cfg.cylinder_count + + if not hasattr(turret, "Cylinders"): + raise AttributeError("Turret object has no 'Cylinders' member; check OrcaFlex object type/data schema.") + + cylinders = turret.Cylinders + + # Keep count fixed to configured value when possible. + if hasattr(cylinders, "Count"): + cylinders.Count = self.cfg.cylinder_count + + for i in range(self.cfg.cylinder_count): + cyl = cylinders[i] + self._set_attr(cyl, ["Length", "CylinderLength"], segment_length) + + def _set_turret_damping(self, turret: Any, params: dict[str, float]) -> None: + force_normal = params["unit_damping_force_normal"] + force_axial = params["unit_damping_force_axial"] + moment_normal = params["unit_damping_moment_normal"] + moment_axial = params["unit_damping_moment_axial"] + + self._set_attr( + turret, + ["UnitDampingForceNormal", "AddedMassAndDampingUnitDampingForceNormal"], + force_normal, + ) + self._set_attr( + turret, + ["UnitDampingForceAxial", "AddedMassAndDampingUnitDampingForceAxial"], + force_axial, + ) + self._set_attr( + turret, + ["UnitDampingMomentNormal", "AddedMassAndDampingUnitDampingMomentNormal"], + moment_normal, + ) + self._set_attr( + turret, + ["UnitDampingMomentAxial", "AddedMassAndDampingUnitDampingMomentAxial"], + moment_axial, + ) + + def evaluate(self, params: dict[str, float], run_dir: Path) -> EvalResult: + run_dir.mkdir(parents=True, exist_ok=True) + model = self._orcfxapi.Model() + + try: + model.LoadData(str(self.cfg.model_path)) + + turret = model[self.cfg.turret_name] + self._set_attr(turret, ["Mass"], params["mass_tonne"]) + self._set_attr(turret, ["CentreOfMassX"], 0.0) + self._set_attr(turret, ["CentreOfMassY"], 0.0) + self._set_attr(turret, ["CentreOfMassZ"], params["centre_of_mass_z_m"]) + + self._set_turret_geometry_equal_cylinders(turret, params["total_length_m"]) + self._set_turret_damping(turret, params) + + model.CalculateStatics() + model.RunSimulation() + + stress_max_mpa = extract_von_mises_max_mpa( + model=model, + line_name=self.cfg.line_name, + result_name=self.cfg.stress_result_name, + assume_stress_is_pa=self.cfg.assume_stress_is_pa, + ) + + # Keep a snapshot per run. + if hasattr(model, "SaveSimulation"): + sim_path = run_dir / "case.sim" + model.SaveSimulation(str(sim_path)) + + return EvalResult(stress_max_mpa=stress_max_mpa, status="ok", run_dir=run_dir) + except Exception as exc: + return EvalResult(stress_max_mpa=float("inf"), status=f"error: {exc}", run_dir=run_dir) + finally: + # Best effort release. + del model diff --git a/run_ga.py b/run_ga.py new file mode 100644 index 0000000..8e1d2d1 --- /dev/null +++ b/run_ga.py @@ -0,0 +1,122 @@ +from __future__ import annotations + +import csv +import json +import math +from datetime import datetime +from pathlib import Path +from typing import Any + +import pygad + +from config import CFG, build_gene_space, decode_solution +from orcaflex_runner import OrcaFlexRunner + + +def penalty(stress_mpa: float, allow_mpa: float, penalty_weight: float) -> float: + if not math.isfinite(stress_mpa): + return 1e12 + return penalty_weight * max(0.0, stress_mpa - allow_mpa) + + +def make_run_dir(base: Path, eval_idx: int) -> Path: + stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") + return base / f"eval_{eval_idx:05d}_{stamp}" + + +def main() -> None: + CFG.results_dir.mkdir(parents=True, exist_ok=True) + CFG.case_dir.mkdir(parents=True, exist_ok=True) + + runner = OrcaFlexRunner(CFG) + gene_space, gene_names = build_gene_space(CFG) + + eval_rows: list[dict[str, Any]] = [] + + def fitness_func(ga_instance: pygad.GA, solution: list[float], solution_idx: int) -> float: + params = decode_solution(solution, gene_names, CFG) + eval_idx = len(eval_rows) + run_dir = make_run_dir(CFG.case_dir, eval_idx) + + result = runner.evaluate(params=params, run_dir=run_dir) + p = penalty(result.stress_max_mpa, CFG.stress_allow_mpa, CFG.penalty_weight) + + # minimize stress => maximize inverse penalized score + if math.isfinite(result.stress_max_mpa): + fitness = 1.0 / (result.stress_max_mpa + p + 1e-9) + else: + fitness = 1e-12 + + row = { + "eval_idx": eval_idx, + "solution_idx": solution_idx, + **params, + "stress_max_mpa": result.stress_max_mpa, + "penalty": p, + "fitness": fitness, + "status": result.status, + "run_dir": str(result.run_dir), + } + eval_rows.append(row) + print( + f"[EVAL {eval_idx:04d}] stress={result.stress_max_mpa:.3f} MPa, " + f"penalty={p:.3f}, fitness={fitness:.6e}, status={result.status}" + ) + return fitness + + def on_generation(ga_instance: pygad.GA) -> None: + best_solution, best_fitness, _ = ga_instance.best_solution() + params = decode_solution(best_solution, gene_names, CFG) + # Inverse mapping to report approximate stress objective from fitness + approx_objective = (1.0 / best_fitness) if best_fitness > 0 else float("inf") + print( + f"[GEN {ga_instance.generations_completed:03d}] " + f"best_fitness={best_fitness:.6e}, approx_obj={approx_objective:.3f}, params={params}" + ) + + ga = pygad.GA( + num_generations=CFG.ga.num_generations, + sol_per_pop=CFG.ga.sol_per_pop, + num_parents_mating=CFG.ga.num_parents_mating, + num_genes=len(gene_space), + gene_space=gene_space, + mutation_percent_genes=CFG.ga.mutation_percent_genes, + fitness_func=fitness_func, + on_generation=on_generation, + random_seed=CFG.ga.random_seed, + suppress_warnings=True, + ) + + ga.run() + + best_solution, best_fitness, _ = ga.best_solution() + best_params = decode_solution(best_solution, gene_names, CFG) + + best_row = max(eval_rows, key=lambda r: r["fitness"]) if eval_rows else None + + best_payload = { + "best_solution_vector": [float(x) for x in best_solution], + "best_params": best_params, + "best_fitness": float(best_fitness), + "best_eval_record": best_row, + "gene_names": gene_names, + } + + with (CFG.results_dir / "best_solution.json").open("w", encoding="utf-8") as f: + json.dump(best_payload, f, ensure_ascii=False, indent=2) + + if eval_rows: + fieldnames = sorted({k for row in eval_rows for k in row.keys()}) + with (CFG.results_dir / "eval_log.csv").open("w", newline="", encoding="utf-8") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + writer.writerows(eval_rows) + + final_stress = best_row["stress_max_mpa"] if best_row else float("inf") + print("\n=== Optimization Finished ===") + print(f"Best parameters: {best_params}") + print(f"Best stress_max_MPa: {final_stress}") + + +if __name__ == "__main__": + main()