温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

怎么使用StampLock

发布时间:2021-12-24 10:32:44 来源:亿速云 阅读:147 作者:iii 栏目:大数据

本篇内容介绍了“怎么使用StampLock”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

主要成员变量

public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage {

    // 实际存储数据的位置
    private final EntryLogger entryLogger;

    // -----------------
    // index 相关
    // -----------------
 
    // 记录fence,exist,masterKey等信息
    private final LedgerMetadataIndex ledgerIndex;
    // 关于位置的index
    private final EntryLocationIndex entryLocationIndex;
  
    // 临时的ledgerCache
    private final ConcurrentLongHashMap<TransientLedgerInfo> transientLedgerInfoCache;
  
    // -----------------
    // 写入相关
    // -----------------
  
  
    // 用来写入的memtable,2个互相swap
    private final StampedLock writeCacheRotationLock = new StampedLock();
    // Write cache where all new entries are inserted into
    protected volatile WriteCache writeCache;
    // Write cache that is used to swap with writeCache during flushes
    protected volatile WriteCache writeCacheBeingFlushed;
  

    // Cache where we insert entries for speculative reading
    private final ReadCache readCache;

  
    // checkpoint 相关
    private final CheckpointSource checkpointSource;
    private Checkpoint lastCheckpoint = Checkpoint.MIN;
}
主要作用
  1. 可以读写ledger,维护ledger的位置(index)

  2. 保存ledger相关的metadata

  3. 支持checkpoint

写入Entry

写入会直接写入到WriteCache里面,这里面使用了StampLock,将swap cache的操作进行了保护,StampLock是一个乐观读的读写锁,并发更高。

public long addEntry(ByteBuf entry) throws IOException, BookieException {
        long startTime = MathUtils.nowInNano();

        long ledgerId = entry.getLong(entry.readerIndex());
        long entryId = entry.getLong(entry.readerIndex() + 8);
        long lac = entry.getLong(entry.readerIndex() + 16);

        // 这里的模板是StampLock乐观读取的通用模板
        // 相对的互斥操作实际上是swap cache的操作
  
        // First we try to do an optimistic locking to get access to the current write cache.
        // This is based on the fact that the write cache is only being rotated (swapped) every 1 minute. During the
        // rest of the time, we can have multiple thread using the optimistic lock here without interfering.
       
        // 乐观读锁
        long stamp = writeCacheRotationLock.tryOptimisticRead();
        boolean inserted = false;

        inserted = writeCache.put(ledgerId, entryId, entry);
        // 如果插入过程中发生了cache swap 则再次插入
        if (!writeCacheRotationLock.validate(stamp)) {
            // The write cache was rotated while we were inserting. We need to acquire the proper read lock and repeat
            // the operation because we might have inserted in a write cache that was already being flushed and cleared,
            // without being sure about this last entry being flushed or not.
          
            // 说明插入到被swap的那个cache里面了
            // 如果insert是true TODO
            // 如果是false的话没啥影响
            stamp = writeCacheRotationLock.readLock();
            try {
                inserted = writeCache.put(ledgerId, entryId, entry);
            } finally {
                writeCacheRotationLock.unlockRead(stamp);
            }
        }

        // 如果这里写入到writeCache失败的话,触发Flush WriteCache
        // 走到这里说明可能2个buffer都满了?
        if (!inserted) {
            triggerFlushAndAddEntry(ledgerId, entryId, entry);
        }

        // 更新LAC的缓存
        // after successfully insert the entry, update LAC and notify the watchers
        updateCachedLacIfNeeded(ledgerId, lac);
  
        return entryId;
}
writeCache满了,触发flush的流程

这里的逻辑比较容易,一直不断循环插入到writeCache 里面,如果超时的话就跳出循环标记,这个写入失败。

如果没有触发flush动作的话,会提交一个flush task。

