-
Notifications
You must be signed in to change notification settings - Fork 34
Provide Struct data type support for oracle plugin #659
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
vanshikaagupta22
wants to merge
2
commits into
data-integrations:develop
Choose a base branch
from
cloudsufi:oracle-structsupport
base: develop
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+313
−4
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,14 +18,22 @@ | |
|
|
||
| import com.google.common.collect.ImmutableSet; | ||
| import io.cdap.cdap.api.data.schema.Schema; | ||
| import io.cdap.cdap.api.exception.ErrorCategory; | ||
| import io.cdap.cdap.api.exception.ErrorType; | ||
| import io.cdap.cdap.api.exception.ErrorUtils; | ||
| import io.cdap.plugin.db.CommonSchemaReader; | ||
| import org.jetbrains.annotations.NotNull; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import java.sql.Connection; | ||
| import java.sql.PreparedStatement; | ||
| import java.sql.ResultSet; | ||
| import java.sql.ResultSetMetaData; | ||
| import java.sql.SQLException; | ||
| import java.sql.Types; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.Set; | ||
| import javax.annotation.Nullable; | ||
|
|
||
|
|
@@ -70,6 +78,7 @@ public class OracleSourceSchemaReader extends CommonSchemaReader { | |
| private final Boolean isTimestampOldBehavior; | ||
| private final Boolean isPrecisionlessNumAsDecimal; | ||
| private final Boolean isTimestampLtzFieldTimestamp; | ||
| private Connection connection; | ||
|
|
||
| public OracleSourceSchemaReader() { | ||
| this(null, false, false, false); | ||
|
|
@@ -136,11 +145,143 @@ public Schema getSchema(ResultSetMetaData metadata, int index) throws SQLExcepti | |
| } | ||
| return Schema.decimalOf(precision, scale); | ||
| } | ||
| case Types.STRUCT: | ||
| if (connection == null) { | ||
| throw new SQLException("Cannot resolve STRUCT schema without a database connection. " | ||
| + "Use getSchemaFields(ResultSet) to enable STRUCT type resolution."); | ||
| } | ||
| String typeName = metadata.getColumnTypeName(index); | ||
| String owner = typeName.substring(0, typeName.lastIndexOf('.')); | ||
| return getStructSchema(connection, typeName, owner); | ||
| default: | ||
| return super.getSchema(metadata, index); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public List<Schema.Field> getSchemaFields(ResultSet resultSet) throws SQLException { | ||
| this.connection = resultSet.getStatement().getConnection(); | ||
| return super.getSchemaFields(resultSet); | ||
| } | ||
|
|
||
| /** | ||
| * Builds a CDAP RECORD schema for an Oracle STRUCT type by querying the | ||
| * database metadata | ||
| * for the type's attributes. | ||
| * | ||
| * @param connection the database connection | ||
| * @param typeName the Oracle type name (e.g., "ADDRESS_TYPE") | ||
| * @return a CDAP RECORD schema with fields corresponding to the STRUCT's | ||
| * attributes | ||
| */ | ||
| private Schema getStructSchema(Connection connection, String typeName, String owner) throws SQLException { | ||
| List<Schema.Field> fields = new ArrayList<>(); | ||
| String sql = "SELECT * FROM ALL_TYPE_ATTRS WHERE TYPE_NAME = ? AND OWNER = ? ORDER BY ATTR_NO"; | ||
|
|
||
| try (PreparedStatement stmt = connection.prepareStatement(sql)) { | ||
|
|
||
| stmt.setString(1, typeName.substring(typeName.lastIndexOf('.') + 1)); | ||
| stmt.setString(2, owner); | ||
|
|
||
| try (ResultSet attrRs = stmt.executeQuery()) { | ||
| while (attrRs.next()) { | ||
| String attrName = attrRs.getString("ATTR_NAME"); | ||
| String attrTypeName = attrRs.getString("ATTR_TYPE_NAME"); | ||
| int attrSize = attrRs.getInt("PRECISION"); | ||
| int attrScale = attrRs.getInt("SCALE"); | ||
|
|
||
| Schema attrSchema = mapPrimitiveOracleType(attrTypeName, attrSize, attrScale, attrName); | ||
| if (attrSchema != null) { | ||
| fields.add(Schema.Field.of(attrName, attrSchema)); | ||
| } else { | ||
| String nestedStructOwner = attrRs.getString("ATTR_TYPE_OWNER"); | ||
| Schema nestedSchema = getStructSchema(connection, attrTypeName, nestedStructOwner); | ||
| fields.add(Schema.Field.of(attrName, nestedSchema)); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| if (fields.isEmpty()) { | ||
| throw new SQLException(String.format( | ||
| "No attributes found for Oracle STRUCT type '%s'. " | ||
| + "Ensure the type exists and is accessible.", | ||
| typeName)); | ||
| } | ||
|
|
||
| return Schema.recordOf(typeName, fields); | ||
| } | ||
|
|
||
| private Schema mapPrimitiveOracleType(String typeName, int precision, int scale, String columnName) { | ||
| switch (typeName) { | ||
| case "TIMESTAMP WITH TZ": | ||
| return isTimestampOldBehavior ? Schema.of(Schema.Type.STRING) : Schema.of(Schema.LogicalType.TIMESTAMP_MICROS); | ||
| case "TIMESTAMP WITH LTZ": | ||
| return getTimestampLtzSchema(); | ||
| case "TIMESTAMP": | ||
| return isTimestampOldBehavior ? | ||
| Schema.of(Schema.LogicalType.TIMESTAMP_MICROS) : Schema.of(Schema.LogicalType.DATETIME); | ||
| case "DATE": | ||
| return Schema.of(Schema.LogicalType.DATE); | ||
| case "TIME": | ||
| return Schema.of(Schema.LogicalType.TIME_MICROS); | ||
| case "BINARY FLOAT": | ||
| case "REAL": | ||
| case "FLOAT": | ||
| return Schema.of(Schema.Type.FLOAT); | ||
| case "BINARY DOUBLE": | ||
| case "DOUBLE": | ||
| return Schema.of(Schema.Type.DOUBLE); | ||
| case "BFILE": | ||
| case "BLOB": | ||
| case "RAW": | ||
| case "LONG RAW": | ||
| return Schema.of(Schema.Type.BYTES); | ||
| case "INTERVAL DAY TO SECOND": | ||
| case "INTERVAL YEAR TO MONTH": | ||
| case "VARCHAR2": | ||
| case "VARCHAR": | ||
| case "CHAR": | ||
| case "CHAR2": | ||
| case "CLOB": | ||
| case "NCLOB": | ||
| case "LONG": | ||
| return Schema.of(Schema.Type.STRING); | ||
| case "INTEGER": | ||
| return Schema.of(Schema.Type.INT); | ||
| case "NUMBER": | ||
| case "DECIMAL": | ||
| if (Double.class.getTypeName().equals(typeName)) { | ||
| return Schema.of(Schema.Type.DOUBLE); | ||
|
Comment on lines
+216
to
+254
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. There are two issues in
|
||
| } else { | ||
| if (precision == 0) { | ||
| if (isPrecisionlessNumAsDecimal) { | ||
| precision = 38; | ||
| scale = 0; | ||
| LOG.warn(String.format("%s type with undefined precision and scale is detected, " | ||
| + "there may be a precision loss while running the pipeline. " | ||
| + "Please define an output precision and scale for field to avoid " | ||
| + "precision loss.", typeName)); | ||
| return Schema.decimalOf(precision, scale); | ||
| } else { | ||
| LOG.warn(String.format("%s type without precision and scale, " | ||
| + "converting into STRING type to avoid any precision loss.", | ||
| typeName)); | ||
| return Schema.of(Schema.Type.STRING); | ||
| } | ||
| } | ||
| return Schema.decimalOf(precision, scale); | ||
| } | ||
| case "ARRAY": | ||
| case "OTHER": | ||
| case "XML": | ||
| String errorMessage = String.format("Column %s has unsupported SQL type of %s.", columnName, typeName); | ||
| throw ErrorUtils.getProgramFailureException(new ErrorCategory(ErrorCategory.ErrorCategoryEnum.PLUGIN), | ||
| errorMessage, errorMessage, ErrorType.SYSTEM, true, null); | ||
| default: | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| private @NotNull Schema getTimestampLtzSchema() { | ||
| return isTimestampOldBehavior || isTimestampLtzFieldTimestamp | ||
| ? Schema.of(Schema.LogicalType.TIMESTAMP_MICROS) | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CDAP `Schema.recordOf` requires the record name to be a valid identifier (matching `[A-Za-z_][A-Za-z0-9_]*`). If `typeName` is fully qualified (e.g., `MY_SCHEMA.MY_TYPE`), this call will throw an `IllegalArgumentException` due to the dot. Consider using only the simple type name part for the record name.