# general
c
Hi all. I am facing some issues testing out pyspark with pants. Running a pyspark job via `./pants run helloworld/sparkjob/hellospark.py` gives `ModuleNotFoundError: No module named 'pandas'`. Here is a repo with the issue: https://github.com/adityav/pants-python-tryouts
pandas is in the dependency tree:
./pants dependencies --transitive helloworld/sparkjob/hellospark.py
//:reqs#pandas
//:reqs#pyspark
//requirements.txt:reqs
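Those `//:reqs#pandas` / `//:reqs#pyspark` targets presumably come from a `python_requirements` declaration in the root BUILD file; a sketch of what that setup typically looks like (I'm assuming the repo's layout here, which may differ):

# BUILD (repo root) -- assumed setup; requirements.txt lists pandas and pyspark
python_requirements(
    name="reqs",
    source="requirements.txt",
)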
➜ ./pants run helloworld/sparkjob/hellospark.py
22:02:37.90 [INFO] Initializing scheduler...
22:02:38.22 [INFO] Scheduler initialized.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/01/22 22:02:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/01/22 22:02:55 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 16)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/avishwakarma/.cache/pants/named_caches/pex_root/installed_wheels/878b260bb4d3ee05745c118426887764855ed824d3fd63fe5648c52326d8d32e/pyspark-3.3.1-py2.py3-none-any.whl/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 670, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/Users/avishwakarma/.cache/pants/named_caches/pex_root/installed_wheels/878b260bb4d3ee05745c118426887764855ed824d3fd63fe5648c52326d8d32e/pyspark-3.3.1-py2.py3-none-any.whl/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 507, in read_udfs
    udfs.append(read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index=i))
  File "/Users/avishwakarma/.cache/pants/named_caches/pex_root/installed_wheels/878b260bb4d3ee05745c118426887764855ed824d3fd63fe5648c52326d8d32e/pyspark-3.3.1-py2.py3-none-any.whl/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 289, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/Users/avishwakarma/.cache/pants/named_caches/pex_root/installed_wheels/878b260bb4d3ee05745c118426887764855ed824d3fd63fe5648c52326d8d32e/pyspark-3.3.1-py2.py3-none-any.whl/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 85, in read_command
    command = serializer._read_with_length(file)
  File "/Users/avishwakarma/.cache/pants/named_caches/pex_root/installed_wheels/878b260bb4d3ee05745c118426887764855ed824d3fd63fe5648c52326d8d32e/pyspark-3.3.1-py2.py3-none-any.whl/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/Users/avishwakarma/.cache/pants/named_caches/pex_root/installed_wheels/878b260bb4d3ee05745c118426887764855ed824d3fd63fe5648c52326d8d32e/pyspark-3.3.1-py2.py3-none-any.whl/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 471, in loads
    return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'pandas'
e
When you need maximum compatibility or any form of control, running a `python_sources`-owned file is not what you want; that must necessarily choose one set of options for all runs. Here the option you needed was to tell Pants to tell Pex to use a venv (see: https://www.pantsbuild.org/docs/reference-pex_binary#codeexecution_modecode): https://github.com/adityav/pants-python-tryouts/pull/1
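Concretely, that means running a `pex_binary` target (rather than the `python_sources` file directly) with `execution_mode="venv"` set in its BUILD file. A rough sketch, not necessarily what the linked PR does (the target name and entry point here are assumptions):

# helloworld/sparkjob/BUILD -- sketch; target name and entry_point are assumed
pex_binary(
    name="hellospark",
    entry_point="hellospark.py",
    execution_mode="venv",
)

The job would then be run as `./pants run helloworld/sparkjob:hellospark`, so Pex builds and executes a real venv containing pandas and pyspark.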
c
Thanks for looking at this. 👍 How did you figure out that "venv" execution mode would do the trick? Would love to learn. Also, what are the options for testing? There is a test file in there as well, `helloworld/sparkjob/hellospark_test.py`, which has a similar issue.
e
I am the Pex maintainer; I added its venv support a few years ago and then added the option to use it in Pants. I won't be back at a keyboard to check the test issue for a while, but you have more info now, so maybe you'll be able to dig in and figure it out.
c
Nice! So, good news and bad news. After researching this a bit and searching this channel, I was able to get it working another way by setting pyspark-specific env variables:
import os
import sys
from pyspark.sql import SparkSession

if __name__ == "__main__":
    os.environ['PYSPARK_PYTHON'] = sys.executable
    os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
    hello_spark(SparkSession.builder.getOrCreate())
Have no idea why it works, but it does. Unfortunately, I ran into the same issue when I added a constraints file. Going to create another thread for it.
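For the test file mentioned above (`helloworld/sparkjob/hellospark_test.py`), the same env-var trick should presumably carry over; a rough, untested sketch (the fixture and test names here are made up, not what the repo actually contains):

import os
import sys

import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark():
    # Point Spark's driver and worker processes at the interpreter running the tests.
    os.environ["PYSPARK_PYTHON"] = sys.executable
    os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
    session = SparkSession.builder.master("local[1]").getOrCreate()
    yield session
    session.stop()


def test_hello_spark(spark):
    # Placeholder assertion; the real test would exercise hello_spark(spark).
    assert spark.range(3).count() == 3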