private void triggerFlushAndAddEntry(long ledgerId, long entryId, ByteBuf entry)
            throws IOException, BookieException {
        // metric 打点
        dbLedgerStorageStats.getThrottledWriteRequests().inc();
        ...
        // 最大等待写入时间,超时之前不断重试
        while (System.nanoTime() < absoluteTimeoutNanos) {
            // Write cache is full, we need to trigger a flush so that it gets rotated
            // If the flush has already been triggered or flush has already switched the
            // cache, we don't need to trigger another flush
          
            // 提交一个flush任务,如果之前有了就不提交了
            if (!isFlushOngoing.get() && hasFlushBeenTriggered.compareAndSet(false, true)) {
                // Trigger an early flush in background
                log.info("Write cache is full, triggering flush");
                executor.execute(() -> {
                        try {
                            flush();
                        } catch (IOException e) {
                            log.error("Error during flush", e);
                        }
                    });
            }

            long stamp = writeCacheRotationLock.readLock();
            try {
                if (writeCache.put(ledgerId, entryId, entry)) {
                    // We succeeded in putting the entry in write cache in the
                    return;
                }
            } finally {
                writeCacheRotationLock.unlockRead(stamp);
            }

            // Wait some time and try again
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted when adding entry " + ledgerId + "@" + entryId);
            }
        }

        // Timeout expired and we weren't able to insert in write cache
        dbLedgerStorageStats.getRejectedWriteRequests().inc();
        throw new OperationRejectedException();
}
flush 流程

实际上flush流程是触发checkpoint的逻辑,

主要动作

  • 交换2个writeCache,正在写入的cache会被交换成flush的batch

  • 遍历writeCache,将内容写到EntryLogger里面

  • sync EntryLogger将上一步写入的内容落盘

  • 更新ledgerLocationIndex,同时flush这个index到rocksDb里面

public void flush() throws IOException {
    // journal
    Checkpoint cp = checkpointSource.newCheckpoint();
    checkpoint(cp);
    checkpointSource.checkpointComplete(cp, true);
}

public void checkpoint(Checkpoint checkpoint) throws IOException {
        // journal
        Checkpoint thisCheckpoint = checkpointSource.newCheckpoint();
  
        // 这里检查是否在这个点之前做过checkpoint了
        if (lastCheckpoint.compareTo(checkpoint) > 0) {
            return;
        }

        long startTime = MathUtils.nowInNano();

        // Only a single flush operation can happen at a time
        flushMutex.lock();

        try {
            // Swap the write cache so that writes can continue to happen while the flush is
            // ongoing
            // 这里逻辑比较容易,交换当前的writeCache和后备的writeCache
            // 获取的是StampLock的writeLock
            swapWriteCache();

            long sizeToFlush = writeCacheBeingFlushed.size();
            

            // Write all the pending entries into the entry logger and collect the offset
            // position for each entry
            
            // 刷cache到实际的保存位置上、
          
            // 构建一个rocksDb的batch
            Batch batch = entryLocationIndex.newBatch();
            writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
                try {
                    // 把写入的entry刷到entryLogger里面
                    // 这里返回的这个entry的offset
                    long location = entryLogger.addEntry(ledgerId, entry, true);
                  
                    // 这里的逻辑实际上就是把3个long 拆分成k/v 写入到RocksDb的batch 里面
                    entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });

            // 这里不展开说了,实际上会把刚才写入的entryLogger进行flush && fsync 到磁盘上。
            entryLogger.flush();

            // 这里触发RocksDb的batch flush
            // 这个写入是sync的
            long batchFlushStarTime = System.nanoTime();
            batch.flush();
            batch.close();
            
          
            // flush ledgerIndex
            // 这里的内容变化比较少,因为记录的是metadata
            ledgerIndex.flush();

            // 调度一个cleanUp的逻辑
            cleanupExecutor.execute(() -> {
                // There can only be one single cleanup task running because the cleanupExecutor
                // is single-threaded
                try {
                    if (log.isDebugEnabled()) {
                        log.debug("Removing deleted ledgers from db indexes");
                    }

                    entryLocationIndex.removeOffsetFromDeletedLedgers();
                    ledgerIndex.removeDeletedLedgers();
                } catch (Throwable t) {
                    log.warn("Failed to cleanup db indexes", t);
                }
            });
            
            // 保存checkpoint 
            lastCheckpoint = thisCheckpoint;

            // 清空这个cache
          
            // Discard all the entry from the write cache, since they're now persisted
            writeCacheBeingFlushed.clear();

            
        } catch (IOException e) {
            // Leave IOExecption as it is
            throw e;
        } catch (RuntimeException e) {
            // Wrap unchecked exceptions
            throw new IOException(e);
        } finally {
            try {
                isFlushOngoing.set(false);
            } finally {
                flushMutex.unlock();
            }
        }
}

