Do one thing just trigger inner join after turning on the flag
spark.conf.set("spark.sql.crossJoin.enabled", "true")
You may also use the cross join.
weights.crossJoin(input)
or set the Alias as
weights.join(input, input("sourceId")===weights("sourceId"), "cross")
This is an issue for your version of Spark but i guess it is fixed now.
But I would suggest you to 2.1.1, you won’t encounter any such issue here.