Answered Unable to connect to spark
You can't set spark.cassandra.connection.host to localhost because for the Spark application it will be its own container, while you have Cassandra running in another container. You need to use Cassandra container name instead.
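For example, with the docker-compose file shown further down in this thread (where the Cassandra service sets both container_name and hostname to cassandra), that means pointing the connector at that name instead of localhost. A minimal sketch, assuming the Spark job runs in a container attached to the same confluent network:

from pyspark.sql import SparkSession

# Sketch only: 'cassandra' is the container name / hostname taken from the docker-compose
# file below; it resolves only for processes running on the same Docker network.
spark = SparkSession.builder \
    .appName('SparkDataStreaming') \
    .config('spark.cassandra.connection.host', 'cassandra') \
    .config('spark.cassandra.connection.port', '9042') \
    .getOrCreate()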
commented Pull Request #2556 on apache/cassandra
I have noticed that this test returns more rows than expected:

@Test
public void notInTest()
{
    createTable("CREATE TABLE %s (k int PRIMARY KEY, v1 int, v2 int)");
    createIndex("CREATE INDEX ON %s(v1) USING 'sai'");
    createIndex("CREATE INDEX ON %s(v2) USING 'sai'");

    execute("INSERT INTO %s (k, v1, v2) VALUES (1, 1, 4)");
    execute("INSERT INTO %s (k, v1, v2) VALUES (2, 2, 3)");
    execute("INSERT INTO %s (k, v1, v2) VALUES (3, 3, 2)");
    execute("INSERT INTO %s (k, v1, v2) VALUES (4, 4, 1)");

    waitForTableIndexesQueryable(currentTable());

    assertRowsIgnoringOrder(execute("SELECT * FROM %s"),
                            row(1, 1, 4),
                            row(2, 2, 3),
                            row(3, 3, 2),
                            row(4, 4, 1));

    assertRowsIgnoringOrder(execute("SELECT * FROM %s WHERE v1 NOT IN (1)"),
                            row(2, 2, 3),
                            row(3, 3, 2),
                            row(4, 4, 1));

    assertRowsIgnoringOrder(execute("SELECT * FROM %s WHERE v1 NOT IN (4)"),
                            row(1, 1, 4),
                            row(2, 2, 3),
                            row(3, 3, 2));

    assertRowsIgnoringOrder(execute("SELECT * FROM %s WHERE v1 NOT IN (1, 4)"),
                            row(2, 2, 3),
                            row(3, 3, 2)); // Got 1 extra row(s) in result: (k=1, v1=1, v2=4)

    assertRowsIgnoringOrder(execute("SELECT * FROM %s WHERE v1 NOT IN (1, 4) AND v2 NOT IN (3, 2)")); // 2 extra rows
}
However, the test passes if we don't create the indexes and use ALLOW FILTERING instead.
commented Pull Request #2556 on apache/cassandra
@pkolaczk there is a bunch of unresolved nits above that only show up if you click the "Load more..." link for the 12 hidden conversations in the GH web UI; maybe you have missed them?
Running the Python code does not connect to Spark and does not create a database in Cassandra either. I have confirmed the services are up on docker and accessible from the PC. I placed the .jar files in the pyspark jar folder of my .env.
I have tried:
- Google and ChatGPT.
- Changed the Cassandra host name from localhost to the Docker hostname cassandra.
- Added .master("spark://spark-master:7077") \ to s_conn.
- Confirmed that spark_conn = create_spark_connection() is not completing; the if spark_conn is not None: block never starts.
Error:
ERROR SparkContext: Error initializing SparkContext.
ERROR:root:Couldn't create the spark session due to exception An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
spark_stream.py:
import logging

from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

logging.basicConfig(level=logging.INFO)


def create_keyspace(session):
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS spark_streams
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    """)
    print("Keyspace created successfully!")


def create_table(session):
    session.execute("""
        CREATE TABLE IF NOT EXISTS spark_streams.created_users (
            id UUID PRIMARY KEY,
            first_name TEXT,
            last_name TEXT,
            gender TEXT,
            address TEXT,
            post_code TEXT,
            email TEXT,
            username TEXT,
            registered_date TEXT,
            phone TEXT,
            picture TEXT);
    """)
    print("Table created successfully!")


def insert_data(session, **kwargs):
    print("inserting data...")
    user_id = kwargs.get('id')
    first_name = kwargs.get('first_name')
    last_name = kwargs.get('last_name')
    gender = kwargs.get('gender')
    address = kwargs.get('address')
    postcode = kwargs.get('post_code')
    email = kwargs.get('email')
    username = kwargs.get('username')
    dob = kwargs.get('dob')
    registered_date = kwargs.get('registered_date')
    phone = kwargs.get('phone')
    picture = kwargs.get('picture')

    try:
        session.execute("""
            INSERT INTO spark_streams.created_users(id, first_name, last_name, gender, address,
                post_code, email, username, dob, registered_date, phone, picture)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (user_id, first_name, last_name, gender, address,
              postcode, email, username, dob, registered_date, phone, picture))
        logging.info(f"Data inserted for {first_name} {last_name}")
    except Exception as e:
        logging.error(f'could not insert data due to {e}')


def create_spark_connection():
    s_conn = None
    try:
        s_conn = SparkSession.builder \
            .appName('SparkDataStreaming') \
            .config('spark.jars.packages', "com.datastax.spark:spark-cassandra-connector_2.13:3.4.1,"
                                           "org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.1") \
            .config('spark.cassandra.connection.host', 'localhost') \
            .getOrCreate()
        s_conn.sparkContext.setLogLevel("INFO")
        logging.info("Spark connection created successfully!")
    except Exception as e:
        logging.error(f"Couldn't create the spark session due to exception {e}")

    return s_conn


def connect_to_kafka(spark_conn):
    spark_df = None
    try:
        spark_df = spark_conn.readStream \
            .format('kafka') \
            .option('kafka.bootstrap.servers', 'localhost:9092') \
            .option('subscribe', 'users_created') \
            .option('startingOffsets', 'earliest') \
            .load()
        logging.info("kafka dataframe created successfully")
    except Exception as e:
        logging.warning(f"kafka dataframe could not be created because: {e}")

    return spark_df


def create_cassandra_connection():
    try:
        # connecting to the cassandra cluster
        cluster = Cluster(['localhost'])
        return cluster.connect()
    except Exception as e:
        logging.error(f"Could not create cassandra connection due to {e}")
        return None


def create_selection_df_from_kafka(spark_df):
    schema = StructType([
        StructField("id", StringType(), False),
        StructField("first_name", StringType(), False),
        StructField("last_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("address", StringType(), False),
        StructField("post_code", StringType(), False),
        StructField("email", StringType(), False),
        StructField("username", StringType(), False),
        StructField("registered_date", StringType(), False),
        StructField("phone", StringType(), False),
        StructField("picture", StringType(), False)
    ])

    sel = spark_df.selectExpr("CAST(value AS STRING)") \
        .select(from_json(col('value'), schema).alias('data')).select("data.*")
    print(sel)

    return sel


if __name__ == "__main__":
    # create spark connection
    spark_conn = create_spark_connection()

    if spark_conn is not None:
        # connect to kafka with spark connection
        spark_df = connect_to_kafka(spark_conn)
        selection_df = create_selection_df_from_kafka(spark_df)
        session = create_cassandra_connection()

        if session is not None:
            create_keyspace(session)
            create_table(session)

            logging.info("Streaming is being started...")

            streaming_query = (selection_df.writeStream.format("org.apache.spark.sql.cassandra")
                               .option('checkpointLocation', '/tmp/checkpoint')
                               .option('keyspace', 'spark_streams')
                               .option('table', 'created_users')
                               .start())

            streaming_query.awaitTermination()
docker_compose:
version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    container_name: zookeeper
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    healthcheck:
      test: ['CMD', 'bash', '-c', "echo 'ruok' | nc localhost 2181"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - confluent

  broker:
    image: confluentinc/cp-server:7.4.0
    container_name: broker
    hostname: broker
    depends_on:
      zookeeper:
        condition: service_healthy
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://spark-master:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'false'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    networks:
      - confluent
    healthcheck:
      test: [ "CMD", "bash", "-c", 'nc -z localhost 9092' ]
      interval: 10s
      timeout: 5s
      retries: 5

  schema-registry:
    image: confluentinc/cp-schema-registry:7.4.0
    container_name: schema-registry
    hostname: schema-registry
    depends_on:
      broker:
        condition: service_healthy
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    networks:
      - confluent
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:8081/" ]
      interval: 30s
      timeout: 10s
      retries: 5

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.4.0
    hostname: control-center
    container_name: control-center
    depends_on:
      broker:
        condition: service_healthy
      schema-registry:
        condition: service_healthy
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      CONFLIENT_METRICS_ENABLE: 'false'
      PORT: 9021
    networks:
      - confluent
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:9021/health" ]
      interval: 30s
      timeout: 10s
      retries: 5

  webserver:
    image: apache/airflow:2.6.0-python3.9
    container_name: af-webserver
    hostname: af-webserver
    command: webserver
    entrypoint: ['/opt/airflow/script/entrypoint.sh']
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Sequential
      - AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres:5432/airflow
      - AIRFLOW_WEBSERVER_SECRET_KEY=this_is_a_very_secured_key
    logging:
      options:
        max-size: 10m
        max-file: "3"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./script/entrypoint.sh:/opt/airflow/script/entrypoint.sh
      - ./requirements.txt:/opt/airflow/requirements.txt
    ports:
      - "8080:8080"
    healthcheck:
      test: ['CMD-SHELL', "[ -f /opt/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3
    networks:
      - confluent

  scheduler:
    image: apache/airflow:2.6.0-python3.9
    container_name: af-scheduler
    hostname: af-scheduler
    depends_on:
      webserver:
        condition: service_healthy
    volumes:
      - ./dags:/opt/airflow/dags
      - ./script/entrypoint.sh:/opt/airflow/script/entrypoint.sh
      - ./requirements.txt:/opt/airflow/requirements.txt
    env_file:
      - airflow.env
    environment:
      - LOAD_EX=n
      - EXECUTOR=Sequential
    command: bash -c "pip install -r ./requirements.txt && airflow db upgrade && airflow scheduler"
    networks:
      - confluent

  postgres:
    image: postgres:14.0
    container_name: af-postgres
    hostname: af-postgres
    env_file:
      - postgres.env
    environment:
      - POSTGRES_DB=airflow
    logging:
      options:
        max-size: 10m
        max-file: "3"
    networks:
      - confluent

  spark-master:
    image: bitnami/spark:latest
    container_name: spark-master
    hostname: spark-master
    command: bin/spark-class org.apache.spark.deploy.master.Master
    ports:
      - "9090:8080"
      - "7077:7077"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/"]
      interval: 30s
      timeout: 10s
      retries: 5
    networks:
      - confluent

  spark-worker:
    image: bitnami/spark:latest
    container_name: spark-worker
    hostname: spark-worker
    ports:
      - "8082:8081"
    command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
    depends_on:
      - spark-master
    environment:
      SPARK_MODE: worker
      SPARK_WORKER_CORES: 2
      SPARK_WORKER_MEMORY: 1g
      SPARK_MASTER_URL: spark://spark-master:7077
    healthcheck:
      test: ["CMD", "curl", "-f", "http://172.27.0.7:8082"]
      interval: 10s
      timeout: 3s
      retries: 3
    networks:
      - confluent

  cassandra_db:
    image: cassandra:latest
    container_name: cassandra
    hostname: cassandra
    ports:
      - "9042:9042"
    env_file:
      - cassandra.env
    environment:
      - MAX_HEAP_SIZE=512M
      - HEAP_NEWSIZE=100M
    volumes:
      - ./:/home
    networks:
      - confluent

networks:
  confluent:
opened Pull Request #2864 on apache/cassandra
#2864 Allow unhandled operators on SAI indexed columns
Hi, I apologize if this is common knowledge, but I have done two (obviously outdated) courses on Cassandra recently that both reference the DataStax Community Edition with OpsCenter. I can't seem to track it down. I don't even usually use Windows when programming, but it's referenced a lot in the courses I've taken and the instructors speak very highly of OpsCenter. Is this something that just doesn't exist anymore?
commented Pull Request #170 on apache/cassandra-website
> I don't have a preference where this documentation lives. I'd just like it to be updated with information on the recent changes in how builds work.

That would be an argument to have it versioned :-)
No strong opinions here; the trade-offs I see are:
- versioned: simpler to read, just go to the version you are working with and read half as much content. Less error-prone, and an easier page to maintain.
- unversioned: one place to learn how it works in all branches. More to read, more cognitive load. If working with multiple branches (e.g. forward merging) you have to do this anyway.

A question I have is: if it's unversioned, then what happens to the docs on dependency versions for branches that are no longer maintained? The code is not deleted, but are we then deleting the documentation for it? This isn't just for archival purposes; we do still (in exceptional circumstances) apply CVE fixes to past branches (and the older branches are often those even the experienced devs need to refresh their memories on how things worked…)
opened Pull Request #2863 on apache/cassandra
#2863 [CASSANDRA-18968] StartupClusterConnectivityChecker fails on upgrade from 3.X
opened Pull Request #2862 on apache/cassandra
#2862 StartupClusterConnectivityChecker fails on upgrade from 3.X
commented Pull Request #170 on apache/cassandra-website
> and… of curiosity… given the versioning differences in the doc, wouldn't this be better moved in-tree ? (easier to read, easier to maintain)
Agree with Ekaterina - I don't have a preference where this documentation lives. I'd just like it to be updated with information on the recent changes in how builds work.
opened Pull Request #2861 on apache/cassandra
#2861 CASSANDRA-18956 Fix incorrect column identifier bytes problem when renaming a column
Replied to a message in 🌐Planet Cassandra in #🌐cassandra-chat
Check out the docs on Monitoring. There's a table-based metric called "TombstoneScannedHistogram" which should report on tombstones that have been returned from a query. https://cassandra.apache.org/doc/3.11/cassandra/operating/metrics.html#table-metrics
opened Pull Request #2860 on apache/cassandra
#2860 CASSANDRA-18996 Add documentation about crypto providers
opened Pull Request #2859 on apache/cassandra
#2859 CASSANDRA-18995 Improve documentation for snitches