【发布时间】:2015-05-17 19:48:52
【问题描述】:
我编写了一个线程安全类来获取来自多个线程的输入,并在它运行到固定大小后将结果上传到 S3。
S3Exporter 类
// this class is thread safe.
public class S3Exporter {
private static final int BUFFER_PADDING = 1000;
private final int targetSize;
private final ByteArrayOutputStream buf;
private volatile boolean started;
public S3Exporter(final int targetSize) {
buf = new ByteArrayOutputStream(targetSize + BUFFER_PADDING);
this.targetSize = targetSize;
started = false;
}
public synchronized void start() {
started = true;
}
public synchronized void end() {
started = false;
flush();
}
public synchronized void export(byte[] data) throws IOException {
Preconditions.checkState(started, "Not started!");
buf.write(b, buf.size(), b.length);
flushIfNeeded();
}
private void flushIfNeeded() {
if (buf.size() >= targetSize) {
flush();
}
}
public synchronized void flush() {
if (buf.size() > 0) {
// upload buf to s3, it's a time-consuming operation
buf.reset();
}
}
}
客户端调用导出方法来传递数据,如果抛出异常,客户端将稍后传递该数据。 为了避免在重新启动应用程序时丢失数据,我在创建 S3Exporter 对象时添加了一个关闭挂钩:
S3Exporter exporter = new S3Exporter(10000);
Runtime.getRuntime().addShutdownHook(new Thread(() -> exporter.end()));
我担心这个类是不可扩展的,我的意思是当数据越来越多时它可能成为系统的瓶颈。我可以想出两种方法来改善这种情况:
- 异步执行耗时的上传操作:使用执行器上传并在shutdown hook中调用ThreadPoolExecutor.awaitTermination()。
- 只需将数据放入导出方法中的 LinkedBlockingQueue 并使用多个线程来处理它。(根据我的理解,这种方式比第一种方式更具可扩展性)
然后我需要在关闭挂钩线程中做更多的工作,以确保不会丢失接受的数据,据我所知,这不是一个好主意。重新启动应用程序时我会冒丢失数据的风险,这是我最不想看到的。
我的问题
- 我对可扩展性的担忧真的是个问题吗?(为了让这个问题不那么愚蠢,假设数据大小是几个字节,调用导出方法的 TPS 是 500)
- 如果第一个问题的答案是肯定的,那么我的改进呢,对吗?如何进行清理工作以避免丢失数据?
【问题讨论】:
标签: java multithreading