Skip to content

Commit

Permalink
wip. Move get-by-version methods from CatalogService to Catalog.
Browse files Browse the repository at this point in the history
  • Loading branch information
AMashenkov committed Jan 24, 2025
1 parent dab7d11 commit 5e7ed50
Show file tree
Hide file tree
Showing 59 changed files with 279 additions and 367 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ void clearDatabase() {
ignite.tables().tables().forEach(table -> sql("DROP TABLE " + table.name()));

CatalogManagerImpl catalogManager = (CatalogManagerImpl) unwrapIgniteImpl(ignite).catalogManager();
catalogManager.zones(catalogManager.latestCatalogVersion()).stream()
Catalog catalog = catalogManager.catalog(catalogManager.latestCatalogVersion());

catalog.zones().stream()
.filter(zone -> !CatalogManagerImpl.DEFAULT_ZONE_NAME.equals(zone.name()))
.forEach(zone -> sql("DROP ZONE " + zone.name()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import java.util.NavigableMap;
Expand All @@ -42,11 +41,7 @@
import org.apache.ignite.internal.catalog.commands.CreateSchemaCommand;
import org.apache.ignite.internal.catalog.commands.CreateZoneCommand;
import org.apache.ignite.internal.catalog.commands.StorageProfileParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
import org.apache.ignite.internal.catalog.storage.Fireable;
Expand Down Expand Up @@ -189,48 +184,6 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
return updateLog.stopAsync(componentContext);
}

@Override
public @Nullable CatalogTableDescriptor table(int tableId, int catalogVersion) {
return catalog(catalogVersion).table(tableId);
}

@Override
public Collection<CatalogTableDescriptor> tables(int catalogVersion) {
return catalog(catalogVersion).tables();
}

@Override
public @Nullable CatalogIndexDescriptor index(int indexId, int catalogVersion) {
return catalog(catalogVersion).index(indexId);
}

@Override
public Collection<CatalogIndexDescriptor> indexes(int catalogVersion) {
return catalog(catalogVersion).indexes();
}

@Override
public List<CatalogIndexDescriptor> indexes(int catalogVersion, int tableId) {
return catalog(catalogVersion).indexes(tableId);
}

@Override
public @Nullable CatalogSchemaDescriptor schema(int schemaId, int catalogVersion) {
Catalog catalog = catalog(catalogVersion);

return catalog == null ? null : catalog.schema(schemaId);
}

@Override
public @Nullable CatalogZoneDescriptor zone(int zoneId, int catalogVersion) {
return catalog(catalogVersion).zone(zoneId);
}

@Override
public Collection<CatalogZoneDescriptor> zones(int catalogVersion) {
return catalog(catalogVersion).zones();
}

@Override
public int activeCatalogVersion(long timestamp) {
return catalogAt(timestamp).version();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,7 @@

package org.apache.ignite.internal.catalog;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
import org.apache.ignite.internal.event.EventProducer;
Expand Down Expand Up @@ -78,30 +72,6 @@ public interface CatalogService extends EventProducer<CatalogEvent, CatalogEvent
*/
Catalog activeCatalog(long timestamp);

@Deprecated(forRemoval = true)
@Nullable CatalogTableDescriptor table(int tableId, int catalogVersion);

@Deprecated(forRemoval = true)
Collection<CatalogTableDescriptor> tables(int catalogVersion);

@Deprecated(forRemoval = true)
@Nullable CatalogIndexDescriptor index(int indexId, int catalogVersion);

@Deprecated(forRemoval = true)
Collection<CatalogIndexDescriptor> indexes(int catalogVersion);

@Deprecated(forRemoval = true)
List<CatalogIndexDescriptor> indexes(int catalogVersion, int tableId);

@Deprecated(forRemoval = true)
@Nullable CatalogSchemaDescriptor schema(int schemaId, int catalogVersion);

@Deprecated(forRemoval = true)
@Nullable CatalogZoneDescriptor zone(int zoneId, int catalogVersion);

@Deprecated(forRemoval = true)
Collection<CatalogZoneDescriptor> zones(int catalogVersion);

/**
* Retrieves the actual catalog version at the specified timestamp.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,11 @@ private int indexId(String indexName) {
}

private List<Integer> tableIndexIds(int catalogVersion, int tableId) {
return manager.indexes(catalogVersion, tableId).stream().map(CatalogObjectDescriptor::id).collect(toList());
Catalog catalog = manager.catalog(catalogVersion);

assert catalog != null;

return catalog.indexes(tableId).stream().map(CatalogObjectDescriptor::id).collect(toList());
}

private int tableId(String tableName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,12 @@ void testManagerWorksAsExpected() {

int version2 = manager.latestCatalogVersion();

Collection<CatalogTableDescriptor> tablesOfVersion1 = manager.tables(version1);
Collection<CatalogTableDescriptor> tablesOfVersion1 = manager.catalog(version1).tables();

assertThat(tablesOfVersion1, hasSize(1));
assertThat(tablesOfVersion1, hasItem(descriptorWithName("T1")));

Collection<CatalogTableDescriptor> tablesOfVersion2 = manager.tables(version2);
Collection<CatalogTableDescriptor> tablesOfVersion2 = manager.catalog(version2).tables();

assertThat(tablesOfVersion2, hasSize(2));
assertThat(tablesOfVersion2, hasItem(descriptorWithName("T1")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.ignite.internal.catalog.commands.CreateZoneCommandBuilder;
import org.apache.ignite.internal.catalog.commands.DropTableCommand;
import org.apache.ignite.internal.catalog.commands.StorageProfileParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.storage.ObjectIdGenUpdateEntry;
import org.apache.ignite.internal.catalog.storage.SnapshotEntry;
Expand All @@ -72,7 +71,6 @@
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.ErrorGroups.Common;
import org.apache.ignite.sql.ColumnType;
import org.jetbrains.annotations.Nullable;

/** Utilities for working with the catalog in tests. */
public class CatalogTestUtils {
Expand Down Expand Up @@ -484,7 +482,7 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
* @param tableName Table name.
*/
public static CatalogTableDescriptor table(CatalogService catalogService, int catalogVersion, String tableName) {
CatalogTableDescriptor tableDescriptor = catalogService.tables(catalogVersion).stream()
CatalogTableDescriptor tableDescriptor = catalogService.catalog(catalogVersion).tables().stream()
.filter(table -> tableName.equals(table.name()))
.findFirst()
.orElse(null);
Expand All @@ -494,39 +492,6 @@ public static CatalogTableDescriptor table(CatalogService catalogService, int ca
return tableDescriptor;
}

/**
* Searches for an index by name in the requested version of the catalog. Throws if the index is not found.
*
* @param catalogService Catalog service.
* @param catalogVersion Catalog version in which to find the index.
* @param indexName Index name.
* @return Index (cannot be null).
*/
public static CatalogIndexDescriptor index(CatalogService catalogService, int catalogVersion, String indexName) {
CatalogIndexDescriptor indexDescriptor = indexOrNull(catalogService,
catalogVersion, indexName);

assertNotNull(indexDescriptor, "catalogVersion=" + catalogVersion + ", indexName=" + indexName);

return indexDescriptor;
}

/**
* Searches for an index by name in the requested version of the catalog.
*
* @param catalogService Catalog service.
* @param catalogVersion Catalog version in which to find the index.
* @param indexName Index name.
* @return Index or {@code null} if not found.
*/
@Nullable
public static CatalogIndexDescriptor indexOrNull(CatalogService catalogService, int catalogVersion, String indexName) {
return catalogService.indexes(catalogVersion).stream()
.filter(index -> indexName.equals(index.name()))
.findFirst()
.orElse(null);
}

/**
* Update handler interceptor for test purposes.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ private CompletableFuture<Collection<CatalogTableDescriptor>> tablesAtNow() {
HybridTimestamp now = clockService.now();

return schemaSyncService.waitForMetadataCompleteness(now)
.thenApply(unused -> catalogService.tables(catalogService.activeCatalogVersion(now.longValue())));
.thenApply(unused -> catalogService.activeCatalog(now.longValue()).tables());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,11 @@
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
Expand Down Expand Up @@ -87,46 +84,6 @@ public Catalog activeCatalog(long timestamp) {
return catalog;
}

@Override
public CatalogTableDescriptor table(int tableId, int catalogVersion) {
return null;
}

@Override
public Collection<CatalogTableDescriptor> tables(int catalogVersion) {
return null;
}

@Override
public CatalogIndexDescriptor index(int indexId, int catalogVersion) {
return null;
}

@Override
public Collection<CatalogIndexDescriptor> indexes(int catalogVersion) {
return null;
}

@Override
public List<CatalogIndexDescriptor> indexes(int catalogVersion, int tableId) {
return null;
}

@Override
public CatalogSchemaDescriptor schema(int schemaId, int catalogVersion) {
return null;
}

@Override
public CatalogZoneDescriptor zone(int zoneId, int catalogVersion) {
return null;
}

@Override
public Collection<CatalogZoneDescriptor> zones(int catalogVersion) {
return null;
}

@Override
public int activeCatalogVersion(long timestamp) {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ private CompletableFuture<Void> onCreateZone(CatalogZoneDescriptor zone, long ca
private CompletableFuture<Void> restoreTimers(int catalogVersion) {
List<CompletableFuture<Void>> futures = new ArrayList<>();

for (CatalogZoneDescriptor zone : catalogManager.zones(catalogVersion)) {
for (CatalogZoneDescriptor zone : catalogManager.catalog(catalogVersion).zones()) {
ZoneState zoneState = zonesState.get(zone.id());

// Max revision from the {@link ZoneState#topologyAugmentationMap()} for node joins.
Expand Down Expand Up @@ -859,7 +859,7 @@ private CompletableFuture<Void> onLogicalTopologyUpdate(Set<NodeWithAttributes>

logicalTopologyByRevision.put(revision, newLogicalTopology);

for (CatalogZoneDescriptor zone : catalogManager.zones(catalogVersion)) {
for (CatalogZoneDescriptor zone : catalogManager.catalog(catalogVersion).zones()) {
int zoneId = zone.id();

updateLocalTopologyAugmentationMap(addedNodes, removedNodes, revision, zoneId);
Expand Down Expand Up @@ -1624,7 +1624,7 @@ private CompletableFuture<Void> createOrRestoreZonesStates(long recoveryRevision
List<CompletableFuture<Void>> futures = new ArrayList<>();

// TODO: IGNITE-20287 Clean up abandoned resources for dropped tables from vault and metastore
for (CatalogZoneDescriptor zone : catalogManager.zones(catalogVersion)) {
for (CatalogZoneDescriptor zone : catalogManager.catalog(catalogVersion).zones()) {
futures.add(restoreZoneStateBusy(zone, recoveryRevision));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.commands.StorageProfileParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
Expand Down Expand Up @@ -667,12 +667,11 @@ static StripedScheduledThreadPoolExecutor createZoneManagerExecutor(int concurre
* Returns list of table descriptors bound to the zone.
*
* @param zoneId Zone id.
* @param catalogVersion Catalog version.
* @param catalogService Catalog service
* @param catalog Catalog.
* @return List of table descriptors from the zone.
*/
public static List<CatalogTableDescriptor> findTablesByZoneId(int zoneId, int catalogVersion, CatalogService catalogService) {
return catalogService.tables(catalogVersion).stream()
public static List<CatalogTableDescriptor> findTablesByZoneId(int zoneId, Catalog catalog) {
return catalog.tables().stream()
.filter(table -> table.zoneId() == zoneId)
.collect(toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,8 @@ private long getLastConfigRevision(
CatalogZoneDescriptor entryNewerCfg = null;

// Iterate over zone configurations from newest to oldest.
for (int i = catalogVersion; i >= catalogManager.earliestCatalogVersion(); i--) {
CatalogZoneDescriptor entryOlderCfg = catalogManager.zone(zoneId, i);
for (int version = catalogVersion; version >= catalogManager.earliestCatalogVersion(); version--) {
CatalogZoneDescriptor entryOlderCfg = catalogManager.catalog(version).zone(zoneId);

if (entryOlderCfg == null) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ protected CompletableFuture<Void> onReplicasUpdate(AlterZoneEventParameters para
// TODO: And then run the remote invoke, only if needed.
private CompletableFuture<Void> rebalanceTriggersRecovery(long recoveryRevision, int catalogVersion) {
if (recoveryRevision > 0) {
List<CompletableFuture<Void>> zonesRecoveryFutures = catalogService.zones(catalogVersion)
List<CompletableFuture<Void>> zonesRecoveryFutures = catalogService.catalog(catalogVersion).zones()
.stream()
.map(zoneDesc ->
recalculateAssignmentsAndScheduleRebalance(
Expand Down Expand Up @@ -214,13 +214,12 @@ private WatchListener createDistributionZonesDataNodesListener() {
// It is safe to get the latest version of the catalog as we are in the metastore thread.
// TODO: IGNITE-22723 Potentially unsafe to use the latest catalog version, as the tables might not already present
// in the catalog. Better to store this version when writing datanodes.
int catalogVersion = catalogService.latestCatalogVersion();

Catalog catalog = catalogService.catalog(catalogVersion);
int latestCatalogVersion = catalogService.latestCatalogVersion();
Catalog catalog = catalogService.catalog(latestCatalogVersion);

long assignmentsTimestamp = catalog.time();

CatalogZoneDescriptor zoneDescriptor = catalogService.zone(zoneId, catalogVersion);
CatalogZoneDescriptor zoneDescriptor = catalog.zone(zoneId);

if (zoneDescriptor == null) {
// Zone has been removed.
Expand Down Expand Up @@ -262,7 +261,7 @@ private WatchListener createDistributionZonesDataNodesListener() {
return nullCompletedFuture();
}

List<CatalogTableDescriptor> tableDescriptors = findTablesByZoneId(zoneId, catalogVersion, catalogService);
List<CatalogTableDescriptor> tableDescriptors = findTablesByZoneId(zoneId, catalog);

return triggerPartitionsRebalanceForAllTables(
evt.entryEvent().newEntry().revision(),
Expand Down Expand Up @@ -302,10 +301,10 @@ private CompletableFuture<Void> recalculateAssignmentsAndScheduleRebalance(
return nullCompletedFuture();
}

List<CatalogTableDescriptor> tableDescriptors = findTablesByZoneId(zoneDescriptor.id(), catalogVersion, catalogService);

Catalog catalog = catalogService.catalog(catalogVersion);

List<CatalogTableDescriptor> tableDescriptors = findTablesByZoneId(zoneDescriptor.id(), catalog);

return triggerPartitionsRebalanceForAllTables(
causalityToken,
zoneDescriptor,
Expand Down
Loading

0 comments on commit 5e7ed50

Please sign in to comment.