#!/usr/bin/env python3
"""
Pre-registered secondary analysis: is marcescent (standing-dead) plant litter
chemically more recalcitrant than shed litter, and does it host a more
fungal-dominated decomposer community? A mechanistic companion to the finding
that marcescent litter decomposes more slowly.

Automated executor (arm B). Self-contained: downloads the pinned file from
Figshare, verifies md5, runs the pre-registered tests, writes every figure,
table, and results.json. Re-running reproduces every number.

Dataset: Figshare 10.6084/m9.figshare.25062800 (CC-BY-4.0),
         "Angst et al. 2024 Functional Ecology raw data",
         file "DataMarcescencedecomposition fin.xlsx" (md5 0c66ac4ef3d82ef09ed2c0332a4796b6).
         Sheets used: FTIR (litter chemistry), PLFA (microbial biomass).

Pre-registered tests (run exactly these):
  T1  polyphenolics (recalcitrant) marcescent vs shed, paired by species : Wilcoxon signed-rank
  T2  polysaccharides (labile)     marcescent vs shed, paired by species : Wilcoxon signed-rank
  T3  fungi:bacteria ratio         marcescent vs shed, paired by species : Wilcoxon signed-rank
  Multiplicity across {T1, T2, T3}: Benjamini-Hochberg (FDR) and Holm.
"""
import hashlib
import io
import json
import os
import urllib.request

import matplotlib

matplotlib.use("Agg")
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy import stats

np.random.seed(0)

EXPECTED_MD5 = "0c66ac4ef3d82ef09ed2c0332a4796b6"
URL = "https://ndownloader.figshare.com/files/44230574"
OUT = os.path.dirname(os.path.abspath(__file__))
FIG = os.path.join(OUT, "figures")
TAB = os.path.join(OUT, "tables")
os.makedirs(FIG, exist_ok=True)
os.makedirs(TAB, exist_ok=True)
CB = {"M": "#0072B2", "S": "#E69F00"}


def fetch():
    raw = urllib.request.urlopen(URL, timeout=120).read()
    md5 = hashlib.md5(raw).hexdigest()
    if md5 != EXPECTED_MD5:
        raise SystemExit(f"md5 mismatch: {md5}")
    return raw, md5


def bh_holm(p):
    p = np.asarray(p, float); m = len(p); order = np.argsort(p)
    bh = np.empty(m); prev = 1.0
    for rank, idx in enumerate(order[::-1]):
        prev = min(prev, p[idx] * m / (m - rank)); bh[idx] = min(prev, 1.0)
    holm = np.empty(m); prev = 0.0
    for rank, idx in enumerate(order):
        prev = max(prev, (m - rank) * p[idx]); holm[idx] = min(prev, 1.0)
    return bh.tolist(), holm.tolist()


def paired_by_species(df, value):
    """Mean per species per litter type, keep species with both M and S, wide."""
    g = df.groupby(["Species", "Litter"])[value].mean().reset_index()
    wide = g.pivot_table(index="Species", columns="Litter", values=value).dropna(subset=["M", "S"])
    return wide


def wilcoxon_block(wide, label):
    w = stats.wilcoxon(wide["M"], wide["S"], method="approx")
    r = abs(w.zstatistic) / np.sqrt(len(wide))
    return {"test": f"Wilcoxon signed-rank ({label}, marcescent vs shed)",
            "W": float(w.statistic), "z": float(w.zstatistic), "p": float(w.pvalue),
            "rank_biserial_r": float(r), "n_species": int(len(wide)),
            "median_M": float(wide["M"].median()), "median_S": float(wide["S"].median()),
            "mean_M": float(wide["M"].mean()), "mean_S": float(wide["S"].mean())}


