java.lang.NullPointerException while showing predicted output. #202

Open
aman1931998 opened this issue Apr 27, 2022 · 6 comments

Comments

@aman1931998

Dear @maxpumperla / other authors of this repo. A big thanks for developing this library. I have run it successfully on some datasets, but I am facing an issue with the current one. Please help me through it.

Dataset: https://www.kaggle.com/datasets/janiobachmann/bank-marketing-dataset

Dataset looks like this: df.show()
(screenshot)

Schema: df.printSchema() [deposit being the target variable]
(screenshot)

Dataset doesn't have any null values
(screenshot)
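
Since the screenshot isn't reproduced here, a minimal sketch of one way to run this null check (assuming the DataFrame is named df):

from pyspark.sql import functions as F

# Count nulls per column; all zeros confirms there are no missing values
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()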

After converting the categorical columns:
(screenshot)
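
The actual code is only in the screenshot; a plausible sketch of this step using StringIndexer (the column names are assumptions based on the Kaggle dataset, not the original code):

from pyspark.ml.feature import StringIndexer

# Hypothetical categorical columns from the bank-marketing dataset
categorical_cols = ["job", "marital", "education", "default",
                    "housing", "loan", "contact", "month", "poutcome"]
for c in categorical_cols:
    df = StringIndexer(inputCol=c, outputCol=c + "_idx").fit(df).transform(df)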

After converting the numerical columns via VectorAssembler -> StandardScaler:
(screenshot)
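
A sketch of what this step might look like (the numerical column names are assumptions):

from pyspark.ml.feature import VectorAssembler, StandardScaler

# Hypothetical numerical columns; adjust to the actual schema
numerical_cols = ["age", "balance", "day", "duration", "campaign", "pdays", "previous"]
df = VectorAssembler(inputCols=numerical_cols, outputCol="num_vec").transform(df)
df = StandardScaler(inputCol="num_vec", outputCol="scaled_num_vec",
                    withMean=True, withStd=True).fit(df).transform(df)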

I wanted to convert the resulting Vector back into individual columns, so I exploded the Vector column:
(screenshot)
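
One way to do this on Spark >= 3.0 (a sketch, not the original code; names continue from the previous snippet):

from pyspark.ml.functions import vector_to_array
from pyspark.sql import functions as F

# Turn the scaled vector into an array, then pull out one scalar column per feature
df = df.withColumn("scaled_arr", vector_to_array("scaled_num_vec"))
for i, c in enumerate(numerical_cols):
    df = df.withColumn(c + "_scaled", F.col("scaled_arr")[i])
df = df.drop("scaled_arr")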

Then I assembled all the features into a single Vector to create the 'features' column:
(screenshot)
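
Presumably via VectorAssembler again, roughly (a sketch under the same naming assumptions):

from pyspark.ml.feature import VectorAssembler

feature_cols = [c + "_idx" for c in categorical_cols] + \
               [c + "_scaled" for c in numerical_cols]
df = VectorAssembler(inputCols=feature_cols, outputCol="features").transform(df)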

But you can see that some rows are SparseVector and some are DenseVector. Normally this doesn't affect PySpark's functionality, but since the issue I was facing wasn't getting resolved, I forcefully converted each SparseVector to a DenseVector:
(screenshot)
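
A common way to force this, sketched with a hypothetical UDF (not the original code):

from pyspark.sql.functions import udf
from pyspark.ml.linalg import DenseVector, VectorUDT

# Give every row a uniform DenseVector representation
to_dense = udf(lambda v: DenseVector(v.toArray()), VectorUDT())
df = df.withColumn("features", to_dense("features"))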

With all this done, I converted the target variable ('deposit') via StringIndexer as well and selected the features and label columns into a separate df:
(screenshot)
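
Sketched (the output column name "label" is an assumption):

from pyspark.ml.feature import StringIndexer

df = StringIndexer(inputCol="deposit", outputCol="label").fit(df).transform(df)
model_df = df.select("features", "label")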

Keras Model:
(screenshot)
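
The actual architecture is only in the screenshot; a minimal stand-in for a binary classifier on this data might be:

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

# input_dim must match the length of the 'features' vector
model = Sequential([
    Dense(64, activation="relu", input_dim=len(feature_cols)),
    Dense(32, activation="relu"),
    Dense(2, activation="softmax"),  # 'deposit' is binary, one-hot encoded
])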

Elephas estimator config:
(screenshot)
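
The exact settings are in the screenshot; a sketch using the same setters that appear later in this thread (all parameter values here are placeholders):

from elephas.ml_model import ElephasEstimator
from tensorflow.keras import optimizers

opt_conf = optimizers.serialize(optimizers.Adam(learning_rate=0.01))

estimator = ElephasEstimator()
estimator.setFeaturesCol("features")
estimator.setLabelCol("label")
estimator.set_keras_model_config(model.to_json())
estimator.set_categorical_labels(True)
estimator.set_nb_classes(2)
estimator.set_num_workers(1)
estimator.set_epochs(10)
estimator.set_batch_size(64)
estimator.set_verbosity(1)
estimator.set_validation_split(0.1)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode("synchronous")
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(["acc"])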

Training via Elephas and the output df (pred_test):
(screenshot)
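
Roughly (a sketch; the split ratio and variable names are assumptions):

train_df, test_df = model_df.randomSplit([0.8, 0.2], seed=42)
fitted_model = estimator.fit(train_df)
pred_test = fitted_model.transform(test_df)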

Error Stack when running pred_test.collect()
(screenshot)

Full Error Stack

Py4JJavaError Traceback (most recent call last)
Input In [2718], in <cell line: 1>()
----> 1 pred_test.collect()

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py:693, in DataFrame.collect(self)
683 """Returns all the records as a list of :class:Row.
684
685 .. versionadded:: 1.3.0
(...)
690 [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
691 """
692 with SCCallSiteSync(self._sc) as css:
--> 693 sock_info = self._jdf.collectToPython()
694 return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))

File /opt/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py:1321, in JavaMember.call(self, *args)
1315 command = proto.CALL_COMMAND_NAME +
1316 self.command_header +
1317 args_command +
1318 proto.END_COMMAND_PART
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:
1325 temp_arg._detach()

File /opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py:111, in capture_sql_exception..deco(*a, **kw)
109 def deco(*a, **kw):
110 try:
--> 111 return f(*a, **kw)
112 except py4j.protocol.Py4JJavaError as e:
113 converted = convert_exception(e.java_exception)

File /opt/spark/python/lib/py4j-0.10.9.3-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o17843.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1258.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1258.0 (TID 985) (.internal executor driver): java.lang.RuntimeException: Error while decoding: java.lang.NullPointerException
newInstance(class org.apache.spark.ml.linalg.VectorUDT).deserialize
at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionDecodingError(QueryExecutionErrors.scala:1047)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$scalaConverter$2(ScalaUDF.scala:164)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$10(EvalPythonExec.scala:126)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:434)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:269)
Caused by: java.lang.NullPointerException

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:394)
at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3538)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3535)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: Error while decoding: java.lang.NullPointerException
newInstance(class org.apache.spark.ml.linalg.VectorUDT).deserialize
at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionDecodingError(QueryExecutionErrors.scala:1047)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184)
at org.apache.spark.sql.catalyst.expressions.ScalaUDF.$anonfun$scalaConverter$2(ScalaUDF.scala:164)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$10(EvalPythonExec.scala:126)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1214)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:53)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:434)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:269)
Caused by: java.lang.NullPointerException

Please help me out with this. Thanks in advance!

@aman1931998
Author

aman1931998 commented Apr 27, 2022

Environment Details:
Zeppelin 0.10.1
Spark 3.2.1 for hadoop 3.2

Python 3.8.10
PySpark 3.2.1
Elephas 3.1.0
tensorflow 2.8.0
keras 2.8.0

No. of Cores: 2
Memory: 8GB

@shahryary

shahryary commented Jun 2, 2022

Same error for me. I dug a little deeper into the model and I realized the problem could be happening in the PySpark serializers; here is the error:

22/06/02 15:56:24 ERROR PythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 599, in main
    eval_type = read_int(infile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:545)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:703)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:685)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at org.apache.spark.rdd.RDD$$anon$3.hasNext(RDD.scala:951)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.rdd.RDD$$anon$3.foreach(RDD.scala:950)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:307)
	at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:670)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:424)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2019)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:259)
Caused by: java.lang.NullPointerException
22/06/02 15:56:24 ERROR PythonRunner: This may have been caused by a prior exception:
java.lang.NullPointerException
22/06/02 15:56:24 ERROR PythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 599, in main
    eval_type = read_int(infile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError

Updating PySpark (current version 3.2.0) to a newer one (3.2.1) might solve the problem.

Additionally, in the "transform" step the prediction elements are arrays (check with prediction.printSchema()), whereas the other ML algorithms give non-array results, so the array format is probably why we can't retrieve the data in the prediction step.
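
For example, something like this (a sketch; the extraction workaround is untested):

prediction.printSchema()

from pyspark.sql import functions as F

# If 'prediction' is array<double> instead of a scalar, pulling out the
# first element before collecting is one (untested) workaround
flat = prediction.withColumn("prediction", F.col("prediction")[0])
flat.show(10)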

I'll keep working on it and will update here.

@danielenricocahall
Collaborator

Hey, were there any updates on this front?

@albarabimakasa

Hello everyone, I have the same problem. When I try to show my prediction, it says:

Py4JJavaError: An error occurred while calling o1560.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 79.0 failed 1 times, most recent failure: Lost task 0.0 in stage 79.0 (TID 87) (849282bebc33 executor driver): java.lang.IllegalArgumentException: requirement failed: Index 4 out of bounds for vector of size 4

I don't know what it even means, so here is my code. I hope somebody can help me.

from elephas.ml_model import ElephasEstimator
from tensorflow.keras import optimizers
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics


adam = optimizers.Adam(learning_rate=0.1)
opt_conf = optimizers.serialize(adam)


estimator = ElephasEstimator()
estimator.setFeaturesCol("scaled_features")            
estimator.setLabelCol("index_category")              
estimator.set_keras_model_config(model.to_json())
estimator.set_categorical_labels(True)
estimator.set_nb_classes(nb_classes)
estimator.set_num_workers(3)  # Feel free to adapt the number of workers.
estimator.set_epochs(5) 
estimator.set_batch_size(1280)
estimator.set_verbosity(1)
estimator.set_validation_split(0.15)
estimator.set_optimizer_config(opt_conf)
estimator.set_mode("synchronous")
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(['acc'])



pipeline = Pipeline(stages=[string_indexer, scaler, estimator])
fitted_pipeline = pipeline.fit(train_df)
prediction = fitted_pipeline.transform(test_df) # <-- The same code evaluates test data.
pnl = prediction.select("Fvec", "prediction")
pnl.show(100)

@AlessandroMinervini

I have the same problem: the model fits and transforms correctly, but when I try to show the prediction values my script crashes with the same error...

Any solutions?

Thanks

@danielenricocahall
Collaborator

Hello! It looks like these may be separate issues. I'd be happy to look into them:

  1. Please post the issues in the new fork: https://github.com/danielenricocahall/elephas
  2. Please post a fully reproducible example (if possible).