这样写入就完成了

读取Entry

这里会从3个位置开始读取

  1. writeCache,包括正在刷新的和正在写入的

  2. readCache,预读的缓存

  3. entryLogger,读文件,这部分已经落盘了

读取成功之后会尝试增加预读的buffer


如果正在flush这个时候有触发读取会怎么样?

上面的flush流程是在所有内容已经落盘之后才把刷新的writeCache 清空的

即使有并发读,如果最后还是落到了读文件这一步,那怎么都能读到


还有个问题就是这个先后顺序,不确定是否有相同ledgerId,entry,但是内容不同的请求出现。

这样的话感觉可能有问题

public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
        long startTime = MathUtils.nowInNano();
        
        // 读LAC的情况
        if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
            return getLastEntry(ledgerId);
        }

        // We need to try to read from both write caches, since recent entries could be found in either of the two. The
        // write caches are already thread safe on their own, here we just need to make sure we get references to both
        // of them. Using an optimistic lock since the read lock is always free, unless we're swapping the caches.
        long stamp = writeCacheRotationLock.tryOptimisticRead();
        WriteCache localWriteCache = writeCache;
        WriteCache localWriteCacheBeingFlushed = writeCacheBeingFlushed;
        if (!writeCacheRotationLock.validate(stamp)) {
            // Fallback to regular read lock approach
            stamp = writeCacheRotationLock.readLock();
            try {
                localWriteCache = writeCache;
                localWriteCacheBeingFlushed = writeCacheBeingFlushed;
            } finally {
                writeCacheRotationLock.unlockRead(stamp);
            }
        }

        // First try to read from the write cache of recent entries
        ByteBuf entry = localWriteCache.get(ledgerId, entryId);
        if (entry != null) {
            recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
            recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
            return entry;
        }

        // If there's a flush going on, the entry might be in the flush buffer
        entry = localWriteCacheBeingFlushed.get(ledgerId, entryId);
        if (entry != null) {
            recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
            recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
            return entry;
        }

        // Try reading from read-ahead cache
        entry = readCache.get(ledgerId, entryId);
        if (entry != null) {
            recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheHitStats(), startTime);
            recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
            return entry;
        }

        // Read from main storage
        long entryLocation;
        try {
            entryLocation = entryLocationIndex.getLocation(ledgerId, entryId);
            if (entryLocation == 0) {
                throw new NoEntryException(ledgerId, entryId);
            }
            entry = entryLogger.readEntry(ledgerId, entryId, entryLocation);
        } catch (NoEntryException e) {
            recordFailedEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
            throw e;
        }

        readCache.put(ledgerId, entryId, entry);

        // Try to read more entries
        long nextEntryLocation = entryLocation + 4 /* size header */ + entry.readableBytes();
        fillReadAheadCache(ledgerId, entryId + 1, nextEntryLocation);

        recordSuccessfulEvent(dbLedgerStorageStats.getReadCacheMissStats(), startTime);
        recordSuccessfulEvent(dbLedgerStorageStats.getReadEntryStats(), startTime);
        return entry;
}

“怎么使用StampLock”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI