【问题标题】:Cassandra throw NoHostAvailableExceptionCassandra 抛出 NoHostAvailableException
【发布时间】:2015-05-18 15:58:06
【问题描述】:

我正在使用以下代码将我的 .net 客户端(基于 CQL)连接到 3 节点 Cassandra 集群。我正在以 30 条记录/秒的速度获取数据(来自 RabbitMQ),它们顺利地存储在 cassandra 中多达 800-900 行。但在那之后,我得到了以下异常。谁能告诉我可以进行哪些优化/更改以避免此异常。我在任何地方都找不到这个问题的具体解决方案。

错误: ERROR ErrorLog - error in Cassandra GetCWCRow Function Connection :None of the hosts tried for query are available (tried: X.X.X.201:9042, X.X.X.200:9042, X.X.X.X:9042)

代码:

using Cassandra;
using Consumer;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Data;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;



namespace RabbitMqCarWaleUserTracking
{
    class DataAccessCassandra
    {

        public bool InsertCookieLogData(string cwc, string page_uri)
        {
            try
            {
                Logs.WriteInfoLog("Cassandra InsertCookieLogData a Function called");
                Cluster cluster = Cluster.Builder().AddContactPoints(ConfigurationManager.AppSettings["cassandraCluster"].ToString().Split(',')).Build();
                ISession session = cluster.Connect(ConfigurationManager.AppSettings["cassandraKeySpace"].ToString());
                string pageCategory = string.Empty;
                try
                {

                    if ((Regex.IsMatch(page_uri, "/newcars/upcomingcars", RegexOptions.IgnoreCase)))
                    {
                        pageCategory = "upcomingCars";
                    }
                    else
                        if ((Regex.IsMatch(page_uri, "/newcars/dealers/newCarDealerShowroom", RegexOptions.IgnoreCase)) || (Regex.IsMatch(page_uri, "/newcars/dealers/listnewcardealersbycity", RegexOptions.IgnoreCase)
                            || (Regex.IsMatch(page_uri, "/newcars/dealers/dealerdetails", RegexOptions.IgnoreCase))))
                        {
                            pageCategory = "newcarsDealers";
                        }
                        else
                            if ((Regex.IsMatch(page_uri, "/offers", RegexOptions.IgnoreCase)) || (Regex.IsMatch(page_uri, "/alloffers", RegexOptions.IgnoreCase)))
                            {
                                pageCategory = "offers";
                            }
                            else
                                if ((Regex.IsMatch(page_uri, "/dealer/testdrive", RegexOptions.IgnoreCase)))
                                {
                                    pageCategory = "dealerTestDrive";
                                }

                    if (pageCategory != string.Empty)
                    {
                        Row result = session.Execute("select logdate from pageWiseCookieLog where cwc ='" + cwc + "' and page_uri ='" + pageCategory + "' and logdate= '" + DateTime.Today.ToString("yyyy-MM-dd") + "'").FirstOrDefault();
                        if (result == null)
                        {
                            session.Execute("insert into pageWiseCookieLog (cwc, page_uri, logdate) values ('" + cwc + "' , '" + pageCategory + "' , '" + DateTime.Now.ToString("yyyy-MM-dd") + "' )");
                            session.Execute("insert into pageWiseCookieLogByld (cwc, page_uri, logdate) values ('" + cwc + "' , '" + pageCategory + "' , '" + DateTime.Now.ToString("yyyy-MM-dd") + "' )");
                            session.Dispose();
                            cluster.Dispose();
                            return true;
                        }
                    }
                    else
                    {
                        //don't want to store the data for rest of the page category but need to return true  
                        session.Dispose();
                        cluster.Dispose();
                        return true;
                    }
                }
                catch (Exception ex)
                {
                    string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                    Logs.WriteErrorLog("error in Cassandra InsertCookieLogData function with cwc :" + cwc + "error is :" + ex.Message);
                    SendMail.HandleException(ex, subject);
                    session.Dispose();
                    cluster.Dispose();
                }
            }
            catch (Exception ex)
            {
                string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                Logs.WriteErrorLog("error in Cassandra InsertCookieLogData function connection :" + ex.Message);
                SendMail.HandleException(ex, subject);              
            }

            return false;
        }

