Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
269 changes: 198 additions & 71 deletions Common/Tools/PID/pidTPCModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ class pidTPCModule
network.initModel(pidTPCopts.networkPathLocally.value, pidTPCopts.enableNetworkOptimizations.value, pidTPCopts.networkSetNumThreads.value, strtoul(headers["Valid-From"].c_str(), NULL, 0), strtoul(headers["Valid-Until"].c_str(), NULL, 0));
std::vector<float> dummyInput(network.getNumInputNodes(), 1.);
network.evalModel(dummyInput); /// Init the model evaluations
setupColumnInputNetwork();
LOGP(info, "Retrieved NN corrections for production tag {}, pass number {}, and NN-Version {}", headers["LPMProductionTag"], headers["RecoPassName"], headers["NN-Version"]);
} else {
LOG(fatal) << "No valid NN object found matching retrieved Bethe-Bloch parametrisation for pass " << metadata["RecoPassName"] << ". Please ensure that the requested pass has dedicated NN corrections available";
Expand All @@ -427,6 +428,7 @@ class pidTPCModule
network.initModel(pidTPCopts.networkPathLocally.value, pidTPCopts.enableNetworkOptimizations.value, pidTPCopts.networkSetNumThreads.value);
std::vector<float> dummyInput(network.getNumInputNodes(), 1.);
network.evalModel(dummyInput); // This is an initialisation and might reduce the overhead of the model
setupColumnInputNetwork();
}
} else {
return;
Expand All @@ -438,6 +440,110 @@ class pidTPCModule
}
} // end init

//__________________________________________________
void setupColumnInputNetwork()
{
using PI = o2::ml::OnnxModel::PreprocInput;
using PF = o2::ml::OnnxModel::PreprocFeature;
const int nFeat = network.getNumInputNodes(); // # network features (6..9), original model

// Raw graph inputs (this order defines the tensor feeding order in
// createNetworkPrediction). All per-track columns are wrapped zero-copy from
// the Arrow buffers; nclNorm/hrDivisor/hrFallback are per-DF runtime scalars.
std::vector<PI> in;
in.push_back({"tpcInnerParam", PI::Type::TrackFloat});
in.push_back({"tgl", PI::Type::TrackFloat});
in.push_back({"signed1Pt", PI::Type::TrackFloat});
in.push_back({"mass", PI::Type::ScalarFloat});
in.push_back({"collisionId", PI::Type::TrackInt32});
in.push_back({"multArray", PI::Type::CollisionFloat});
in.push_back({"nclNorm", PI::Type::ScalarFloat});
in.push_back({"nclsFindable", PI::Type::TrackUint8});
in.push_back({"nclsFMF", PI::Type::TrackInt8});
if (nFeat >= 7) {
in.push_back({"occArray", PI::Type::CollisionFloat});
}
if (nFeat >= 8) {
in.push_back({"hrArray", PI::Type::CollisionFloat});
in.push_back({"hrDivisor", PI::Type::ScalarFloat});
in.push_back({"hrFallback", PI::Type::ScalarFloat});
}
if (nFeat >= 9) {
in.push_back({"phi", PI::Type::TrackFloat});
}
in.push_back({"validMask", PI::Type::TrackBool});

// Per-feature preprocessing (exactly nFeat entries, in the training order).
std::vector<PF> feat;
{
PF f;
f.op = PF::Op::Passthrough;
f.a = "tpcInnerParam";
feat.push_back(f);
}
{
PF f;
f.op = PF::Op::Passthrough;
f.a = "tgl";
feat.push_back(f);
}
{
PF f;
f.op = PF::Op::Passthrough;
f.a = "signed1Pt";
feat.push_back(f);
}
{
PF f;
f.op = PF::Op::BroadcastScalar;
f.a = "mass";
f.shapeRef = "collisionId";
feat.push_back(f);
}
{
PF f;
f.op = PF::Op::GatherNormWhere;
f.a = "multArray";
f.b = "collisionId";
f.c = {11000.f, 1.f, 0.f};
feat.push_back(f);
}
{
PF f;
f.op = PF::Op::NClsSqrtRecip;
f.a = "nclsFindable";
f.b = "nclsFMF";
f.scaleInput = "nclNorm";
feat.push_back(f);
}
if (nFeat >= 7) {
PF f;
f.op = PF::Op::GatherNormWhere;
f.a = "occArray";
f.b = "collisionId";
f.c = {60000.f, 1.f, 0.f};
feat.push_back(f);
}
if (nFeat >= 8) {
PF f;
f.op = PF::Op::GatherNormWhere;
f.a = "hrArray";
f.b = "collisionId";
f.scaleInput = "hrDivisor";
f.fallbackInput = "hrFallback";
feat.push_back(f);
}
if (nFeat >= 9) {
PF f;
f.op = PF::Op::Mod2;
f.a = "phi";
f.c = {2.f * static_cast<float>(M_PI), 2.f * static_cast<float>(M_PI), static_cast<float>(M_PI) / 9.0f};
feat.push_back(f);
}

network.setupColumnInputs(in, feat, "validMask");
}

