我正在使用 Java 对我的第一个 Spark/Cassandra 程序做一些愚蠢的事情,希望有人能帮我弄清楚为什么我会收到这个错误:
: com.datastax.driver.core.exceptions.SyntaxError: line 1:8 no viable alternative at input 'FROM' (SELECT [FROM]...)
设置是
Create keyspace test with replicaton={'class':strategy name,
'replication_factor': No of replications on different nodes}
CREATE KEYSPACE test WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };
use test;
CREATE TABLE KeyValue ( key varchar, value bigint, PRIMARY KEY (key));
INSERT INTO KeyValue (key, value) VALUES ('afoo', 100);
代码(下面)很简单……我使用了“选择”子句,所以我不确定为什么驱动程序没有选择我指定的列。
import com.datastax.spark.connector.cql.CassandraConnector;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.Serializable;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
public class JavaDemo {
public static void main(String[] args) throws Exception {
String sparkMaster = "local[2]";
String cassandraHost = "localhost";
SparkConf conf = new SparkConf(true)
.set("spark.cassandra.connection.host", cassandraHost);
JavaSparkContext sc = new JavaSparkContext(sparkMaster, "basicquerycassandra", conf);
CassandraConnector connector = CassandraConnector.apply(conf);
JavaRDD<KeyValue> rdd = javaFunctions(sc)
.cassandraTable("test", "keyvalue", mapRowTo(KeyValue.class))
.withConnector(connector).select("key", "value")
.where("key = 'afoo'");
rdd.foreach(row -> System.out.println("got item" + row));
}
public static class KeyValue implements Serializable {
private String key;
private Integer value;
public KeyValue() {
}
public static KeyValue newInstance(String k, Integer v) {
KeyValue kv = new KeyValue();
kv.setKey(k);
kv.setValue(v);
return kv;
}
public String getKey() {
return key;
}
public Integer getValue() {
return value;
}
void setKey(String k) {
this.key = k;
}
void setValue(Integer v) {
this.value = v;
}
@Override
public String toString() {
return "KeyValue{" +
"key='" + key + '\'' +
", value=" + value +
'}';
}
}
}
更新:如果我更新代码如下,我可以避免语法错误......这还不是我想要的。明天我会摆弄它,如果没有人打败我,我会发布一个答案。我很接近;^)
JavaSparkContext sc = new JavaSparkContext(sparkMaster, "basicquerycassandra", conf);
CassandraConnector connector = CassandraConnector.apply(conf);
CassandraTableScanJavaRDD<CassandraRow> rdd = javaFunctions(sc)
.cassandraTable("test", "keyvalue")
.select("key", "value")
.where("key = 'afoo'");
rdd.foreach(row -> System.out.println("got item" + row));
由于您没有提供任何列映射器,spark 连接器将使用默认值,JavaBeanColumnMapper
例如基本上它是普通的 java 反射,如来自 apache 的 BeanUtils。这将需要所有选定列的公共构造函数或公共 getter/setter。
所以为了让它工作,你应该有像下面这样的公共构造函数,或者将 setter/getter 方法定义为 publickey
属性。希望能帮助到你。
公共构造函数:
public KeyValue(String key, Integer value) {
this.key = key;
this.value = value;
}
公共设置器/获取器:
public void setKey(String key) {
this.key = key;
}
public void setValue(Integer value) {
this.value = value;
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系 [email protected] 删除。
我来说两句