SparkRDDからIgniteキャッシュへの挿入を実行しようとしています。Igniteのバージョン2.2とSparkの2.1を使用しています。
私が取る最初のステップは、次のように、別のscalaスクリプトでキャッシュを作成することです。
object Create_Ignite_Cache {
case class Custom_Class(
@(QuerySqlField @field)(index = true) a: String,
@(QuerySqlField @field)(index = true) b: String,
@(QuerySqlField @field)(index = true) c: String,
@(QuerySqlField @field)(index = true) d: String,
@(QuerySqlField @field)(index = true) e: String,
@(QuerySqlField @field)(index = true) f: String,
@(QuerySqlField @field)(index = true) g: String,
@(QuerySqlField @field)(index = true) h: String
)
def main(args: Array[String]): Unit = {
val spi = new TcpDiscoverySpi
val ipFinder = new TcpDiscoveryMulticastIpFinder
val adresses = new util.ArrayList[String]
adresses.add("127.0.0.1:48500..48520")
ipFinder.setAddresses(adresses)
spi.setIpFinder(ipFinder)
val cfg = new IgniteConfiguration().setDiscoverySpi(spi).setClientMode(true)
val cache_conf = new CacheConfiguration[String, Custom_Class]().setCacheMode(CacheMode.PARTITIONED).setAtomicityMode(CacheAtomicityMode.ATOMIC).setBackups(1).setIndexedTypes(classOf[String], classOf[Custom_Class]).setName("Spark_Ignite")
val ignite = Ignition.getOrStart(cfg)
ignite.getOrCreateCache(cache_conf)
System.out.println("[INFO] CACHE CREATED")
ignite.close()
}
}
ignitevisorからわかるように、キャッシュは正常に作成されます。
次に、Sparkアプリを実行して、igniteRDDのコンテンツをキャッシュに挿入しました。
object Spark_Streaming_Processing {
case class Custom_Class(
@(QuerySqlField @field)(index = true) a: String,
@(QuerySqlField @field)(index = true) b: String,
@(QuerySqlField @field)(index = true) c: String,
@(QuerySqlField @field)(index = true) d: String,
@(QuerySqlField @field)(index = true) e: String,
@(QuerySqlField @field)(index = true) f: String,
@(QuerySqlField @field)(index = true) g: String,
@(QuerySqlField @field)(index = true) h: String
)
//START IGNITE CONTEXT
val addresses=new util.ArrayList[String]()
addresses.add("127.0.0.1:48500..48520")
val igniteContext:IgniteContext=new IgniteContext(sqlContext.sparkContext,()=>
new IgniteConfiguration().setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(addresses))
).setCacheConfiguration(new CacheConfiguration[String,Custom_Class]()
.setName("Spark_Ignite").setBackups(1).setIndexedTypes(classOf[String],classOf[Custom_Class]))
,true)
println(igniteContext.ignite().cacheNames())
val ignite_cache_rdd:IgniteRDD[String,Custom_Class] =igniteContext.fromCache[String,Custom_Class]("Spark_Ignite")
val processed_Pair:RDD[(String,Custom_Class)]=(...)// rdd with data, which as you can see has the correct datatypes as parameters
ignite_cache_rdd.savePairs(processed_PairRDD)
}
}
ご覧のとおり、クラスは完全に同一です。
アプリを正常に実行した後、コンソールの前のスクリーショットで見られるように、キャッシュに63レコードが含まれていることがignitevisorで確認できます。
ただし、次のように、キャッシュに対してSQLクエリを実行しようとすると、次のようになります。
ignite_cache_rdd.sql("select * from Custom_Class").show(truncate = false)
その結果、空のテーブルが表示されます。
外部SQLサーバーを介してクエリを実行した場合も同じことが起こります。
私はキャッシュ演繹作成し、スパークアプリを実行しない場合、それが存在するdoesntの場合は不思議なことに、IgniteContextは、キャッシュを作成し、THEN私は私のクエリでレコードを表示することができています!
ここで何が問題になるのでしょうか?
キーと値の両方のデータ型がまったく同じであることがわかる限り、クエリを実行するとそれらを確認できるはずです。
お時間をいただきありがとうございます。
ここでの問題は、異なるクラスを使用してキャッシュを作成し、それにデータを挿入することです。これら2つのクラスのフィールドは一致しますが、完全修飾名は異なるため、これらは2つの異なるクラスです。
SQLからデータをクエリできるようにする場合は、キャッシュの作成時とデータの挿入時に同じクラスを使用する必要があります。
キャッシュの作成をスキップすることで問題が解決する理由は、Sparkアプリが既存のキャッシュを使用する代わりにキャッシュ自体を作成するためです。したがって、Sparkがそれを作成するとき、実際のオブジェクトのクラスがキャッシュの作成中に使用されます。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加