您可能想看看 Azure 中的“诊断”功能是如何工作的。这并不是说您要用诊断功能来实现您正在做的事情,而是因为它处理的是类似的问题,或许能给您一些启发。我即将实现一个数据审计功能,打算把审计记录写入表存储,因此同样希望延迟更新并把多次更新合并成一批;我从诊断功能中获得了很多灵感。
现在,Azure 中的诊断工作方式是每个角色启动一个小的后台“传输”线程。因此,每当您编写任何跟踪时,这些跟踪都会存储在本地内存中的列表中,并且后台线程(默认情况下)将所有请求集中起来,并每分钟将它们传输到表存储中。
在您的场景中,我会让每个角色实例跟踪点击次数,然后使用后台线程每分钟左右更新一次数据库。
我可能会在每个 Web 角色上使用静态的 ConcurrentDictionary(或者把它挂在一个单例上)之类的东西,每次点击都会递增对应页面标识符的计数器。您需要一些线程处理代码,以便多个请求可以安全地更新字典中的同一个计数器。或者,也可以让每次“点击”只向一个共享的线程安全列表中添加一条新记录。
然后,让一个后台线程每分钟把自上次传输以来每个页面的点击次数累加到数据库中,并将本地计数器重置为 0;如果您使用的是共享列表的方法,则清空该列表(同样,请注意多线程和锁定问题)。
重要的是确保您的数据库更新是原子的;如果您从数据库中读取当前计数,将其递增然后将其写回,那么您可能有两个不同的 Web 角色实例同时执行此操作,从而丢失一个更新。
编辑:
以下是一个快速示例,展示您可以如何实现这一点。
using System.Collections.Concurrent;
using System.Data.SqlClient;
using System.Threading;
using System;
using System.Collections.Generic;
using System.Linq;
/// <summary>
/// Console harness for <c>HitCounter</c>: starts the periodic transfer loop,
/// generates hits from several simulated request threads, then shuts the counter down.
/// </summary>
class Program
{
    /// <summary>
    /// Entry point. In a web role the start-up part belongs in Application_Start
    /// and the shutdown part in the application's shutdown handler.
    /// </summary>
    static void Main(string[] args)
    {
        // Start-up: run the transfer loop on its own thread.
        // One second keeps the demo short; once a minute is a more realistic interval.
        TimeSpan demoInterval = new TimeSpan(0, 0, 1);
        Thread transferThread = new Thread(() => HitCounter.Run(demoInterval));
        transferThread.Start();

        // Test traffic: simulated web threads logging hits against the counter.
        RunTestWorkerThreads(5);

        // Give the transfer loop a few more ticks before shutting down.
        Thread.Sleep(5000);

        // Shutdown: stop the loop and flush whatever is still pending.
        // Joining or aborting the transfer thread is possible but probably unnecessary.
        HitCounter.StopRunning();

        Console.WriteLine("Finished...");
        Console.ReadKey();
    }

    /// <summary>
    /// Starts <paramref name="workerCount"/> threads that each log hits under their
    /// own tag ("TAG0", "TAG1", ...) and blocks until all of them have finished.
    /// </summary>
    /// <param name="workerCount">Number of simulated request threads to start.</param>
    private static void RunTestWorkerThreads(int workerCount)
    {
        Thread[] workers = new Thread[workerCount];

        for (int index = 0; index < workers.Length; index++)
        {
            Thread worker = new Thread(SimulateHits);
            workers[index] = worker;
            worker.Start("TAG" + index);
        }

        // Wait for every simulated request thread before reporting completion.
        Array.ForEach(workers, worker => worker.Join());

        Console.WriteLine("All threads finished...");
    }

    /// <summary>
    /// Thread body for one simulated request thread: logs 300 hits for the given
    /// tag, pausing 0-4 ms (randomly chosen) after each one.
    /// </summary>
    /// <param name="tagname">The tag to log; supplied through <c>Thread.Start(object)</c>.</param>
    private static void SimulateHits(object tagname)
    {
        Random jitter = new Random();
        int remaining = 300;

        while (remaining-- > 0)
        {
            HitCounter.LogHit(tagname.ToString());
            Thread.Sleep(jitter.Next(0, 5));
        }
    }
}
/// <summary>
/// Process-wide hit buffer. Request threads record hits in memory through
/// <see cref="LogHit"/>; a loop started with <see cref="Run"/> periodically drains
/// the buffer, aggregates the hits per tag and hands the totals to the writer.
/// </summary>
public static class HitCounter
{
    // Raw hits, one entry per LogHit call; drained by Transfer.
    private static readonly ConcurrentQueue<string> hits = new ConcurrentQueue<string>();

