java spark cassandra 小程序给出了 datastax 驱动程序异常:SyntaxError:在输入“FROM”时没有可行的替代方案

克里斯贝德福德

我正在使用 Java 对我的第一个 Spark/Cassandra 程序做一些愚蠢的事情,希望有人能帮我弄清楚为什么我会收到这个错误:

: com.datastax.driver.core.exceptions.SyntaxError: line 1:8 no viable alternative at input 'FROM' (SELECT  [FROM]...)

设置是

Create keyspace test with replicaton={'class':strategy name, 
                'replication_factor': No of replications on different nodes}
CREATE KEYSPACE test WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };
use test;
CREATE TABLE KeyValue ( key varchar, value bigint, PRIMARY KEY (key));
INSERT INTO KeyValue (key, value) VALUES ('afoo', 100);

代码(下面)很简单……我使用了“选择”子句,所以我不确定为什么驱动程序没有选择我指定的列。

import com.datastax.spark.connector.cql.CassandraConnector;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.Serializable;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;

public class JavaDemo {
  public static void main(String[] args) throws Exception {
    String sparkMaster = "local[2]";
    String cassandraHost = "localhost";
    SparkConf conf = new SparkConf(true)
            .set("spark.cassandra.connection.host", cassandraHost);

    JavaSparkContext sc = new JavaSparkContext(sparkMaster, "basicquerycassandra", conf);
    CassandraConnector connector = CassandraConnector.apply(conf);
    JavaRDD<KeyValue> rdd = javaFunctions(sc)
            .cassandraTable("test", "keyvalue", mapRowTo(KeyValue.class))
            .withConnector(connector).select("key", "value")
            .where("key = 'afoo'");

    rdd.foreach(row -> System.out.println("got item" + row));
  }

  public static class KeyValue implements Serializable {
    private String key;
    private Integer value;
    public KeyValue() {
    }
    public static KeyValue newInstance(String k, Integer v) {
      KeyValue kv = new KeyValue();
      kv.setKey(k);
      kv.setValue(v);
      return kv;
    }
    public String getKey() {
      return key;
    }
    public Integer getValue() {
      return value;
    }
    void setKey(String k) {
      this.key = k;
    }
    void setValue(Integer v) {
      this.value = v;
    }

    @Override
    public String toString() {
      return "KeyValue{" +
              "key='" + key + '\'' +
              ", value=" + value +
              '}';
    }
  }
}

更新:如果我更新代码如下,我可以避免语法错误......这还不是我想要的。明天我会摆弄它,如果没有人打败我,我会发布一个答案。我很接近;^)

    JavaSparkContext sc = new JavaSparkContext(sparkMaster, "basicquerycassandra", conf);
    CassandraConnector connector = CassandraConnector.apply(conf);
    CassandraTableScanJavaRDD<CassandraRow> rdd = javaFunctions(sc)
            .cassandraTable("test", "keyvalue")
            .select("key", "value")
            .where("key = 'afoo'");

    rdd.foreach(row -> System.out.println("got item" + row));
赛博拉斯

由于您没有提供任何列映射器,spark 连接器将使用默认值,JavaBeanColumnMapper例如基本上它是普通的 java 反射,如来自 apache 的 BeanUtils。这将需要所有选定列的公共构造函数或公共 getter/setter。

所以为了让它工作,你应该有像下面这样的公共构造函数,或者将 setter/getter 方法定义为 publickey属性。希望能帮助到你。

公共构造函数:

    public KeyValue(String key, Integer value) {
        this.key = key;
        this.value = value;
    }

公共设置器/获取器:

    public void setKey(String key) {
        this.key = key;
    }

    public void setValue(Integer value) {
        this.value = value;
    }

本文收集自互联网,转载请注明来源。

如有侵权,请联系 [email protected] 删除。

编辑于
0

我来说两句

0 条评论
登录 后参与评论

相关文章