Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ utilityStatement
| showQueries | showDiskUsage | showCurrentTimestamp | killQuery | grantWatermarkEmbedding
| revokeWatermarkEmbedding | loadConfiguration | loadTimeseries | loadFile
| removeFile | unloadFile | setSqlDialect | showCurrentSqlDialect | showCurrentUser
| repairDataPartitionTable
| repairDataPartitionTable | showRepairDataPartitionTableProgress
;

/**
Expand Down Expand Up @@ -1244,6 +1244,11 @@ repairDataPartitionTable
: REPAIR DATA PARTITION TABLE
;

// Show Repair Data Partition Table Progress
showRepairDataPartitionTableProgress
: SHOW REPAIR DATA PARTITION TABLE PROGRESS
;

// Explain
explain
: EXPLAIN (ANALYZE VERBOSE?)? selectStatement?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1203,6 +1203,10 @@ REPAIR
: R E P A I R
;

PROGRESS
: P R O G R E S S
;

SCHEMA_REPLICATION_FACTOR
: S C H E M A '_' R E P L I C A T I O N '_' F A C T O R
;
Expand Down Expand Up @@ -1399,4 +1403,4 @@ fragment V: [vV];
fragment W: [wW];
fragment X: [xX];
fragment Y: [yY];
fragment Z: [zZ];
fragment Z: [zZ];
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public enum CnToDnSyncRequestType {
COLLECT_EARLIEST_TIMESLOTS,
GENERATE_DATA_PARTITION_TABLE,
GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
GET_DATA_PARTITION_TABLE_GENERATOR_PROGRESS,

// PartitionCache
INVALIDATE_PARTITION_CACHE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ private void buildActionMap() {
CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
(req, client) ->
client.generateDataPartitionTableHeartbeat((TGenerateDataPartitionTableReq) req));
actionMapBuilder.put(
CnToDnSyncRequestType.GET_DATA_PARTITION_TABLE_GENERATOR_PROGRESS,
(req, client) -> client.getDataPartitionTableGeneratorProgress());
actionMap = actionMapBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowTable4InformationSchemaResp;
Expand Down Expand Up @@ -1167,6 +1168,17 @@ public TSStatus dataPartitionTableIntegrityCheck() {
return partitionManager.dataPartitionTableIntegrityCheck();
}

@Override
public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() {
TSStatus status = confirmLeader();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
return new TShowRepairDataPartitionTableProgressResp(status, "UNKNOWN", 0.0)
.setMessage(status.getMessage());
}

return partitionManager.showRepairDataPartitionTableProgress();
}

private void printNewCreatedDataPartition(
GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan, TDataPartitionTableResp resp) {
final String lineSeparator = System.lineSeparator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TShowPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowTable4InformationSchemaResp;
Expand Down Expand Up @@ -479,6 +480,8 @@ TDataPartitionTableResp getOrCreateDataPartition(

TSStatus dataPartitionTableIntegrityCheck();

TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress();

/**
* Get AuditLogger.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodesProcedure;
import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
import org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure;
Expand Down Expand Up @@ -2340,6 +2341,18 @@ public boolean isExistUnfinishedProcedure(
return false;
}

public Optional<DataPartitionTableIntegrityCheckProcedure>
getUnfinishedDataPartitionTableIntegrityCheckProcedure() {
for (Procedure<ConfigNodeProcedureEnv> procedure : getExecutor().getProcedures().values()) {
if (!procedure.isFinished()
&& procedure instanceof DataPartitionTableIntegrityCheckProcedure) {
return Optional.of((DataPartitionTableIntegrityCheckProcedure) procedure);
}
}

return Optional.empty();
}

// ======================================================
/*
GET-SET Region
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetSeriesSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
Expand Down Expand Up @@ -541,6 +542,18 @@ public void markDataPartitionTableIntegrityCheckProcedureFinished() {
dataPartitionTableIntegrityCheckProcedureRunning.set(false);
}

public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() {
return configManager
.getProcedureManager()
.getUnfinishedDataPartitionTableIntegrityCheckProcedure()
.map(procedure -> procedure.getProgress(configManager.getProcedureManager().getEnv()))
.orElseGet(
() ->
new TShowRepairDataPartitionTableProgressResp(
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), "IDLE", 100.0)
.setMessage("No running DataPartitionTable integrity check procedure"));
}

private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) {
TSStatus status = getConsensusManager().confirmLeader();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,14 @@
import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.state.DataPartitionTableIntegrityCheckProcedureState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq;
import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataPartitionTableGeneratorProgressResp;
import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.thrift.TException;
Expand Down Expand Up @@ -1098,4 +1101,86 @@ public void setSkipDataNodes(Set<TDataNodeConfiguration> skipDataNodes) {
public void setFailedDataNodes(Set<TDataNodeConfiguration> failedDataNodes) {
this.failedDataNodes = failedDataNodes;
}

public TShowRepairDataPartitionTableProgressResp getProgress(final ConfigNodeProcedureEnv env) {
final DataPartitionTableIntegrityCheckProcedureState currentState = getCurrentState();
final String state = currentState == null ? "UNKNOWN" : currentState.name();
final double progress =
currentState == null ? 0.0 : calculateProgressByState(env, currentState) * 100;

return new TShowRepairDataPartitionTableProgressResp(
RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS), state, progress)
.setMessage(String.format("DataPartitionTable integrity check progress: %.1f%%", progress));
}

private double calculateProgressByState(
final ConfigNodeProcedureEnv env,
final DataPartitionTableIntegrityCheckProcedureState currentState) {
switch (currentState) {
case COLLECT_EARLIEST_TIMESLOTS:
return 0.0;
case ANALYZE_MISSING_PARTITIONS:
return 0.05;
case REQUEST_PARTITION_TABLES:
return 0.1;
case REQUEST_PARTITION_TABLES_HEART_BEAT:
return 0.1 + 0.8 * calculateDataNodeGeneratorProgress(env);
case MERGE_PARTITION_TABLES:
return 0.95;
case WRITE_PARTITION_TABLE_TO_CONSENSUS:
return 0.99;
default:
return 0.0;
}
}

private double calculateDataNodeGeneratorProgress(final ConfigNodeProcedureEnv env) {
final LoadManager currentLoadManager =
loadManager == null ? env.getConfigManager().getLoadManager() : loadManager;

final Set<TDataNodeConfiguration> targetDataNodes = new HashSet<>(allDataNodes);
targetDataNodes.removeAll(skipDataNodes);
if (targetDataNodes.isEmpty()) {
return dataPartitionTables.isEmpty() ? 0.0 : 1.0;
}

double progressSum = 0.0;
for (TDataNodeConfiguration dataNode : targetDataNodes) {
final int dataNodeId = dataNode.getLocation().getDataNodeId();
if (dataPartitionTables.containsKey(dataNodeId)
|| failedDataNodes.contains(dataNode)
|| !NodeStatus.Running.equals(currentLoadManager.getNodeStatus(dataNodeId))) {
progressSum += 1.0;
continue;
}

try {
Object response =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithGivenRetry(
dataNode.getLocation().getInternalEndPoint(),
null,
CnToDnSyncRequestType.GET_DATA_PARTITION_TABLE_GENERATOR_PROGRESS,
MAX_RETRY_COUNT);
if (response instanceof TGetDataPartitionTableGeneratorProgressResp) {
TGetDataPartitionTableGeneratorProgressResp resp =
(TGetDataPartitionTableGeneratorProgressResp) response;
DataPartitionTableGeneratorState state =
DataPartitionTableGeneratorState.getStateByCode(resp.getErrorCode());
if (state == DataPartitionTableGeneratorState.SUCCESS) {
progressSum += 1.0;
} else if (state == DataPartitionTableGeneratorState.IN_PROGRESS) {
progressSum += Math.max(0.0, Math.min(1.0, resp.getProgress()));
}
}
} catch (Exception e) {
LOG.warn(
"[DataPartitionIntegrity] Failed to get DataPartitionTable generation progress from DataNode[id={}]: {}",
dataNodeId,
e.getMessage());
}
}

return progressSum / targetDataNodes.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowTTLResp;
Expand Down Expand Up @@ -631,6 +632,11 @@ public TSStatus dataPartitionTableIntegrityCheck() {
return configManager.dataPartitionTableIntegrityCheck();
}

@Override
public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress() {
return configManager.showRepairDataPartitionTableProgress();
}

@Override
public TSStatus operatePermission(final TAuthorizerReq req) {
ConfigPhysicalPlanType configPhysicalPlanType =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowRepairDataPartitionTableProgressResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowTTLResp;
Expand Down Expand Up @@ -708,6 +709,14 @@ public TSStatus dataPartitionTableIntegrityCheck() throws TException {
() -> client.dataPartitionTableIntegrityCheck(), status -> !updateConfigNodeLeader(status));
}

@Override
public TShowRepairDataPartitionTableProgressResp showRepairDataPartitionTableProgress()
throws TException {
return executeRemoteCallWithRetry(
() -> client.showRepairDataPartitionTableProgress(),
resp -> !updateConfigNodeLeader(resp.status));
}

@Override
public TSStatus operatePermission(TAuthorizerReq req) throws TException {
return executeRemoteCallWithRetry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@
import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableReq;
import org.apache.iotdb.mpp.rpc.thrift.TGenerateDataPartitionTableResp;
import org.apache.iotdb.mpp.rpc.thrift.TGetDataPartitionTableGeneratorProgressResp;
import org.apache.iotdb.mpp.rpc.thrift.TGetEarliestTimeslotsResp;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
Expand Down Expand Up @@ -3397,6 +3398,52 @@ public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartb
return resp;
}

@Override
public TGetDataPartitionTableGeneratorProgressResp getDataPartitionTableGeneratorProgress() {
TGetDataPartitionTableGeneratorProgressResp resp =
new TGetDataPartitionTableGeneratorProgressResp();

if (currentGenerator == null) {
resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode());
resp.setProgress(0.0);
resp.setMessage("No DataPartitionTable generation task found");
resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
return resp;
}

switch (currentGenerator.getStatus()) {
case IN_PROGRESS:
resp.setErrorCode(DataPartitionTableGeneratorState.IN_PROGRESS.getCode());
resp.setProgress(currentGenerator.getProgress());
resp.setMessage(
String.format(
"DataPartitionTable generation in progress: %.1f%%",
currentGenerator.getProgress() * 100));
resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
break;
case COMPLETED:
resp.setErrorCode(DataPartitionTableGeneratorState.SUCCESS.getCode());
resp.setProgress(1.0);
resp.setMessage("DataPartitionTable generation completed successfully");
resp.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
break;
case FAILED:
resp.setErrorCode(DataPartitionTableGeneratorState.FAILED.getCode());
resp.setProgress(currentGenerator.getProgress());
resp.setMessage(
"DataPartitionTable generation failed: " + currentGenerator.getErrorMessage());
resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
break;
default:
resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode());
resp.setProgress(currentGenerator.getProgress());
resp.setMessage("Unknown task status: " + currentGenerator.getStatus());
resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
break;
}
return resp;
}

private void parseGenerationStatus(Object resp) {
if (currentGenerator == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ public static DatasetHeader getShowPipeHeader() {
return new DatasetHeader(ColumnHeaderConstant.showPipeColumnHeaders, true);
}

public static DatasetHeader getShowRepairDataPartitionTableProgressHeader() {
return new DatasetHeader(
ColumnHeaderConstant.showRepairDataPartitionTableProgressColumnHeaders, true);
}

public static DatasetHeader getShowTopicHeader() {
return new DatasetHeader(ColumnHeaderConstant.showTopicColumnHeaders, true);
}
Expand Down
Loading