//__________________________________________________
template <typename TCCDB, typename M, typename T, typename B>
std::vector<float> createNetworkPrediction(TCCDB& ccdb, soa::Join<aod::Collisions, aod::EvSels> const& collisions, M const& mults, T const& tracks, B const& bcs, const size_t size)
Expand Down Expand Up @@ -489,6 +595,7 @@ class pidTPCModule
network.initModel(pidTPCopts.networkPathLocally.value, pidTPCopts.enableNetworkOptimizations.value, pidTPCopts.networkSetNumThreads.value, strtoul(headers["Valid-From"].c_str(), NULL, 0), strtoul(headers["Valid-Until"].c_str(), NULL, 0));
std::vector<float> dummyInput(network.getNumInputNodes(), 1.);
network.evalModel(dummyInput);
setupColumnInputNetwork();
LOGP(info, "Retrieved NN corrections for production tag {}, pass number {}, NN-Version number {}", headers["LPMProductionTag"], headers["RecoPassName"], headers["NN-Version"]);
} else {
LOG(fatal) << "No valid NN object found matching retrieved Bethe-Bloch parametrisation for pass " << metadata["RecoPassName"] << ". Please ensure that the requested pass has dedicated NN corrections available";
Expand All @@ -497,19 +604,14 @@ class pidTPCModule
}

// Defining some network parameters
int input_dimensions = network.getNumInputNodes();
const int nFeat = network.getNumFeatures();
int output_dimensions = network.getNumOutputNodes();
const uint64_t track_prop_size = input_dimensions * size;
const uint64_t prediction_size = output_dimensions * size;

network_prediction = std::vector<float>(prediction_size * 9); // For each mass hypotheses
const float nNclNormalization = response->GetNClNormalization();
float duration_network = 0;

std::vector<float> track_properties(track_prop_size);
uint64_t counter_track_props = 0;
int loop_counter = 0;

// To load the Hadronic rate once for each collision
float hadronicRateBegin = 0.;
std::vector<float> hadronicRateForCollision(collisions.size(), 0.0f);
Expand All @@ -530,88 +632,113 @@ class pidTPCModule
hadronicRateBegin = 0.0f;
}