        public string GetCWCRow(string cwc, int index, string mobileId)
        {
            try
            {
                Logs.WriteInfoLog("Cassandra GetCWCRow Function called");
                Cluster cluster = Cluster.Builder().AddContactPoints(ConfigurationManager.AppSettings["cassandraCluster"].ToString().Split(',')).Build();
                ISession session = cluster.Connect(ConfigurationManager.AppSettings["cassandraKeySpace"].ToString());
                try
                {
                    Row result = session.Execute("select cur_visit_id from usertracking where cwc ='" + cwc + "'").FirstOrDefault();

                    if (result != null)
                    {
                        session.Dispose();
                        cluster.Dispose();
                        return result[0].ToString();
                    }

                }
                catch (Exception ex)
                {
                    string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                    Logs.WriteErrorLog("error in Cassandra GetCWCRow function with cwc :" + cwc + "error is :" + ex.Message);
                    SendMail.HandleException(ex, subject);
                    session.Dispose();
                    cluster.Dispose();
                }

            }
            catch (Exception ex)
            {
                string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                Logs.WriteErrorLog("error in Cassandra GetCWCRow Function Connection :" + ex.Message);
                SendMail.HandleException(ex, subject);
            }

            return string.Empty;
        }

        public bool InsertCWCRecords(string cwv, Cut_Case caseType, int index, string mobileId)
        {
            try
            {
                Logs.WriteInfoLog("Cassandra InsertCWCRecords function called for case:" + caseType);
                Cluster cluster = Cluster.Builder().AddContactPoints(ConfigurationManager.AppSettings["cassandraCluster"].ToString().Split(',')).Build();
                ISession session = cluster.Connect(ConfigurationManager.AppSettings["cassandraKeySpace"].ToString());
                try
                {

                    bool _isProcessed = false;
                    string visitCount = "";
                    string[] leadParameters = cwv.Split('.');
                    string cwc = leadParameters[0];
                    string visitId = leadParameters[1];
                    string visitStartTime = leadParameters[2];
                    string visitPrevPageTime = leadParameters[3];
                    string visitLastPageTime = leadParameters[4];

                    if (leadParameters.Length == 6)
                    {
                        visitCount = leadParameters[5];
                    }
                    string TOT_TIME_SPENT = (Convert.ToInt64(visitLastPageTime) - Convert.ToInt64(visitPrevPageTime)).ToString();

                    if ((int)caseType == 1) //to enter new cwc data in summary table
                    {
                        session.Execute("insert into usertracking (cwc, cur_visit_id, cur_visit_last_ts,  tot_page_view, tot_time_spent, tot_visit_count, cur_visit_datetime) values ('" + cwc + "' , '" + visitId + "' ," + visitStartTime + "," + "1" + "," + TOT_TIME_SPENT + "," + "1" + ", '" + DateTime.Today.ToString("yyyy-MM-dd") + "' )");
                        _isProcessed = true;
                    }
                    if ((int)caseType == 2) //if cwc exits and visit id is same
                    {
                        Row result = session.Execute("select tot_page_view, tot_time_spent from usertracking where cwc ='" + cwc + "'").FirstOrDefault();
                        int page_cnt_val = int.Parse(result[0].ToString()) + 1;
                        Int64 time_spt_val = Int64.Parse(result[1].ToString()) + Convert.ToInt64(visitLastPageTime) - Convert.ToInt64(visitPrevPageTime);
                        session.Execute("update usertracking SET cur_visit_last_ts = " + visitLastPageTime + ", tot_page_view = " + page_cnt_val + ", tot_time_spent = " + time_spt_val + " WHERE cwc = '" + cwc.Trim() + "'");
                        _isProcessed = true;
                    }
                    if ((int)caseType == 3) //if cwc exits ans visit id is different
                    {
                        Row result = session.Execute("select tot_page_view, tot_time_spent, tot_visit_count, cur_visit_last_ts, cur_visit_datetime from usertracking where cwc = '" + cwc + "'").First();
                        int page_cnt_val = int.Parse(result[0].ToString()) + 1;
                        Int64 time_spt_val = Int64.Parse(result[1].ToString()) + Convert.ToInt64(visitLastPageTime) - Convert.ToInt64(visitPrevPageTime);
                        int visit_val = int.Parse(result[2].ToString()) + 1;
                        Int64 prev_visit_ts_val = Int64.Parse(result[3].ToString());
                        String prev_visit_datetime_val = Convert.ToDateTime(result[4].ToString()).ToString("yyyy-MM-dd");

                        session.Execute("update usertracking SET  cur_visit_id = '" + visitId + "' , tot_visit_count= " + visit_val
                            + " , prev_visit_last_ts= " + prev_visit_ts_val + ", prev_visit_datetime = '" + prev_visit_datetime_val
                            + "' , cur_visit_last_ts = " + visitLastPageTime
                            + ", tot_page_view = " + page_cnt_val + ", tot_time_spent = " + time_spt_val
                            + ", cur_visit_datetime='" + DateTime.Today.ToString("yyyy-MM-dd")
                            + "' WHERE cwc = '" + cwc.Trim() + "'");
                        _isProcessed = true;
                    }
                    session.Dispose();
                    cluster.Dispose();
                    return _isProcessed;
                }
                catch (Exception ex)
                {
                    string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                    Logs.WriteErrorLog("error in Cassandra InsertCWCRecords function with cwv :" + cwv + "error is :" + ex.Message);
                    SendMail.HandleException(ex, subject);
                    session.Dispose();
                    cluster.Dispose();
                    return false;
                }
            }
            catch (Exception ex)
            {
                string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                Logs.WriteErrorLog("error in Cassandra InsertCWCRecords function connection :" + ex.Message);
                SendMail.HandleException(ex, subject);
                return false;
            }

        }