    // Serialises Transfer so the periodic loop and the final flush in StopRunning never interleave.
    private static readonly object transferlock = new object();

    // Set once by StopRunning. Serves both as the stop flag and as the wake-up signal
    // for the wait in Run. Never reset and never disposed: it lives as long as the process.
    private static readonly ManualResetEventSlim stopSignal = new ManualResetEventSlim(false);

    /// <summary>Records a single hit for <paramref name="tag"/>.</summary>
    /// <param name="tag">Identifier of the page (or other item) that was hit.</param>
    public static void LogHit(string tag)
    {
        hits.Enqueue(tag);
    }

    /// <summary>
    /// Transfer loop: flushes pending hits, waits <paramref name="transferInterval"/>
    /// and repeats until <see cref="StopRunning"/> is called. Blocks the calling thread,
    /// so it is meant to be run on a dedicated thread.
    /// </summary>
    /// <param name="transferInterval">Time to wait between two transfers.</param>
    public static void Run(TimeSpan transferInterval)
    {
        while (!stopSignal.IsSet)
        {
            Transfer();

            // Unlike Thread.Sleep, this wait ends as soon as StopRunning signals,
            // so the loop exits promptly instead of sitting out the rest of the interval.
            stopSignal.Wait(transferInterval);
        }
    }

    /// <summary>
    /// Asks the transfer loop to exit and performs one final flush on the calling
    /// thread. Hits logged after this call are not transferred by this class.
    /// </summary>
    public static void StopRunning()
    {
        stopSignal.Set();
        Transfer();
    }

    /// <summary>
    /// Drains the pending hits, counts them per tag and writes the counts out.
    /// The whole operation runs under <c>transferlock</c>.
    /// </summary>
    private static void Transfer()
    {
        lock (transferlock)
        {
            // Materialise the grouping once so the writer can enumerate it
            // as often as it likes without re-running the query.
            List<KeyValuePair<string, int>> hitCounts = GetPendingTags()
                .GroupBy(tag => tag)
                .Select(g => new KeyValuePair<string, int>(g.Key, g.Count()))
                .ToList();

            WriteHits(hitCounts);
        }
    }

    /// <summary>
    /// Writes the per-tag hit counts, ordered by tag. This sample prints them to the
    /// console; the commented sketch shows the intended database write.
    /// </summary>
    /// <param name="hitCounts">Tag / hit-count pairs collected since the previous transfer.</param>
    private static void WriteHits(IEnumerable<KeyValuePair<string, int>> hitCounts)
    {
        // NOTE: the SQL below is an untested sketch.
        // The idea is that each update is atomic ("count = count + @count"), so even though
        // multiple web servers issue similar updates, potentially at the same time,
        // none of the increments is lost. Test this part before relying on it.
        //using (SqlConnection con = new SqlConnection("xyz"))
        //{
        //    con.Open();
        //    foreach (var hitCount in hitCounts.OrderBy(h => h.Key))
        //    {
        //        var cmd = con.CreateCommand();
        //        cmd.CommandText = "update hits set count = count + @count where tag = @tag";
        //        cmd.Parameters.AddWithValue("@count", hitCount.Value);
        //        cmd.Parameters.AddWithValue("@tag", hitCount.Key);
        //        cmd.ExecuteNonQuery();
        //    }
        //}
        Console.WriteLine("Writing....");
        foreach (var hitCount in hitCounts.OrderBy(h => h.Key))
        {
            Console.WriteLine("{0}\t{1}", hitCount.Key, hitCount.Value);
        }
    }

    /// <summary>
    /// Removes and returns the hits that were queued when the call started.
    /// Hits enqueued while draining are left for the next transfer, so a steady
    /// stream of new hits cannot keep this method looping.
    /// </summary>
    /// <returns>The drained tags, one entry per hit, in queue order.</returns>
    private static IEnumerable<string> GetPendingTags()
    {
        int pending = hits.Count;
        List<string> drained = new List<string>(pending);

        string tag;
        for (int i = 0; i < pending && hits.TryDequeue(out tag); i++)
        {
            drained.Add(tag);
        }

        return drained;
    }
}