// Filling a std::vector<float> to be evaluated by the network
// Evaluation on single tracks brings huge overhead: Thus evaluation is done on one large vector
static constexpr int NParticleTypes = 9;
constexpr int ExpectedInputDimensionsNNV2 = 7;
constexpr int ExpectedInputDimensionsNNV3 = 8;
constexpr int ExpectedInputDimensionsNNV4 = 9;
constexpr auto NetworkVersionV2 = "2";
constexpr auto NetworkVersionV3 = "3";
constexpr auto NetworkVersionV4 = "4";
for (int j = 0; j < NParticleTypes; j++) { // Loop over particle number for which network correction is used
for (auto const& trk : tracks) {
if (!trk.hasTPC()) {
continue;
}
if (pidTPCopts.skipTPCOnly) {
if (!trk.hasITS() && !trk.hasTRD() && !trk.hasTOF()) {
continue;
}
}
track_properties[counter_track_props] = trk.tpcInnerParam();
track_properties[counter_track_props + 1] = trk.tgl();
track_properties[counter_track_props + 2] = trk.signed1Pt();
track_properties[counter_track_props + 3] = o2::track::pid_constants::sMasses[j];
track_properties[counter_track_props + 4] = trk.has_collision() ? mults[trk.collisionId()] / 11000. : 1.;
track_properties[counter_track_props + 5] = std::sqrt(nNclNormalization / trk.tpcNClsFound());
if (input_dimensions == ExpectedInputDimensionsNNV2 && networkVersion == NetworkVersionV2) {
track_properties[counter_track_props + 6] = trk.has_collision() ? collisions.iteratorAt(trk.collisionId()).ft0cOccupancyInTimeRange() / 60000. : 1.;
}
if (input_dimensions == ExpectedInputDimensionsNNV3 && networkVersion == NetworkVersionV3) {
track_properties[counter_track_props + 6] = trk.has_collision() ? collisions.iteratorAt(trk.collisionId()).ft0cOccupancyInTimeRange() / 60000. : 1.;
if (trk.has_collision()) {
if (collsys == CollisionSystemType::kCollSyspp) {
track_properties[counter_track_props + 7] = hadronicRateForCollision[trk.collisionId()] / 1500.;
} else {
track_properties[counter_track_props + 7] = hadronicRateForCollision[trk.collisionId()] / 50.;
}
} else {
// asign Hadronic Rate at beginning of run if track does not belong to a collision
if (collsys == CollisionSystemType::kCollSyspp) {
track_properties[counter_track_props + 7] = hadronicRateBegin / 1500.;
} else {
track_properties[counter_track_props + 7] = hadronicRateBegin / 50.;
}
}

const float hadronicRateDivisor = (collsys == CollisionSystemType::kCollSyspp) ? 1500.f : 50.f;

// Per-collision arrays (O(nColl)); gathered per track inside the model via the
// collisionId column, then normalised in-graph.
const int64_t nColl = static_cast<int64_t>(collisions.size());
std::vector<float> multArray(nColl);
std::vector<float> occArray(nFeat >= ExpectedInputDimensionsNNV2 ? nColl : 0);
{
int64_t c = 0;
for (const auto& col : collisions) {
multArray[c] = static_cast<float>(mults[c]);
if (nFeat >= ExpectedInputDimensionsNNV2) {
occArray[c] = col.ft0cOccupancyInTimeRange();
}
++c;
}
}

if (input_dimensions == ExpectedInputDimensionsNNV4 && networkVersion == NetworkVersionV4) {
track_properties[counter_track_props + 6] = trk.has_collision() ? collisions.iteratorAt(trk.collisionId()).ft0cOccupancyInTimeRange() / 60000. : 1.;
if (trk.has_collision()) {
if (collsys == CollisionSystemType::kCollSyspp) {
track_properties[counter_track_props + 7] = hadronicRateForCollision[trk.collisionId()] / 1500.;
} else {
track_properties[counter_track_props + 7] = hadronicRateForCollision[trk.collisionId()] / 50.;
}
} else {
// asign Hadronic Rate at beginning of run if track does not belong to a collision
if (collsys == CollisionSystemType::kCollSyspp) {
track_properties[counter_track_props + 7] = hadronicRateBegin / 1500.;
} else {
track_properties[counter_track_props + 7] = hadronicRateBegin / 50.;
}
}
track_properties[counter_track_props + 8] = std::fmod(std::fmod(trk.phi(), 2 * M_PI) + 2 * M_PI, M_PI / 9.0);
// Raw per-track Arrow column buffers (zero-copy; one chunk per DataFrame).
auto arrowTable = tracks.asArrowTable();
auto chunk0 = [&](const char* name) -> std::shared_ptr<arrow::Array> {
const int idx = arrowTable->schema()->GetFieldIndex(name);
if (idx < 0) {
LOG(fatal) << "createNetworkPrediction: column '" << name << "' not found in tracks table";
}
auto col = arrowTable->column(idx);
if (col->num_chunks() != 1) {
LOG(fatal) << "createNetworkPrediction: column '" << name << "' has " << col->num_chunks()
<< " chunks; a single chunk per DataFrame is required for zero-copy input";
}
return col->chunk(0);
};
const int64_t nTrk = static_cast<int64_t>(tracks.size());
const float* pTpcInner = std::static_pointer_cast<arrow::FloatArray>(chunk0("fTPCInnerParam"))->raw_values();
const float* pTgl = std::static_pointer_cast<arrow::FloatArray>(chunk0("fTgl"))->raw_values();
const float* pSigned1Pt = std::static_pointer_cast<arrow::FloatArray>(chunk0("fSigned1Pt"))->raw_values();
const int32_t* pCollId = std::static_pointer_cast<arrow::Int32Array>(chunk0("fIndexCollisions"))->raw_values();
const uint8_t* pFindable = std::static_pointer_cast<arrow::UInt8Array>(chunk0("fTPCNClsFindable"))->raw_values();
const int8_t* pFMF = std::static_pointer_cast<arrow::Int8Array>(chunk0("fTPCNClsFindableMinusFound"))->raw_values();
const float* pPhi = (nFeat >= ExpectedInputDimensionsNNV4)
? std::static_pointer_cast<arrow::FloatArray>(chunk0("fPhi"))->raw_values()
: nullptr;

// Single boolean mask of the tracks the network runs on; the model Compress'es
// to exactly these rows so the output is compact and the consumer's
// count_tracks indexing is unchanged. Condition matches process()'s counter.
std::vector<uint8_t> validMask(nTrk);
{
int64_t t = 0;
for (auto const& trk : tracks) {
bool valid = trk.hasTPC();
if (valid && pidTPCopts.skipTPCOnly && !trk.hasITS() && !trk.hasTRD() && !trk.hasTOF()) {
valid = false;
}
counter_track_props += input_dimensions;
validMask[t++] = valid ? 1 : 0;
}
}

auto memInfo = Ort::MemoryInfo::CreateCpu(OrtAllocatorType::OrtArenaAllocator, OrtMemType::OrtMemTypeDefault);
const int64_t one = 1;
float massVal = 0.f;
float nclNormVal = nNclNormalization;
float hrDivisorVal = hadronicRateDivisor;
float hrFallbackVal = hadronicRateBegin / hadronicRateDivisor;

// Evaluate once per mass hypothesis; only the mass scalar input changes.
for (int j = 0; j < NParticleTypes; j++) {
massVal = o2::track::pid_constants::sMasses[j];

std::vector<Ort::Value> inputTensors;
inputTensors.emplace_back(Ort::Value::CreateTensor<float>(memInfo, const_cast<float*>(pTpcInner), nTrk, &nTrk, 1));
inputTensors.emplace_back(Ort::Value::CreateTensor<float>(memInfo, const_cast<float*>(pTgl), nTrk, &nTrk, 1));
inputTensors.emplace_back(Ort::Value::CreateTensor<float>(memInfo, const_cast<float*>(pSigned1Pt), nTrk, &nTrk, 1));
inputTensors.emplace_back(Ort::Value::CreateTensor<float>(memInfo, &massVal, 1, &one, 1));
inputTensors.emplace_back(Ort::Value::CreateTensor<int32_t>(memInfo, const_cast<int32_t*>(pCollId), nTrk, &nTrk, 1));
inputTensors.emplace_back(Ort::Value::CreateTensor<float>(memInfo, multArray.data(), nColl, &nColl, 1));
inputTensors.emplace_back(Ort::Value::CreateTensor<float>(memInfo, &nclNormVal, 1, &one, 1));
inputTensors.emplace_back(Ort::Value::CreateTensor<uint8_t>(memInfo, const_cast<uint8_t*>(pFindable), nTrk, &nTrk, 1));
inputTensors.emplace_back(Ort::Value::CreateTensor<int8_t>(memInfo, const_cast<int8_t*>(pFMF), nTrk, &nTrk, 1));
if (nFeat >= ExpectedInputDimensionsNNV2) {
inputTensors.emplace_back(Ort::Value::CreateTensor<float>(memInfo, occArray.data(), nColl, &nColl, 1));
}
if (nFeat >= ExpectedInputDimensionsNNV3) {
inputTensors.emplace_back(Ort::Value::CreateTensor<float>(memInfo, hadronicRateForCollision.data(), nColl, &nColl, 1));
inputTensors.emplace_back(Ort::Value::CreateTensor<float>(memInfo, &hrDivisorVal, 1, &one, 1));
inputTensors.emplace_back(Ort::Value::CreateTensor<float>(memInfo, &hrFallbackVal, 1, &one, 1));
}
if (nFeat >= ExpectedInputDimensionsNNV4) {
inputTensors.emplace_back(Ort::Value::CreateTensor<float>(memInfo, const_cast<float*>(pPhi), nTrk, &nTrk, 1));
}
inputTensors.emplace_back(Ort::Value::CreateTensor<bool>(memInfo, reinterpret_cast<bool*>(validMask.data()), nTrk, &nTrk, 1));

auto start_network_eval = std::chrono::high_resolution_clock::now();
float* output_network = network.evalModel(track_properties);
float* output_network = network.evalModel<float>(inputTensors);
auto stop_network_eval = std::chrono::high_resolution_clock::now();
duration_network += std::chrono::duration<float, std::ratio<1, 1000000000>>(stop_network_eval - start_network_eval).count();
for (uint64_t k = 0; k < prediction_size; k += output_dimensions) {
for (int l = 0; l < output_dimensions; l++) {
network_prediction[k + l + prediction_size * loop_counter] = output_network[k + l];
network_prediction[k + l + prediction_size * j] = output_network[k + l];
}
}

counter_track_props = 0;
loop_counter += 1;
}
track_properties.clear();

auto stop_network_total = std::chrono::high_resolution_clock::now();
LOG(debug) << "Neural Network for the TPC PID response correction: Time per track (eval ONNX): " << duration_network / (size * 9) << "ns ; Total time (eval ONNX): " << duration_network / 1000000000 << " s";
LOG(debug) << "Neural Network for the TPC PID response correction: Time per track (eval + overhead): " << std::chrono::duration<float, std::ratio<1, 1000000000>>(stop_network_total - start_network_total).count() / (size * 9) << "ns ; Total time (eval + overhead): " << std::chrono::duration<float, std::ratio<1, 1000000000>>(stop_network_total - start_network_total).count() / 1000000000 << " s";
Expand Down
2 changes: 1 addition & 1 deletion Tools/ML/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@

o2physics_add_library(MLCore
SOURCES model.cxx
PUBLIC_LINK_LIBRARIES O2::Framework O2Physics::AnalysisCore ONNXRuntime::ONNXRuntime
PUBLIC_LINK_LIBRARIES O2::Framework O2Physics::AnalysisCore ONNXRuntime::ONNXRuntime ONNX::onnx_proto
)
Loading
Loading