def main():
    raw, md5 = fetch()
    ftir = pd.read_excel(io.BytesIO(raw), sheet_name="FTIR")
    ftir = ftir.rename(columns={"polyphenolics [%]": "polyphenolics", "polysaccharides [%]": "polysaccharides"})
    ftir = ftir[ftir["Litter"].isin(["M", "S"])].dropna(subset=["polyphenolics", "polysaccharides"])

    plfa = pd.read_excel(io.BytesIO(raw), sheet_name="PLFA")
    plfa = plfa.rename(columns={"fungi [nmol/g DW]": "fungi", "bacteria [nmol/g DW]": "bacteria"})
    plfa = plfa[plfa["Litter"].isin(["M", "S"])].dropna(subset=["fungi", "bacteria"])
    plfa = plfa[plfa["bacteria"] > 0].copy()
    plfa["fb_ratio"] = plfa["fungi"] / plfa["bacteria"]

    results = {"dataset": {"figshare_id": "25062800", "md5": md5}}

    poly_w = paired_by_species(ftir, "polyphenolics")
    sacc_w = paired_by_species(ftir, "polysaccharides")
    fb_w = paired_by_species(plfa, "fb_ratio")
    results["T1"] = wilcoxon_block(poly_w, "polyphenolics %")
    results["T2"] = wilcoxon_block(sacc_w, "polysaccharides %")
    results["T3"] = wilcoxon_block(fb_w, "fungi:bacteria ratio")

    fam = [("T1", results["T1"]["p"]), ("T2", results["T2"]["p"]), ("T3", results["T3"]["p"])]
    bh, holm = bh_holm([p for _, p in fam])
    results["multiplicity"] = {n: {"raw_p": p, "bh_p": bh[i], "holm_p": holm[i]} for i, (n, p) in enumerate(fam)}

    with open(os.path.join(OUT, "results.json"), "w") as fh:
        json.dump(results, fh, indent=2)

    # ---- figures (paired M vs S) ----
    def paired_fig(wide, ylabel, title, fname, res):
        fig, ax = plt.subplots(figsize=(5.0, 4.2), dpi=150)
        for _, row in wide.iterrows():
            ax.plot([0, 1], [row["S"], row["M"]], color="0.75", lw=0.7, zorder=1)
        ax.scatter(np.zeros(len(wide)), wide["S"], color=CB["S"], label="shed", s=18, zorder=2)
        ax.scatter(np.ones(len(wide)), wide["M"], color=CB["M"], label="marcescent", s=18, zorder=2)
        ax.set_xticks([0, 1]); ax.set_xticklabels(["shed", "marcescent"])
        ax.set_ylabel(ylabel)
        ax.set_title(f"{title}\nWilcoxon p={res['p']:.3f} (n={res['n_species']} species)")
        ax.legend(frameon=False)
        fig.tight_layout(); fig.savefig(os.path.join(FIG, fname)); plt.close(fig)

    paired_fig(poly_w, "Polyphenolics (% of spectrum)", "Recalcitrant chemistry: polyphenolics", "fig-1-polyphenolics.png", results["T1"])
    paired_fig(sacc_w, "Polysaccharides (% of spectrum)", "Labile chemistry: polysaccharides", "fig-2-polysaccharides.png", results["T2"])
    paired_fig(fb_w, "Fungi : bacteria (PLFA ratio)", "Decomposer community: fungal dominance", "fig-3-fungi-bacteria.png", results["T3"])

    # ---- tables ----
    poly_w.reset_index().rename(columns={"M": "polyphenolics_marcescent", "S": "polyphenolics_shed"}).assign(
        polysacc_marcescent=sacc_w["M"].values, polysacc_shed=sacc_w["S"].values
    ).to_csv(os.path.join(TAB, "tbl-1-ftir-chemistry.csv"), index=False)
    fb_w.reset_index().rename(columns={"M": "fb_ratio_marcescent", "S": "fb_ratio_shed"}).to_csv(
        os.path.join(TAB, "tbl-2-fungi-bacteria.csv"), index=False)
    pd.DataFrame([{"test": n, "raw_p": results["multiplicity"][n]["raw_p"],
                   "bh_p": results["multiplicity"][n]["bh_p"], "holm_p": results["multiplicity"][n]["holm_p"]}
                  for n, _ in fam]).to_csv(os.path.join(TAB, "tbl-3-multiplicity.csv"), index=False)

    dp = {"name": "marcescence-litter-chemistry-tables", "resources": []}
    for path in ["tbl-1-ftir-chemistry.csv", "tbl-2-fungi-bacteria.csv", "tbl-3-multiplicity.csv"]:
        cols = pd.read_csv(os.path.join(TAB, path)).columns
        dp["resources"].append({"name": path[:-4], "path": path, "schema": {"fields": [
            {"name": c, "type": "string" if c in ("Species", "test") else "number"} for c in cols]}})
    with open(os.path.join(TAB, "datapackage.json"), "w") as fh:
        json.dump(dp, fh, indent=2)

    print("md5", md5)
    for k in ("T1", "T2", "T3"):
        r = results[k]
        print(k, r["test"], "| p=%.4f r=%.2f | M=%.2f S=%.2f n=%d" % (r["p"], r["rank_biserial_r"], r["mean_M"], r["mean_S"], r["n_species"]))
    print("BH", [round(x, 4) for x in bh], "Holm", [round(x, 4) for x in holm])


if __name__ == "__main__":
    main()