        public bool UpdateReferrerTimeSpent(string cwc, int referrerCategoryId, double referrerTimeSpent, int index, string mobileId)
        {
            bool _isUpdated = false;

            try
            {              
                Logs.WriteInfoLog("Cassandra UpdateReferrerTimeSpent function called");
                Cluster cluster = Cluster.Builder().AddContactPoints(ConfigurationManager.AppSettings["cassandraCluster"].ToString().Split(',')).Build();
                ISession session = cluster.Connect("cw");

                try
                {
                    Row result = session.Execute("select time_spent_in_sec from userTimeSpentPage WHERE cwc = '" + cwc.Trim() + "' And logdate = '" + DateTime.Today.ToString("yyyy-MM-dd") + "' And page_category_id =" + referrerCategoryId).FirstOrDefault();
                    if (result != null)
                    {
                        if (result[0].ToString().Trim() != string.Empty)
                        {

                            Int64 page_time_spent_val = Int64.Parse(result[0].ToString());
                            Int64 tot_time_spt_val = page_time_spent_val + Int64.Parse(referrerTimeSpent.ToString());
                            session.Execute("update userTimeSpentPage set time_spent_in_sec= " + tot_time_spt_val + "WHERE cwc = '" + cwc.Trim() + "' And logdate = '" + DateTime.Today.ToString("yyyy-MM-dd") + "' And page_category_id=" + referrerCategoryId);
                        }
                    }
                    else
                    {
                        session.Execute("insert into userTimeSpentPage (cwc, page_category_id, time_spent_in_sec, logdate) values ('" + cwc + "' ," + referrerCategoryId + "," + referrerTimeSpent + ", '" + DateTime.Now.ToString("yyyy-MM-dd") + "' )");
                    }
                    _isUpdated = true;
                    session.Dispose();
                    cluster.Dispose();
                    return _isUpdated;
                }
                catch (Exception ex)
                {
                    string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                    Logs.WriteErrorLog("error in Cassandra UpdateReferrerTimeSpent function with cwc:" + cwc + "error is :" + ex.Message);
                    SendMail.HandleException(ex, subject);
                    session.Dispose();
                    cluster.Dispose();
                    return _isUpdated;
                }
            }
            catch (Exception ex)
            {
                string subject = string.Concat(ex.Source, " : ", Environment.MachineName);
                Logs.WriteErrorLog("error in Cassandra UpdateReferrerTimeSpent function connection" + ex.Message);
                SendMail.HandleException(ex, subject);
                return _isUpdated;
            }

        }
    }
}

