Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -155,6 +157,9 @@ public class FragmentInstanceContext extends QueryContext {
private long closedUnseqFileNum = 0;
private boolean highestPriority = false;

// accessed value columns on each referenced AlignedTVList.
private final Map<TVList, Set<Integer>> alignedTVListColumnAccessMap = new ConcurrentHashMap<>();

public static FragmentInstanceContext createFragmentInstanceContext(
FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, SessionInfo sessionInfo) {
FragmentInstanceContext instanceContext =
Expand Down Expand Up @@ -205,6 +210,43 @@ public void setQueryDataSourceType(QueryDataSourceType queryDataSourceType) {
this.queryDataSourceType = queryDataSourceType;
}

/**
* Record columns of the AlignedTVList accessed by the query. This method is called from
* prepareTvListMapForQuery with tvList.lockQueryList() held. Even though the HashSet inside
* alignedTVListColumnAccessMap is not thread-safe, the calling pattern guarantees thread safety
* without requiring additional synchronization.
*
* @param tvList the TVList being accessed
* @param columnIndexList list of column indices being accessed
*/
public void putAccessedColumns(TVList tvList, List<Integer> columnIndexList) {
Set<Integer> accessedColumns =
alignedTVListColumnAccessMap.computeIfAbsent(tvList, ignored -> new HashSet<>());
columnIndexList.stream()
.filter(Objects::nonNull)
.forEach(
index -> {
if (index >= 0) {
accessedColumns.add(index);
}
});
}

/**
* Get columns of the AlignedTVList accessed by the query. This method is called from
* prepareTvListMapForQuery with tvList.lockQueryList() held, ensuring that no other thread can
* change accessed columns for the same TVList concurrently.
*
* @param tvList the TVList being accessed
* @return set of column indices being accessed
*/
public Set<Integer> getAccessedAlignedColumns(TVList tvList) {
Set<Integer> accessedColumns = alignedTVListColumnAccessMap.get(tvList);
return accessedColumns == null
? Collections.emptySet()
: Collections.unmodifiableSet(accessedColumns);
}

@TestOnly
public static FragmentInstanceContext createFragmentInstanceContext(
FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iotdb.db.storageengine.dataregion.modification.Modification;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.utils.ModificationUtils;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;

import org.apache.tsfile.enums.TSDataType;
Expand All @@ -63,9 +64,11 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static org.apache.iotdb.commons.path.AlignedPath.VECTOR_PLACEHOLDER;

Expand Down Expand Up @@ -121,7 +124,8 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
QueryContext context,
IWritableMemChunk memChunk,
boolean isWorkMemTable,
Filter globalTimeFilter) {
Filter globalTimeFilter,
List<Integer> columnIndexList) {
// should copy globalTimeFilter because GroupByMonthFilter is stateful
Filter copyTimeFilter = null;
if (globalTimeFilter != null) {
Expand Down Expand Up @@ -163,6 +167,11 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
list.getQueryContextSet().add(context);
tvListQueryMap.put(list, list.rowCount());
} else {
// columnIndexList is to track column-level access for AlignedTVList.
// For TVList (primitive time series), it remains null and column tracking is not needed.
if (columnIndexList != null && context instanceof FragmentInstanceContext) {
((FragmentInstanceContext) context).putAccessedColumns(list, columnIndexList);
}
if (list.isSorted() || list.getQueryContextSet().isEmpty()) {
LOGGER.debug(
"Working MemTable - add current query context to mutable TVList's query list when it's sorted or no other query on it");
Expand Down Expand Up @@ -196,7 +205,13 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
list.setOwnerQuery(firstQuery);

// clone TVList
cloneList = list.clone();
Set<Integer> columnsToClone = getAccessedColumnsForQuery(list);
if (columnsToClone == null) {
cloneList = list.clone();
} else {
cloneList = ((AlignedTVList) list).clone(columnsToClone);
}

cloneList.getQueryContextSet().add(context);
tvListQueryMap.put(cloneList, cloneList.rowCount());
}
Expand All @@ -218,6 +233,10 @@ protected Map<TVList, Integer> prepareTvListMapForQuery(
}
return tvListQueryMap;
}

protected Set<Integer> getAccessedColumnsForQuery(TVList tvList) {
return null;
}
}

class AlignedResourceByPathUtils extends ResourceByPathUtils {
Expand Down Expand Up @@ -364,15 +383,15 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
return null;
}

// prepare AlignedTVList for query. It should clone TVList if necessary.
Map<TVList, Integer> alignedTvListQueryMap =
prepareTvListMapForQuery(
context, alignedMemChunk, modsToMemtable == null, globalTimeFilter);

// column index list for the query
List<Integer> columnIndexList =
alignedMemChunk.buildColumnIndexList(partialPath.getSchemaList());

// prepare AlignedTVList for query. It should clone TVList if necessary.
Map<TVList, Integer> alignedTvListQueryMap =
prepareTvListMapForQuery(
context, alignedMemChunk, modsToMemtable == null, globalTimeFilter, columnIndexList);

List<List<TimeRange>> deletionList = null;
if (modsToMemtable != null) {
deletionList =
Expand All @@ -383,6 +402,25 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
context, columnIndexList, getMeasurementSchema(), alignedTvListQueryMap, deletionList);
}

/**
* This method is called from prepareTvListMapForQuery with tvList.lockQueryList() held, ensuring
* thread-safe access to queryContextSet.
*
* @param tvList the TVList to get accessed columns for
* @return set of accessed column indices, or empty set if no columns are tracked
*/
@Override
protected Set<Integer> getAccessedColumnsForQuery(TVList tvList) {
Set<Integer> accessedColumns = new HashSet<>();
for (QueryContext queryContext : tvList.getQueryContextSet()) {
if (queryContext instanceof FragmentInstanceContext) {
accessedColumns.addAll(
((FragmentInstanceContext) queryContext).getAccessedAlignedColumns(tvList));
}
}
return accessedColumns;
}

public VectorMeasurementSchema getMeasurementSchema() {
List<String> measurementList = partialPath.getMeasurementList();
TSDataType[] types = new TSDataType[measurementList.size()];
Expand Down Expand Up @@ -535,7 +573,7 @@ public ReadOnlyMemChunk getReadOnlyMemChunkFromMemTable(
memTableMap.get(deviceID).getMemChunkMap().get(partialPath.getMeasurement());
// prepare TVList for query. It should clone TVList if necessary.
Map<TVList, Integer> tvListQueryMap =
prepareTvListMapForQuery(context, memChunk, modsToMemtable == null, globalTimeFilter);
prepareTvListMapForQuery(context, memChunk, modsToMemtable == null, globalTimeFilter, null);
List<TimeRange> deletionList = null;
if (modsToMemtable != null) {
deletionList =
Expand Down
Loading
Loading