Beim Testen auf einen Produktionsanwendungsfall habe ich folgende Tabellen erstellt und gespeichert (mithilfe von Hive Metastore):

table1:
fields: key1, key2, value1
sortedBy key1,key2
bucketBy: key1, 100 buckets

table2:
fields: key1, key2, value2
sortedBy: key1,key2
bucketBy: key1, 100 buckets

Ich führe eine solche Abfrage aus (im Pseudocode)

table1.join(table2, [“key1”, “key2”])
 .groupBy(“value2”)
 .countUnique(“key1”)

Der gesunde Menschenverstand sagt, dass dieser Join einfach mit einem Sort-Merge-Join ohne Austausch durchgeführt werden sollte. Jedoch macht Funke einen Austausch dann beitreten.

Obwohl ich für diesen speziellen Anwendungsfall beide Schlüssel hätte verwenden können, muss ich aufgrund einiger anderer Anwendungsfälle nach Schlüssel1 suchen. Und wenn ich einen (einfacheren) Join mit einem einzigen Schlüssel wie diesem mache:

table1.join(table2, [“key1”])

Es funktioniert wie erwartet (d. H. Sort-Merge-Join ohne Austausch).

Jetzt, da ich einen optimierten Join für diese Tabelle habe, wenn ich als solchen filtern möchte:

table1.join(table2, [“key1”])
 .filter(table1.col(“key2”) == table2.col(“key2”))

Es kehrt zum Austausch zurück und tritt dann bei.

Wie kann ich spark davon überzeugen, keinen Austausch durchzuführen, wenn der Join-Schlüssel ein Super-Set des BucketBy-Schlüssels ist?

Hinweis:

Ein Trick, den ich kenne, ist, dass anstelle einer Gleichheitsprüfung, wenn ich als Ungleichheitsprüfung umschreiben würde, der Funke nicht mischen würde.

(x == y) kann auch ausgedrückt werden als ((x> = y) & (x <= y)). Wenn ich im letzten Beispiel zwei Filter wie diesen anwenden würde:

.filter (table1.col ("key2")> = table2.col ("key2"))

.filter (table1.col ("key2") <= table2.col ("key2"))

Es wird weiterhin Sort-Merge-Join ohne Austausch verwenden. Dies ist jedoch keine Lösung, sondern ein Hack.

8
zetaprime 26 Juni 2019 im 02:53

3 Antworten

Beste Antwort

Basierend auf einigen Forschungen und Erkundungen scheint dies die am wenigsten hackige Lösung zu sein:

Aufbauend auf diesem Beispiel:

table1.join(table2, [“key1”])
      .filter(table1.col(“key2”) == table2.col(“key2”))

Anstatt das equalTo (==) von Spark zu verwenden, scheint die Implementierung eines benutzerdefinierten MyEqualTo (durch Delegieren an die Spark EqualTo -Implementierung ist in Ordnung) das Problem zu lösen. Auf diese Weise optimiert spark den Join nicht (!) Und zieht den Filter einfach in SortMergeJoin nach oben.

In ähnlicher Weise kann die Verbindungsbedingung auch als solche gebildet werden:

(table1.col(“key1”) == table2.col(“key1”)) AND
table1.col(“key2”).myEqualTo(table2.col(“key2”))
3
zetaprime 24 Juli 2019 im 16:17

org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin ist die Optimierungsregel, die das Prädikat durch den Join schiebt. ~~
Wir können diese Regel von den Optimierungsregeln ausschließen. Auf diese Weise müssen wir keine Änderungen am Benutzercode vornehmen.
Um dies auszuschließen, können wir einen der folgenden Schritte ausführen:
1. --conf spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin.
2. Fügen Sie die Eigenschaft in der Spark-Standardeinstellung .conf hinzu.
3. Fügen Sie dem Benutzercode set spark.sql.optimizer.excludedRules=org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin hinzu.

Auch dies ist wieder ein Hack. .
Idealerweise sollten die Filter durch die Verknüpfung nach unten gedrückt werden, wodurch die Anzahl der zu verbindenden Zeilen verringert wird

Update: .
1. Ich habe mich beim Pushdown geirrt. Es wird kein Filter-Pushdown durchgeführt, da das Prädikat Spalten aus beiden Tabellen enthält.
2. Warum fügt SortMergeJoin (SMJ) keinen zusätzlichen Austausch hinzu, wenn die where -Klausel ein Prädikat "Nichtgleichheit" enthält?
Dies liegt daran, dass SMJ nur gleichheitsbasierte Prädikate als Teil der Join-Bedingung berücksichtigen kann. org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys # unapply

Und EnsureRequirements, das für das Hinzufügen des Austauschs verantwortlich ist, stellt fest, dass SMJ keine neue Join-Bedingung hat und die Ausgabeverteilung bereits erfüllt ist.
Code: org.apache.spark.sql.execution.exchange.EnsureRequirements # sureDistributionAndOrdering.
3. Was ist effizient - Hinzufügen einer UDF, die gleich ist oder das Prädikat als Kombination von größer als und kleiner als darstellt? .
Um dies zu bewerten, überprüfte ich den generierten Code mit:

val df = spark.sql(<joinquery>)
df.queryExecution.debug.codegen

Ein. UDF gleich - beinhaltet zusätzlichen Aufwand für virtuelle Funktionsaufrufe.
b. Kombination von kleiner als und größer als - keine virtuellen Funktionsaufrufe. Sobald wir eine verknüpfte Zeile gefunden haben (mit Schlüssel1), sucht der Code nacheinander nach den anderen Prädikaten.

Aus den obigen Beobachtungen in 3 scheint die Verwendung eines nicht auf Gleichheit basierenden Prädikats effizienter zu sein.

0
DaRkMaN 25 Juli 2019 im 06:04

** basierend auf Ihrem Pseudocode **

Table1.join (table2, ["key1", "key2"]) .groupBy ("value2") .countUnique ("key1")

Ich denke die Lösung wäre

Als ersten Schritt verbinden Sie einfach die Tabellen und erhalten den Datenrahmen.

df = table1.join(table2, [“key1”, “key2”])

dann gruppieren nach und machen unterschiedliche Zählungen

df.select(“value2”,“key1”).distinct().groupBy(“value2”,“key1”).count().show()
0
Suresh Chaganti 25 Juli 2019 im 20:25