已编辑问题:

output of netstat -an | awk '/^tcp/ {print $NF}' | sort | uniq -c | sort -rn

On Machine 1  While cassandra running :
      773 ESTABLISHED
      36 LISTEN
      1 CLOSE_WAIT

After cassandra stopped :
      274 ESTABLISHED
      36 LISTEN
      1 CLOSE_WAIT

Machine 2 while cassandra running :
       3941 ESTABLISHED
       26 LISTEN
       7 CLOSE_WAIT

After cassandra stopped :
       26 LISTEN
       9 ESTABLISHED

On machine 3 while cassandra running :
       500 ESTABLISHED
       21 LISTEN

After cassandra stopped :    
      21 LISTEN
      13 ESTABLISHED

【问题讨论】:

    标签: cassandra cassandra-2.0


    【解决方案1】:

    NoHostAvailableException 可能出于多种原因引发。然而,这都是关于driver documentation中描述的问题:

    由于没有主机而无法执行查询时引发的异常 可用的。如果

    抛出此异常
    • 要么没有主机住 查询时的集群
    • 已尝试的所有主机 由于连接问题而失败

    现在为什么会发生这种情况 - 这可能有几个原因。

    1. 在所有 3 个节点上同时进行主要垃圾收集的可能性。我个人认为情况并非如此,但您绝对应该阅读此内容,看看它是否适用于您的情况。这是一个非常好的文档的链接,描述了如何tune GC in Cassandra。实际上,您为任何调用创建集群和会话对象,而不是将它们存储为单例并只是重复使用它们,这一事实可能会使事情变得更糟。
    2. 在查看了引发错误的函数后,我或多或少地确信问题在于,在插入这么多次之后,您的节点在此语句期间读取宽行时会超时:Row result = session.Execute("select cur_visit_id from usertracking where cwc ='" + cwc + "'").FirstOrDefault();。再往前走,您将对同一集群键 cws 执行大量更新,由于 SSTables 的不可变特性,这会产生大量版本化数据,从而增加数据检索时间,因为集群需要组合您的每个请求的所有这些数据。

    没有任何表模式很难提出建议,逆向工程也无济于事,但我建议以某种方式使用复合主键来加快查找速度。调整您的 JVM,制作会话和集群单例并在您的代码中重用它们。看看这是否有帮助。

    通读 Cassandra 集群日志,重点关注问题发生的时间。查看这些日志中是否有任何线索,例如垃圾收集活动或超时错误。

    HTH

    罗马

    【讨论】:

    • 首先非常感谢您的帮助。我实现了您将集群和会话对象存储为单例对象的建议。这对我来说非常好。我还有一个问题,我已经编辑了我原来的问题。在这些单例更改之前,即使在正确处理集群和会话对象之后,我也有很多 tcp 连接。现在我几乎没有 3-4 个 tcp(节点 1 上的 2 个 tcp 连接,其余节点上的其余节点)连接整个集群。您能否详细说明为什么连接没有关闭?
    • 如果假设这些连接是连接池的结果,我不会太远
    • 嗨 Roman,tune GC in cassandra 的链接失效了,你能提供其他相同的链接吗?
    猜你喜欢
    • 2019-02-25
    • 1970-01-01
    • 2015-02-23
    • 2018-01-18
    • 2021-02-07
    • 2015-04-16
    • 1970-01-01
    • 1970-01-01
    • 2019-11-19
    相关资源
    最近更新 更多