[Bug] [seatunnel-transforms-v2] NullPointerException when parsing array<string> if the data contains more than one item

by ADMIN 68 views

Introduction

This article reports a bug in seatunnel-transforms-v2, the transform module of Apache SeaTunnel. The SQL transform throws a NullPointerException when it processes an array<string> field and the data contains more than one item.

Background

SeaTunnel-transforms-v2 is a library for transforming data in Apache SeaTunnel. It provides a set of pre-built transformations that can be used to manipulate data in various ways. One of the transformations is the SQL transformation, which allows users to write SQL queries to transform data.

Problem Statement

The problem occurs when the taxonomies_hierarchical.system.lvl2 field is an array of strings. When the data contains more than one element in the array, the library throws a NullPointerException.

Reproduction Steps

To reproduce the bug, follow these steps:

  1. Create a SeaTunnel job with the following configuration:
source {
  MongoDB {
    plugin_output = "apiList2"
    uri = "***"
    database = "***"
    collection = "********"
    match.query = "{'scrapy_time':'2024-03-14 17:45:39','case_id':1003,'taxonomies_hierarchical.function.lvl2':{$exists:true,$regex : />/ ,$gt:[]}}"
    schema = {
      fields {
        postId = int
        taxonomies_hierarchical = {
          system = {
            lvl0 = "array<string>"
            lvl1 = "array<string>"
            lvl2 = "array<string>"
          }
        }
      }
    }
  }
}

transform {
  Sql {
    plugin_input = "apiList2"
    plugin_output = "apiList23"
    query = "select taxonomies_hierarchical.system.lvl2 as pnames FROM dual LATERAL VIEW EXPLODE ( pnames ) AS NAME"
  }
}
  2. Run the job with the following command:
sh bin/seatunnel.sh -m local --config ./job/livingsystem_parent_to_mysql.conf
  3. Observe the error message in the log file:
java.lang.NullPointerException: null
	at org.apache.seatunnel.transform.sql.zeta.ZetaSQLFunction.computeForValue(ZetaSQLFunction.java:307) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.transform.sql.zeta.ZetaSQLEngine.project(ZetaSQLEngine.java:283) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.transform.sql.zeta.ZetaSQLEngine.transformBySQL(ZetaSQLEngine.java:249) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.transform.sql.SQLTransform.transformRow(SQLTransform.java:110) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.transform.sql.SQLTransform.transformRow(SQLTransform.java:46) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform.transform(AbstractSeaTunnelTransform.java:80) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.transform.common.AbstractCatalogSupportFlatMapTransform.flatMap(AbstractCatalogSupportFlatMapTransform.java:44) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.transform.common.AbstractCatalogSupportFlatMapTransform.flatMap(AbstractCatalogSupportFlatMapTransform.java:28) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.transform.common.AbstractMultiCatalogFlatMapTransform.flatMap(AbstractMultiCatalogFlatMapTransform.java:41) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.transform.common.AbstractMultiCatalogFlatMapTransform.flatMap(AbstractMultiCatalogFlatMapTransform.java:28) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.transform(TransformFlowLifeCycle.java:146) ~[seatunnel-starter.jar:2.3.9]
	at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.received(TransformFlowLifeCycle.java:122) ~[seatunnel-starter.jar:2.3.9]
	at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.received(TransformFlowLifeCycle.java:43) ~[seatunnel-starter.jar:2.3.9]
	at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.sendRecordToNext(SeaTunnelSourceCollector.java:195) ~[seatunnel-starter.jar:2.3.9]
	at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.collect(SeaTunnelSourceCollector.java:112) ~[seatunnel-starter.jar:2.3.9]
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_191]
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_191]
	at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[?:1.8.0_191]

Analysis

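The stack trace shows the NullPointerException being thrown from ZetaSQLFunction.computeForValue (ZetaSQLFunction.java:307), reached through ZetaSQLEngine.project during projection in the SQL transform. This suggests that computeForValue dereferences a value that is null for at least one input, and that the array<string> field taxonomies_hierarchical.system.lvl2 is not handled safely. The report does not confirm the exact root cause. Two details in the reproduction are worth checking: match.query filters on taxonomies_hierarchical.function.lvl2 while the schema and the SQL read taxonomies_hierarchical.system.lvl2, so some matched documents may lack that field; and the trace mixes seatunnel-transforms-v2.jar 2.3.10-SNAPSHOT with seatunnel-starter.jar 2.3.9.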
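
The query in the reproduction relies on LATERAL VIEW EXPLODE, which is expected to emit one output row per array element. The following is a minimal plain-Java sketch of that behavior, assuming array<string> values are held as Java arrays. The class ExplodeSketch and the explode helper are hypothetical names used for illustration and are not part of SeaTunnel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ExplodeSketch {
    // Hypothetical helper: emits one output row per element of the array
    // stored at arrayIndex, copying the remaining columns unchanged.
    static List<Object[]> explode(Object[] row, int arrayIndex) {
        List<Object[]> out = new ArrayList<>();
        Object field = row[arrayIndex];
        if (!(field instanceof Object[])) {
            // A null or non-array value is passed through as a single row
            out.add(row);
            return out;
        }
        for (Object element : (Object[]) field) {
            Object[] copy = Arrays.copyOf(row, row.length);
            copy[arrayIndex] = element;
            out.add(copy);
        }
        return out;
    }

    public static void main(String[] args) {
        Object[] row = {1003, new String[] {"a > b", "a > c"}};
        for (Object[] exploded : explode(row, 1)) {
            System.out.println(Arrays.toString(exploded));
        }
    }
}
```

In this sketch an empty array produces no output rows, and a null field is passed through instead of being dereferenced; whether SeaTunnel should behave the same way in those cases is a design decision for the actual fix.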

Solution

To fix the bug, the ZetaSQLFunction class needs to handle the case where the taxonomies_hierarchical.system.lvl2 field is an array of strings, including a null field and null elements. One approach is to check whether the value is an array and, if so, iterate over its elements and process each one separately.

The following is an illustrative sketch of that approach, not the actual SeaTunnel code. The real computeForValue signature differs, processElement is a hypothetical helper, and the sketch assumes that array<string> values arrive as Java arrays:

import java.util.List;

// Hypothetical sketch: value is the field read from the row, out collects results
static void computeForValue(Object value, List<Object> out) {
  if (value == null) {
    // Skip a missing field instead of dereferencing it
    return;
  }
  if (value instanceof Object[]) {
    // Process each element of the array separately
    for (Object element : (Object[]) value) {
      processElement(element, out);
    }
  } else {
    // Process the field as a single value
    processElement(value, out);
  }
}

// Hypothetical helper: ignores null elements, keeps everything else
static void processElement(Object element, List<Object> out) {
  if (element != null) {
    out.add(element);
  }
}

Conclusion

The NullPointerException is raised in ZetaSQLFunction.computeForValue while the SQL transform processes the array<string> field taxonomies_hierarchical.system.lvl2. The proposed fix is to make ZetaSQLFunction handle this case by iterating over the elements of the array and processing each one separately.

Recommendations

Based on the analysis, we recommend the following:

  1. Modify the ZetaSQLFunction class to handle the case where the taxonomies_hierarchical.system.lvl2 field is an array of strings.
  2. Test the modified code to ensure that it fixes the bug and does not introduce any new issues.
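
As a starting point for the second recommendation, the sketch below lists input shapes that a regression test could cover. It is an assumption-laden illustration: ArrayFieldCases, elementsOf, and cases are hypothetical names, and elementsOf is only a stand-in for the code under test, not SeaTunnel's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ArrayFieldCases {
    // Hypothetical stand-in for the code under test: collects the non-null
    // elements of an array value and treats a null value as "no elements".
    static List<Object> elementsOf(Object value) {
        List<Object> out = new ArrayList<>();
        if (value instanceof Object[]) {
            for (Object element : (Object[]) value) {
                if (element != null) {
                    out.add(element);
                }
            }
        } else if (value != null) {
            out.add(value);
        }
        return out;
    }

    // Input shapes worth covering, since the report only fails once more
    // than one item is involved.
    static Map<String, Object> cases() {
        Map<String, Object> cases = new LinkedHashMap<>();
        cases.put("null field", null);
        cases.put("empty array", new String[0]);
        cases.put("one element", new String[] {"a"});
        cases.put("two elements", new String[] {"a", "b"});
        cases.put("null element", new String[] {"a", null});
        return cases;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Object> c : cases().entrySet()) {
            System.out.println(c.getKey() + " -> " + elementsOf(c.getValue()));
        }
    }
}
```

In a real test, each case would be fed through the SQL transform itself, with an assertion that no exception is thrown and that the number of output rows matches the number of array elements.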

**Q&A: Bug Report - NullPointerException when parsing array<string> in SeaTunnel-transforms-v2**
====================================================================================

**Q: What is the bug in SeaTunnel-transforms-v2?**
------------------------------------------------

A: The bug in SeaTunnel-transforms-v2 occurs when parsing an array of strings. When the data contains more than one element in the array, the library throws a NullPointerException.

**Q: What is the cause of the bug?**
--------------------------------

A: The stack trace points to `ZetaSQLFunction.computeForValue` (`ZetaSQLFunction.java:307`), which appears not to handle the `taxonomies_hierarchical.system.lvl2` field safely when it is an array of strings. The report does not confirm the exact root cause.

**Q: How can the bug be reproduced?**
--------------------------------------

A: The bug can be reproduced by creating a SeaTunnel job with the following configuration:
```conf
source {
  MongoDB {
    plugin_output = "apiList2"
    uri = "***"
    database = "***"
    collection = "********"
    match.query = "{'scrapy_time':'2024-03-14 17:45:39','case_id':1003,'taxonomies_hierarchical.function.lvl2':{$exists:true,$regex : />/ ,$gt:[]}}"
    schema = {
      fields {
        postId = int
        taxonomies_hierarchical = {
          system = {
            lvl0 = "array<string>"
            lvl1 = "array<string>"
            lvl2 = "array<string>"
          }
        }
      }
    }
  }
}

transform {
  Sql {
    plugin_input = "apiList2"
    plugin_output = "apiList23"
    query = "select taxonomies_hierarchical.system.lvl2 as pnames FROM dual LATERAL VIEW EXPLODE ( pnames ) AS NAME"
  }
}
```

And running the job with the following command:

```shell
sh bin/seatunnel.sh -m local --config ./job/livingsystem_parent_to_mysql.conf
```

**Q: What is the error message observed in the log file?**
------------------------------------------------------

A: The error message observed in the log file is:

```
java.lang.NullPointerException: null
	at org.apache.seatunnel.transform.sql.zeta.ZetaSQLFunction.computeForValue(ZetaSQLFunction.java:307) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.transform.sql.zeta.ZetaSQLEngine.project(ZetaSQLEngine.java:283) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.transform.sql.zeta.ZetaSQLEngine.transformBySQL(ZetaSQLEngine.java:249) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.transform.sql.SQLTransform.transformRow(SQLTransform.java:110) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.transform.sql.SQLTransform.transformRow(SQLTransform.java:46) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform.transform(AbstractSeaTunnelTransform.java:80) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.transform.common.AbstractCatalogSupportFlatMapTransform.flatMap(AbstractCatalogSupportFlatMapTransform.java:44) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.transform.common.AbstractCatalogSupportFlatMapTransform.flatMap(AbstractCatalogSupportFlatMapTransform.java:28) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.transform.common.AbstractMultiCatalogFlatMapTransform.flatMap(AbstractMultiCatalogFlatMapTransform.java:41) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.transform.common.AbstractMultiCatalogFlatMapTransform.flatMap(AbstractMultiCatalogFlatMapTransform.java:28) ~[seatunnel-transforms-v2.jar:2.3.10-SNAPSHOT]
	at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.transform(TransformFlowLifeCycle.java:146) ~[seatunnel-starter.jar:2.3.9]
	at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.received(TransformFlowLifeCycle.java:122) ~[seatunnel-starter.jar:2.3.9]
	at org.apache.seatunnel.engine.server.task.flow.TransformFlowLifeCycle.received(TransformFlowLifeCycle.java:43) ~[seatunnel-starter.jar:2.3.9]
	at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.sendRecordToNext(SeaTunnelSourceCollector.java:195) ~[seatunnel-starter.jar:2.3.9]
	at org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector.collect(SeaTunnelSourceCollector.java:112) ~[seatunnel-starter.jar:2.3.9]
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184) ~[?:1.8.0_191]
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) ~[?:1.8.0_191]
	at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[?:1.8.0_191]
```

**Q: How can the bug be fixed?**
----------------------------

A: The bug can be fixed by modifying the `ZetaSQLFunction` class to handle the case where the `taxonomies_hierarchical.system.lvl2` field is an array of strings, including a null field and null elements. One approach is to check whether the value is an array and, if so, iterate over its elements and process each one separately.

The following is an illustrative sketch of that approach, not the actual SeaTunnel code. The real `computeForValue` signature differs, `processElement` is a hypothetical helper, and the sketch assumes that `array<string>` values arrive as Java arrays:

```java
import java.util.List;

// Hypothetical sketch: value is the field read from the row, out collects results
static void computeForValue(Object value, List<Object> out) {
  if (value == null) {
    // Skip a missing field instead of dereferencing it
    return;
  }
  if (value instanceof Object[]) {
    // Process each element of the array separately
    for (Object element : (Object[]) value) {
      processElement(element, out);
    }
  } else {
    // Process the field as a single value
    processElement(value, out);
  }
}

// Hypothetical helper: ignores null elements, keeps everything else
static void processElement(Object element, List<Object> out) {
  if (element != null) {
    out.add(element);
  }
}
```

**Q: What is the recommended solution?**
------------------------------------

A: The recommended solution is to make the `ZetaSQLFunction` class handle `array<string>` values such as `taxonomies_hierarchical.system.lvl2` by checking for an array and processing each element separately.

**Q: What is the next step?**
-------------------------

A: The next step is to test the modified code to ensure that it fixes the bug and does not introduce any new issues.