Coverage Report

Created: 2025-06-10 13:21

/bitcoin/src/leveldb/db/db_impl.cc
Line
Count
Source
1
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2
// Use of this source code is governed by a BSD-style license that can be
3
// found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5
#include "db/db_impl.h"
6
7
#include <stdint.h>
8
#include <stdio.h>
9
10
#include <algorithm>
11
#include <atomic>
12
#include <set>
13
#include <string>
14
#include <vector>
15
16
#include "db/builder.h"
17
#include "db/db_iter.h"
18
#include "db/dbformat.h"
19
#include "db/filename.h"
20
#include "db/log_reader.h"
21
#include "db/log_writer.h"
22
#include "db/memtable.h"
23
#include "db/table_cache.h"
24
#include "db/version_set.h"
25
#include "db/write_batch_internal.h"
26
#include "leveldb/db.h"
27
#include "leveldb/env.h"
28
#include "leveldb/status.h"
29
#include "leveldb/table.h"
30
#include "leveldb/table_builder.h"
31
#include "port/port.h"
32
#include "table/block.h"
33
#include "table/merger.h"
34
#include "table/two_level_iterator.h"
35
#include "util/coding.h"
36
#include "util/logging.h"
37
#include "util/mutexlock.h"
38
39
namespace leveldb {
40
41
const int kNumNonTableCacheFiles = 10;
42
43
// Information kept for every waiting writer
44
struct DBImpl::Writer {
45
  explicit Writer(port::Mutex* mu)
46
2.32M
      : batch(nullptr), sync(false), done(false), cv(mu) {}
47
48
  Status status;
49
  WriteBatch* batch;
50
  bool sync;
51
  bool done;
52
  port::CondVar cv;
53
};
54
55
struct DBImpl::CompactionState {
56
  // Files produced by compaction
57
  struct Output {
58
    uint64_t number;
59
    uint64_t file_size;
60
    InternalKey smallest, largest;
61
  };
62
63
0
  Output* current_output() { return &outputs[outputs.size() - 1]; }
64
65
  explicit CompactionState(Compaction* c)
66
0
      : compaction(c),
67
0
        smallest_snapshot(0),
68
0
        outfile(nullptr),
69
0
        builder(nullptr),
70
0
        total_bytes(0) {}
71
72
  Compaction* const compaction;
73
74
  // Sequence numbers < smallest_snapshot are not significant since we
75
  // will never have to service a snapshot below smallest_snapshot.
76
  // Therefore if we have seen a sequence number S <= smallest_snapshot,
77
  // we can drop all entries for the same key with sequence numbers < S.
78
  SequenceNumber smallest_snapshot;
79
80
  std::vector<Output> outputs;
81
82
  // State kept for output being generated
83
  WritableFile* outfile;
84
  TableBuilder* builder;
85
86
  uint64_t total_bytes;
87
};
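
The smallest_snapshot comment above is the visibility rule that drives compaction's garbage collection. A minimal standalone sketch of that rule, with hypothetical sequence numbers (not the report's code):

#include <cassert>
#include <cstdint>

using SequenceNumber = uint64_t;  // leveldb sequence numbers are 64-bit

// Once an entry for a key has been seen whose sequence S satisfies
// S <= smallest_snapshot, every older entry for the same key is invisible
// to all live snapshots and may be dropped.
static bool CanDropShadowedEntries(SequenceNumber newest_seen_seq,
                                   SequenceNumber smallest_snapshot) {
  return newest_seen_seq <= smallest_snapshot;
}

int main() {
  const SequenceNumber smallest_snapshot = 100;  // oldest live snapshot
  assert(CanDropShadowedEntries(90, smallest_snapshot));    // older entries droppable
  assert(!CanDropShadowedEntries(150, smallest_snapshot));  // a snapshot may need them
}
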
88
89
// Fix user-supplied options to be reasonable
90
template <class T, class V>
91
133k
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
92
133k
  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
  Branch (92:7): [True: 0, False: 33.2k]
  Branch (92:7): [True: 0, False: 99.8k]
93
133k
  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
  Branch (93:7): [True: 0, False: 33.2k]
  Branch (93:7): [True: 0, False: 99.8k]
94
133k
}
db_impl.cc:void leveldb::ClipToRange<int, int>(int*, int, int)
Line
Count
Source
91
33.2k
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
92
33.2k
  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
  Branch (92:7): [True: 0, False: 33.2k]
93
33.2k
  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
  Branch (93:7): [True: 0, False: 33.2k]
94
33.2k
}
db_impl.cc:void leveldb::ClipToRange<unsigned long, int>(unsigned long*, int, int)
Line
Count
Source
91
99.8k
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
92
99.8k
  if (static_cast<V>(*ptr) > maxvalue) *ptr = maxvalue;
  Branch (92:7): [True: 0, False: 99.8k]
93
99.8k
  if (static_cast<V>(*ptr) < minvalue) *ptr = minvalue;
  Branch (93:7): [True: 0, False: 99.8k]
94
99.8k
}
95
Options SanitizeOptions(const std::string& dbname,
96
                        const InternalKeyComparator* icmp,
97
                        const InternalFilterPolicy* ipolicy,
98
33.2k
                        const Options& src) {
99
33.2k
  Options result = src;
100
33.2k
  result.comparator = icmp;
101
33.2k
  result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
  Branch (101:26): [True: 33.2k, False: 0]
102
33.2k
  ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000);
103
33.2k
  ClipToRange(&result.write_buffer_size, 64 << 10, 1 << 30);
104
33.2k
  ClipToRange(&result.max_file_size, 1 << 20, 1 << 30);
105
33.2k
  ClipToRange(&result.block_size, 1 << 10, 4 << 20);
106
33.2k
  if (result.info_log == nullptr) {
  Branch (106:7): [True: 0, False: 33.2k]
107
    // Open a log file in the same directory as the db
108
0
    src.env->CreateDir(dbname);  // In case it does not exist
109
0
    src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname));
110
0
    Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
111
0
    if (!s.ok()) {
  Branch (111:9): [True: 0, False: 0]
112
      // No place suitable for logging
113
0
      result.info_log = nullptr;
114
0
    }
115
0
  }
116
33.2k
  if (result.block_cache == nullptr) {
  Branch (116:7): [True: 0, False: 33.2k]
117
0
    result.block_cache = NewLRUCache(8 << 20);
118
0
  }
119
33.2k
  return result;
120
33.2k
}
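
SanitizeOptions clamps each user-supplied option into a safe range through ClipToRange. The effect is the same as std::clamp; a small illustration with made-up input values:

#include <algorithm>
#include <cassert>
#include <cstddef>

int main() {
  // ClipToRange(&v, lo, hi) behaves like v = std::clamp(v, lo, hi).
  int max_open_files = 1000000;  // absurdly large user setting
  max_open_files = std::clamp(max_open_files, 64 + 10, 50000);
  assert(max_open_files == 50000);  // clipped to the ceiling

  std::size_t write_buffer_size = 1024;  // below the 64 KiB floor
  write_buffer_size =
      std::clamp<std::size_t>(write_buffer_size, 64 << 10, 1 << 30);
  assert(write_buffer_size == (64 << 10));  // raised to the floor
}
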
121
122
33.2k
static int TableCacheSize(const Options& sanitized_options) {
123
  // Reserve ten files or so for other uses and give the rest to TableCache.
124
33.2k
  return sanitized_options.max_open_files - kNumNonTableCacheFiles;
125
33.2k
}
126
127
DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
128
33.2k
    : env_(raw_options.env),
129
33.2k
      internal_comparator_(raw_options.comparator),
130
33.2k
      internal_filter_policy_(raw_options.filter_policy),
131
33.2k
      options_(SanitizeOptions(dbname, &internal_comparator_,
132
33.2k
                               &internal_filter_policy_, raw_options)),
133
33.2k
      owns_info_log_(options_.info_log != raw_options.info_log),
134
33.2k
      owns_cache_(options_.block_cache != raw_options.block_cache),
135
33.2k
      dbname_(dbname),
136
33.2k
      table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
137
33.2k
      db_lock_(nullptr),
138
33.2k
      shutting_down_(false),
139
33.2k
      background_work_finished_signal_(&mutex_),
140
33.2k
      mem_(nullptr),
141
33.2k
      imm_(nullptr),
142
33.2k
      has_imm_(false),
143
33.2k
      logfile_(nullptr),
144
33.2k
      logfile_number_(0),
145
33.2k
      log_(nullptr),
146
33.2k
      seed_(0),
147
33.2k
      tmp_batch_(new WriteBatch),
148
33.2k
      background_compaction_scheduled_(false),
149
33.2k
      manual_compaction_(nullptr),
150
33.2k
      versions_(new VersionSet(dbname_, &options_, table_cache_,
151
33.2k
                               &internal_comparator_)) {}
152
153
33.2k
DBImpl::~DBImpl() {
154
  // Wait for background work to finish.
155
33.2k
  mutex_.Lock();
156
33.2k
  shutting_down_.store(true, std::memory_order_release);
157
33.2k
  while (background_compaction_scheduled_) {
  Branch (157:10): [True: 0, False: 33.2k]
158
0
    background_work_finished_signal_.Wait();
159
0
  }
160
33.2k
  mutex_.Unlock();
161
162
33.2k
  if (db_lock_ != nullptr) {
  Branch (162:7): [True: 33.2k, False: 0]
163
33.2k
    env_->UnlockFile(db_lock_);
164
33.2k
  }
165
166
33.2k
  delete versions_;
167
33.2k
  if (mem_ != nullptr) mem_->Unref();
  Branch (167:7): [True: 33.2k, False: 0]
168
33.2k
  if (imm_ != nullptr) imm_->Unref();
  Branch (168:7): [True: 0, False: 33.2k]
169
33.2k
  delete tmp_batch_;
170
33.2k
  delete log_;
171
33.2k
  delete logfile_;
172
33.2k
  delete table_cache_;
173
174
33.2k
  if (owns_info_log_) {
  Branch (174:7): [True: 0, False: 33.2k]
175
0
    delete options_.info_log;
176
0
  }
177
33.2k
  if (owns_cache_) {
  Branch (177:7): [True: 0, False: 33.2k]
178
0
    delete options_.block_cache;
179
0
  }
180
33.2k
}
181
182
33.2k
Status DBImpl::NewDB() {
183
33.2k
  VersionEdit new_db;
184
33.2k
  new_db.SetComparatorName(user_comparator()->Name());
185
33.2k
  new_db.SetLogNumber(0);
186
33.2k
  new_db.SetNextFile(2);
187
33.2k
  new_db.SetLastSequence(0);
188
189
33.2k
  const std::string manifest = DescriptorFileName(dbname_, 1);
190
33.2k
  WritableFile* file;
191
33.2k
  Status s = env_->NewWritableFile(manifest, &file);
192
33.2k
  if (!s.ok()) {
  Branch (192:7): [True: 0, False: 33.2k]
193
0
    return s;
194
0
  }
195
33.2k
  {
196
33.2k
    log::Writer log(file);
197
33.2k
    std::string record;
198
33.2k
    new_db.EncodeTo(&record);
199
33.2k
    s = log.AddRecord(record);
200
33.2k
    if (s.ok()) {
  Branch (200:9): [True: 33.2k, False: 0]
201
33.2k
      s = file->Close();
202
33.2k
    }
203
33.2k
  }
204
33.2k
  delete file;
205
33.2k
  if (s.ok()) {
  Branch (205:7): [True: 33.2k, False: 0]
206
    // Make "CURRENT" file that points to the new manifest file.
207
33.2k
    s = SetCurrentFile(env_, dbname_, 1);
208
33.2k
  } else {
209
0
    env_->DeleteFile(manifest);
210
0
  }
211
33.2k
  return s;
212
33.2k
}
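
NewDB writes an initial MANIFEST (descriptor file #1) and then points the CURRENT file at it via SetCurrentFile. A sketch of that pointer-file convention, illustrative only (the real code goes through leveldb's Env interface, not stdio):

#include <cstdint>
#include <cstdio>
#include <string>

// CURRENT holds the name of the live manifest and is replaced by writing a
// temp file and renaming it, so a crash leaves either the old pointer or the
// new one, never a torn write.
static bool PointCurrentAt(const std::string& dbname, uint64_t manifest_num) {
  char contents[32];
  std::snprintf(contents, sizeof(contents), "MANIFEST-%06llu\n",
                static_cast<unsigned long long>(manifest_num));
  const std::string tmp = dbname + "/CURRENT.dbtmp";
  std::FILE* f = std::fopen(tmp.c_str(), "w");
  if (f == nullptr) return false;
  std::fputs(contents, f);
  if (std::fclose(f) != 0) return false;
  return std::rename(tmp.c_str(), (dbname + "/CURRENT").c_str()) == 0;
}
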
213
214
0
void DBImpl::MaybeIgnoreError(Status* s) const {
215
0
  if (s->ok() || options_.paranoid_checks) {
  Branch (215:7): [True: 0, False: 0]
  Branch (215:18): [True: 0, False: 0]
216
    // No change needed
217
0
  } else {
218
0
    Log(options_.info_log, "Ignoring error %s", s->ToString().c_str());
219
0
    *s = Status::OK();
220
0
  }
221
0
}
222
223
33.3k
void DBImpl::DeleteObsoleteFiles() {
224
33.3k
  mutex_.AssertHeld();
225
226
33.3k
  if (!bg_error_.ok()) {
  Branch (226:7): [True: 0, False: 33.3k]
227
    // After a background error, we don't know whether a new version may
228
    // or may not have been committed, so we cannot safely garbage collect.
229
0
    return;
230
0
  }
231
232
  // Make a set of all of the live files
233
33.3k
  std::set<uint64_t> live = pending_outputs_;
234
33.3k
  versions_->AddLiveFiles(&live);
235
236
33.3k
  std::vector<std::string> filenames;
237
33.3k
  env_->GetChildren(dbname_, &filenames);  // Ignoring errors on purpose
238
33.3k
  uint64_t number;
239
33.3k
  FileType type;
240
33.3k
  std::vector<std::string> files_to_delete;
241
233k
  for (std::string& filename : filenames) {
  Branch (241:30): [True: 233k, False: 33.3k]
242
233k
    if (ParseFileName(filename, &number, &type)) {
  Branch (242:9): [True: 166k, False: 66.6k]
243
166k
      bool keep = true;
244
166k
      switch (type) {
  Branch (244:15): [True: 0, False: 166k]
245
33.3k
        case kLogFile:
  Branch (245:9): [True: 33.3k, False: 133k]
246
33.3k
          keep = ((number >= versions_->LogNumber()) ||
  Branch (246:19): [True: 33.3k, False: 24]
247
33.3k
                  (number == versions_->PrevLogNumber()));
  Branch (247:19): [True: 0, False: 24]
248
33.3k
          break;
249
66.5k
        case kDescriptorFile:
  Branch (249:9): [True: 66.5k, False: 99.9k]
250
          // Keep my manifest file, and any newer incarnations'
251
          // (in case there is a race that allows other incarnations)
252
66.5k
          keep = (number >= versions_->ManifestFileNumber());
253
66.5k
          break;
254
24
        case kTableFile:
  Branch (254:9): [True: 24, False: 166k]
255
24
          keep = (live.find(number) != live.end());
256
24
          break;
257
0
        case kTempFile:
  Branch (257:9): [True: 0, False: 166k]
258
          // Any temp files that are currently being written to must
259
          // be recorded in pending_outputs_, which is inserted into "live"
260
0
          keep = (live.find(number) != live.end());
261
0
          break;
262
33.3k
        case kCurrentFile:
  Branch (262:9): [True: 33.3k, False: 133k]
263
66.6k
        case kDBLockFile:
  Branch (263:9): [True: 33.3k, False: 133k]
264
66.6k
        case kInfoLogFile:
  Branch (264:9): [True: 0, False: 166k]
265
66.6k
          keep = true;
266
66.6k
          break;
267
166k
      }
268
269
166k
      if (!keep) {
  Branch (269:11): [True: 33.3k, False: 133k]
270
33.3k
        files_to_delete.push_back(std::move(filename));
271
33.3k
        if (type == kTableFile) {
  Branch (271:13): [True: 0, False: 33.3k]
272
0
          table_cache_->Evict(number);
273
0
        }
274
33.3k
        Log(options_.info_log, "Delete type=%d #%lld\n", static_cast<int>(type),
275
33.3k
            static_cast<unsigned long long>(number));
276
33.3k
      }
277
166k
    }
278
233k
  }
279
280
  // While deleting all files unblock other threads. All files being deleted
281
  // have unique names which will not collide with newly created files and
282
  // are therefore safe to delete while allowing other threads to proceed.
283
33.3k
  mutex_.Unlock();
284
33.3k
  for (const std::string& filename : files_to_delete) {
  Branch (284:36): [True: 33.3k, False: 33.3k]
285
33.3k
    env_->DeleteFile(dbname_ + "/" + filename);
286
33.3k
  }
287
33.3k
  mutex_.Lock();
288
33.3k
}
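
The switch above decides, per file type, whether a file is still referenced. A condensed restatement with standalone types (not the report's code):

#include <cstdint>
#include <set>

enum FileType { kLogFile, kDescriptorFile, kTableFile, kTempFile,
                kCurrentFile, kDBLockFile, kInfoLogFile };

static bool KeepFile(FileType type, uint64_t number, uint64_t log_number,
                     uint64_t prev_log_number, uint64_t manifest_number,
                     const std::set<uint64_t>& live) {
  switch (type) {
    case kLogFile:
      return number >= log_number || number == prev_log_number;
    case kDescriptorFile:
      return number >= manifest_number;  // my manifest and newer incarnations'
    case kTableFile:
    case kTempFile:
      return live.count(number) != 0;  // referenced by a version or pending
    default:
      return true;  // CURRENT, LOCK and info logs are always kept
  }
}
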
289
290
33.2k
Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
291
33.2k
  mutex_.AssertHeld();
292
293
  // Ignore error from CreateDir since the creation of the DB is
294
  // committed only when the descriptor is created, and this directory
295
  // may already exist from a previous failed creation attempt.
296
33.2k
  env_->CreateDir(dbname_);
297
33.2k
  assert(db_lock_ == nullptr);
  Branch (297:3): [True: 33.2k, False: 0]
298
33.2k
  Status s = env_->LockFile(LockFileName(dbname_), &db_lock_);
299
33.2k
  if (!s.ok()) {
  Branch (299:7): [True: 0, False: 33.2k]
300
0
    return s;
301
0
  }
302
303
33.2k
  if (!env_->FileExists(CurrentFileName(dbname_))) {
  Branch (303:7): [True: 33.2k, False: 0]
304
33.2k
    if (options_.create_if_missing) {
  Branch (304:9): [True: 33.2k, False: 0]
305
33.2k
      s = NewDB();
306
33.2k
      if (!s.ok()) {
  Branch (306:11): [True: 0, False: 33.2k]
307
0
        return s;
308
0
      }
309
33.2k
    } else {
310
0
      return Status::InvalidArgument(
311
0
          dbname_, "does not exist (create_if_missing is false)");
312
0
    }
313
33.2k
  } else {
314
0
    if (options_.error_if_exists) {
  Branch (314:9): [True: 0, False: 0]
315
0
      return Status::InvalidArgument(dbname_,
316
0
                                     "exists (error_if_exists is true)");
317
0
    }
318
0
  }
319
320
33.2k
  s = versions_->Recover(save_manifest);
321
33.2k
  if (!s.ok()) {
  Branch (321:7): [True: 0, False: 33.2k]
322
0
    return s;
323
0
  }
324
33.2k
  SequenceNumber max_sequence(0);
325
326
  // Recover from all newer log files than the ones named in the
327
  // descriptor (new log files may have been added by the previous
328
  // incarnation without registering them in the descriptor).
329
  //
330
  // Note that PrevLogNumber() is no longer used, but we pay
331
  // attention to it in case we are recovering a database
332
  // produced by an older version of leveldb.
333
33.2k
  const uint64_t min_log = versions_->LogNumber();
334
33.2k
  const uint64_t prev_log = versions_->PrevLogNumber();
335
33.2k
  std::vector<std::string> filenames;
336
33.2k
  s = env_->GetChildren(dbname_, &filenames);
337
33.2k
  if (!s.ok()) {
  Branch (337:7): [True: 0, False: 33.2k]
338
0
    return s;
339
0
  }
340
33.2k
  std::set<uint64_t> expected;
341
33.2k
  versions_->AddLiveFiles(&expected);
342
33.2k
  uint64_t number;
343
33.2k
  FileType type;
344
33.2k
  std::vector<uint64_t> logs;
345
199k
  for (size_t i = 0; i < filenames.size(); i++) {
  Branch (345:22): [True: 166k, False: 33.2k]
346
166k
    if (ParseFileName(filenames[i], &number, &type)) {
  Branch (346:9): [True: 99.8k, False: 66.5k]
347
99.8k
      expected.erase(number);
348
99.8k
      if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
  Branch (348:11): [True: 0, False: 99.8k]
  Branch (348:32): [True: 0, False: 0]
  Branch (348:55): [True: 0, False: 0]
349
0
        logs.push_back(number);
350
99.8k
    }
351
166k
  }
352
33.2k
  if (!expected.empty()) {
  Branch (352:7): [True: 0, False: 33.2k]
353
0
    char buf[50];
354
0
    snprintf(buf, sizeof(buf), "%d missing files; e.g.",
355
0
             static_cast<int>(expected.size()));
356
0
    return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
357
0
  }
358
359
  // Recover in the order in which the logs were generated
360
33.2k
  std::sort(logs.begin(), logs.end());
361
33.2k
  for (size_t i = 0; i < logs.size(); i++) {
  Branch (361:22): [True: 0, False: 33.2k]
362
0
    s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
363
0
                       &max_sequence);
364
0
    if (!s.ok()) {
  Branch (364:9): [True: 0, False: 0]
365
0
      return s;
366
0
    }
367
368
    // The previous incarnation may not have written any MANIFEST
369
    // records after allocating this log number.  So we manually
370
    // update the file number allocation counter in VersionSet.
371
0
    versions_->MarkFileNumberUsed(logs[i]);
372
0
  }
373
374
33.2k
  if (versions_->LastSequence() < max_sequence) {
  Branch (374:7): [True: 0, False: 33.2k]
375
0
    versions_->SetLastSequence(max_sequence);
376
0
  }
377
378
33.2k
  return Status::OK();
379
33.2k
}
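
Recovery replays only the log files the manifest has not yet absorbed, oldest first. The selection-and-sort step, isolated with hypothetical file numbers:

#include <algorithm>
#include <cstdint>
#include <vector>

int main() {
  const uint64_t min_log = 7, prev_log = 0;  // from the manifest
  const std::vector<uint64_t> on_disk = {12, 3, 9, 7};
  std::vector<uint64_t> logs;
  for (uint64_t n : on_disk) {
    if (n >= min_log || n == prev_log) logs.push_back(n);
  }
  std::sort(logs.begin(), logs.end());  // replay order: 7, 9, 12
}
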
380
381
Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
382
                              bool* save_manifest, VersionEdit* edit,
383
0
                              SequenceNumber* max_sequence) {
384
0
  struct LogReporter : public log::Reader::Reporter {
385
0
    Env* env;
386
0
    Logger* info_log;
387
0
    const char* fname;
388
0
    Status* status;  // null if options_.paranoid_checks==false
389
0
    void Corruption(size_t bytes, const Status& s) override {
390
0
      Log(info_log, "%s%s: dropping %d bytes; %s",
391
0
          (this->status == nullptr ? "(ignoring error) " : ""), fname,
  Branch (391:12): [True: 0, False: 0]
392
0
          static_cast<int>(bytes), s.ToString().c_str());
393
0
      if (this->status != nullptr && this->status->ok()) *this->status = s;
  Branch (393:11): [True: 0, False: 0]
  Branch (393:38): [True: 0, False: 0]
394
0
    }
395
0
  };
396
397
0
  mutex_.AssertHeld();
398
399
  // Open the log file
400
0
  std::string fname = LogFileName(dbname_, log_number);
401
0
  SequentialFile* file;
402
0
  Status status = env_->NewSequentialFile(fname, &file);
403
0
  if (!status.ok()) {
  Branch (403:7): [True: 0, False: 0]
404
0
    MaybeIgnoreError(&status);
405
0
    return status;
406
0
  }
407
408
  // Create the log reader.
409
0
  LogReporter reporter;
410
0
  reporter.env = env_;
411
0
  reporter.info_log = options_.info_log;
412
0
  reporter.fname = fname.c_str();
413
0
  reporter.status = (options_.paranoid_checks ? &status : nullptr);
  Branch (413:22): [True: 0, False: 0]
414
  // We intentionally make log::Reader do checksumming even if
415
  // paranoid_checks==false so that corruptions cause entire commits
416
  // to be skipped instead of propagating bad information (like overly
417
  // large sequence numbers).
418
0
  log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
419
0
  Log(options_.info_log, "Recovering log #%llu",
420
0
      (unsigned long long)log_number);
421
422
  // Read all the records and add to a memtable
423
0
  std::string scratch;
424
0
  Slice record;
425
0
  WriteBatch batch;
426
0
  int compactions = 0;
427
0
  MemTable* mem = nullptr;
428
0
  while (reader.ReadRecord(&record, &scratch) && status.ok()) {
  Branch (428:10): [True: 0, False: 0]
  Branch (428:50): [True: 0, False: 0]
429
0
    if (record.size() < 12) {
  Branch (429:9): [True: 0, False: 0]
430
0
      reporter.Corruption(record.size(),
431
0
                          Status::Corruption("log record too small", fname));
432
0
      continue;
433
0
    }
434
0
    WriteBatchInternal::SetContents(&batch, record);
435
436
0
    if (mem == nullptr) {
  Branch (436:9): [True: 0, False: 0]
437
0
      mem = new MemTable(internal_comparator_);
438
0
      mem->Ref();
439
0
    }
440
0
    status = WriteBatchInternal::InsertInto(&batch, mem);
441
0
    MaybeIgnoreError(&status);
442
0
    if (!status.ok()) {
  Branch (442:9): [True: 0, False: 0]
443
0
      break;
444
0
    }
445
0
    const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
446
0
                                    WriteBatchInternal::Count(&batch) - 1;
447
0
    if (last_seq > *max_sequence) {
  Branch (447:9): [True: 0, False: 0]
448
0
      *max_sequence = last_seq;
449
0
    }
450
451
0
    if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
  Branch (451:9): [True: 0, False: 0]
452
0
      compactions++;
453
0
      *save_manifest = true;
454
0
      status = WriteLevel0Table(mem, edit, nullptr);
455
0
      mem->Unref();
456
0
      mem = nullptr;
457
0
      if (!status.ok()) {
  Branch (457:11): [True: 0, False: 0]
458
        // Reflect errors immediately so that conditions like full
459
        // file-systems cause the DB::Open() to fail.
460
0
        break;
461
0
      }
462
0
    }
463
0
  }
464
465
0
  delete file;
466
467
  // See if we should keep reusing the last log file.
468
0
  if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
  Branch (468:7): [True: 0, False: 0]
  Branch (468:22): [True: 0, False: 0]
  Branch (468:45): [True: 0, False: 0]
  Branch (468:57): [True: 0, False: 0]
469
0
    assert(logfile_ == nullptr);
  Branch (469:5): [True: 0, False: 0]
470
0
    assert(log_ == nullptr);
  Branch (470:5): [True: 0, False: 0]
471
0
    assert(mem_ == nullptr);
  Branch (471:5): [True: 0, False: 0]
472
0
    uint64_t lfile_size;
473
0
    if (env_->GetFileSize(fname, &lfile_size).ok() &&
  Branch (473:9): [True: 0, False: 0]
  Branch (473:9): [True: 0, False: 0]
474
0
        env_->NewAppendableFile(fname, &logfile_).ok()) {
  Branch (474:9): [True: 0, False: 0]
475
0
      Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
476
0
      log_ = new log::Writer(logfile_, lfile_size);
477
0
      logfile_number_ = log_number;
478
0
      if (mem != nullptr) {
  Branch (478:11): [True: 0, False: 0]
479
0
        mem_ = mem;
480
0
        mem = nullptr;
481
0
      } else {
482
        // mem can be nullptr if lognum exists but was empty.
483
0
        mem_ = new MemTable(internal_comparator_);
484
0
        mem_->Ref();
485
0
      }
486
0
    }
487
0
  }
488
489
0
  if (mem != nullptr) {
  Branch (489:7): [True: 0, False: 0]
490
    // mem did not get reused; compact it.
491
0
    if (status.ok()) {
  Branch (491:9): [True: 0, False: 0]
492
0
      *save_manifest = true;
493
0
      status = WriteLevel0Table(mem, edit, nullptr);
494
0
    }
495
0
    mem->Unref();
496
0
  }
497
498
0
  return status;
499
0
}
500
501
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
502
24
                                Version* base) {
503
24
  mutex_.AssertHeld();
504
24
  const uint64_t start_micros = env_->NowMicros();
505
24
  FileMetaData meta;
506
24
  meta.number = versions_->NewFileNumber();
507
24
  pending_outputs_.insert(meta.number);
508
24
  Iterator* iter = mem->NewIterator();
509
24
  Log(options_.info_log, "Level-0 table #%llu: started",
510
24
      (unsigned long long)meta.number);
511
512
24
  Status s;
513
24
  {
514
24
    mutex_.Unlock();
515
24
    s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
516
24
    mutex_.Lock();
517
24
  }
518
519
24
  Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s",
520
24
      (unsigned long long)meta.number, (unsigned long long)meta.file_size,
521
24
      s.ToString().c_str());
522
24
  delete iter;
523
24
  pending_outputs_.erase(meta.number);
524
525
  // Note that if file_size is zero, the file has been deleted and
526
  // should not be added to the manifest.
527
24
  int level = 0;
528
24
  if (s.ok() && meta.file_size > 0) {
  Branch (528:7): [True: 24, False: 0]
  Branch (528:17): [True: 24, False: 0]
529
24
    const Slice min_user_key = meta.smallest.user_key();
530
24
    const Slice max_user_key = meta.largest.user_key();
531
24
    if (base != nullptr) {
  Branch (531:9): [True: 24, False: 0]
532
24
      level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
533
24
    }
534
24
    edit->AddFile(level, meta.number, meta.file_size, meta.smallest,
535
24
                  meta.largest);
536
24
  }
537
538
24
  CompactionStats stats;
539
24
  stats.micros = env_->NowMicros() - start_micros;
540
24
  stats.bytes_written = meta.file_size;
541
24
  stats_[level].Add(stats);
542
24
  return s;
543
24
}
544
545
24
void DBImpl::CompactMemTable() {
546
24
  mutex_.AssertHeld();
547
24
  assert(imm_ != nullptr);
  Branch (547:3): [True: 24, False: 0]
548
549
  // Save the contents of the memtable as a new Table
550
24
  VersionEdit edit;
551
24
  Version* base = versions_->current();
552
24
  base->Ref();
553
24
  Status s = WriteLevel0Table(imm_, &edit, base);
554
24
  base->Unref();
555
556
24
  if (s.ok() && shutting_down_.load(std::memory_order_acquire)) {
  Branch (556:7): [True: 24, False: 0]
  Branch (556:17): [True: 0, False: 24]
557
0
    s = Status::IOError("Deleting DB during memtable compaction");
558
0
  }
559
560
  // Replace immutable memtable with the generated Table
561
24
  if (s.ok()) {
  Branch (561:7): [True: 24, False: 0]
562
24
    edit.SetPrevLogNumber(0);
563
24
    edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
564
24
    s = versions_->LogAndApply(&edit, &mutex_);
565
24
  }
566
567
24
  if (s.ok()) {
  Branch (567:7): [True: 24, False: 0]
568
    // Commit to the new state
569
24
    imm_->Unref();
570
24
    imm_ = nullptr;
571
24
    has_imm_.store(false, std::memory_order_release);
572
24
    DeleteObsoleteFiles();
573
24
  } else {
574
0
    RecordBackgroundError(s);
575
0
  }
576
24
}
577
578
0
void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
579
0
  int max_level_with_files = 1;
580
0
  {
581
0
    MutexLock l(&mutex_);
582
0
    Version* base = versions_->current();
583
0
    for (int level = 1; level < config::kNumLevels; level++) {
  Branch (583:25): [True: 0, False: 0]
584
0
      if (base->OverlapInLevel(level, begin, end)) {
  Branch (584:11): [True: 0, False: 0]
585
0
        max_level_with_files = level;
586
0
      }
587
0
    }
588
0
  }
589
0
  TEST_CompactMemTable();  // TODO(sanjay): Skip if memtable does not overlap
590
0
  for (int level = 0; level < max_level_with_files; level++) {
  Branch (590:23): [True: 0, False: 0]
591
0
    TEST_CompactRange(level, begin, end);
592
0
  }
593
0
}
594
595
void DBImpl::TEST_CompactRange(int level, const Slice* begin,
596
0
                               const Slice* end) {
597
0
  assert(level >= 0);
  Branch (597:3): [True: 0, False: 0]
598
0
  assert(level + 1 < config::kNumLevels);
  Branch (598:3): [True: 0, False: 0]
599
600
0
  InternalKey begin_storage, end_storage;
601
602
0
  ManualCompaction manual;
603
0
  manual.level = level;
604
0
  manual.done = false;
605
0
  if (begin == nullptr) {
  Branch (605:7): [True: 0, False: 0]
606
0
    manual.begin = nullptr;
607
0
  } else {
608
0
    begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
609
0
    manual.begin = &begin_storage;
610
0
  }
611
0
  if (end == nullptr) {
  Branch (611:7): [True: 0, False: 0]
612
0
    manual.end = nullptr;
613
0
  } else {
614
0
    end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
615
0
    manual.end = &end_storage;
616
0
  }
617
618
0
  MutexLock l(&mutex_);
619
0
  while (!manual.done && !shutting_down_.load(std::memory_order_acquire) &&
  Branch (619:10): [True: 0, False: 0]
  Branch (619:26): [True: 0, False: 0]
620
0
         bg_error_.ok()) {
  Branch (620:10): [True: 0, False: 0]
621
0
    if (manual_compaction_ == nullptr) {  // Idle
  Branch (621:9): [True: 0, False: 0]
622
0
      manual_compaction_ = &manual;
623
0
      MaybeScheduleCompaction();
624
0
    } else {  // Running either my compaction or another compaction.
625
0
      background_work_finished_signal_.Wait();
626
0
    }
627
0
  }
628
0
  if (manual_compaction_ == &manual) {
  Branch (628:7): [True: 0, False: 0]
629
    // Cancel my manual compaction since we aborted early for some reason.
630
0
    manual_compaction_ = nullptr;
631
0
  }
632
0
}
633
634
0
Status DBImpl::TEST_CompactMemTable() {
635
  // nullptr batch means just wait for earlier writes to be done
636
0
  Status s = Write(WriteOptions(), nullptr);
637
0
  if (s.ok()) {
  Branch (637:7): [True: 0, False: 0]
638
    // Wait until the compaction completes
639
0
    MutexLock l(&mutex_);
640
0
    while (imm_ != nullptr && bg_error_.ok()) {
  Branch (640:12): [True: 0, False: 0]
  Branch (640:31): [True: 0, False: 0]
641
0
      background_work_finished_signal_.Wait();
642
0
    }
643
0
    if (imm_ != nullptr) {
  Branch (643:9): [True: 0, False: 0]
644
0
      s = bg_error_;
645
0
    }
646
0
  }
647
0
  return s;
648
0
}
649
650
0
void DBImpl::RecordBackgroundError(const Status& s) {
651
0
  mutex_.AssertHeld();
652
0
  if (bg_error_.ok()) {
  Branch (652:7): [True: 0, False: 0]
653
0
    bg_error_ = s;
654
0
    background_work_finished_signal_.SignalAll();
655
0
  }
656
0
}
657
658
33.3k
void DBImpl::MaybeScheduleCompaction() {
659
33.3k
  mutex_.AssertHeld();
660
33.3k
  if (background_compaction_scheduled_) {
  Branch (660:7): [True: 0, False: 33.3k]
661
    // Already scheduled
662
33.3k
  } else if (shutting_down_.load(std::memory_order_acquire)) {
  Branch (662:14): [True: 0, False: 33.3k]
663
    // DB is being deleted; no more background compactions
664
33.3k
  } else if (!bg_error_.ok()) {
  Branch (664:14): [True: 0, False: 33.3k]
665
    // Already got an error; no more changes
666
33.3k
  } else if (imm_ == nullptr && manual_compaction_ == nullptr &&
  Branch (666:14): [True: 33.3k, False: 24]
  Branch (666:33): [True: 33.3k, False: 0]
667
33.3k
             !versions_->NeedsCompaction()) {
  Branch (667:14): [True: 33.3k, False: 0]
668
    // No work to be done
669
33.3k
  } else {
670
24
    background_compaction_scheduled_ = true;
671
24
    env_->Schedule(&DBImpl::BGWork, this);
672
24
  }
673
33.3k
}
674
675
24
void DBImpl::BGWork(void* db) {
676
24
  reinterpret_cast<DBImpl*>(db)->BackgroundCall();
677
24
}
678
679
24
void DBImpl::BackgroundCall() {
680
24
  MutexLock l(&mutex_);
681
24
  assert(background_compaction_scheduled_);
  Branch (681:3): [True: 24, False: 0]
682
24
  if (shutting_down_.load(std::memory_order_acquire)) {
  Branch (682:7): [True: 0, False: 24]
683
    // No more background work when shutting down.
684
24
  } else if (!bg_error_.ok()) {
  Branch (684:14): [True: 0, False: 24]
685
    // No more background work after a background error.
686
24
  } else {
687
24
    BackgroundCompaction();
688
24
  }
689
690
24
  background_compaction_scheduled_ = false;
691
692
  // Previous compaction may have produced too many files in a level,
693
  // so reschedule another compaction if needed.
694
24
  MaybeScheduleCompaction();
695
24
  background_work_finished_signal_.SignalAll();
696
24
}
697
698
24
void DBImpl::BackgroundCompaction() {
699
24
  mutex_.AssertHeld();
700
701
24
  if (imm_ != nullptr) {
  Branch (701:7): [True: 24, False: 0]
702
24
    CompactMemTable();
703
24
    return;
704
24
  }
705
706
0
  Compaction* c;
707
0
  bool is_manual = (manual_compaction_ != nullptr);
708
0
  InternalKey manual_end;
709
0
  if (is_manual) {
  Branch (709:7): [True: 0, False: 0]
710
0
    ManualCompaction* m = manual_compaction_;
711
0
    c = versions_->CompactRange(m->level, m->begin, m->end);
712
0
    m->done = (c == nullptr);
713
0
    if (c != nullptr) {
  Branch (713:9): [True: 0, False: 0]
714
0
      manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
715
0
    }
716
0
    Log(options_.info_log,
717
0
        "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
718
0
        m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
  Branch (718:20): [True: 0, False: 0]
719
0
        (m->end ? m->end->DebugString().c_str() : "(end)"),
  Branch (719:10): [True: 0, False: 0]
720
0
        (m->done ? "(end)" : manual_end.DebugString().c_str()));
  Branch (720:10): [True: 0, False: 0]
721
0
  } else {
722
0
    c = versions_->PickCompaction();
723
0
  }
724
725
0
  Status status;
726
0
  if (c == nullptr) {
  Branch (726:7): [True: 0, False: 0]
727
    // Nothing to do
728
0
  } else if (!is_manual && c->IsTrivialMove()) {
  Branch (728:14): [True: 0, False: 0]
  Branch (728:28): [True: 0, False: 0]
729
    // Move file to next level
730
0
    assert(c->num_input_files(0) == 1);
  Branch (730:5): [True: 0, False: 0]
731
0
    FileMetaData* f = c->input(0, 0);
732
0
    c->edit()->DeleteFile(c->level(), f->number);
733
0
    c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest,
734
0
                       f->largest);
735
0
    status = versions_->LogAndApply(c->edit(), &mutex_);
736
0
    if (!status.ok()) {
  Branch (736:9): [True: 0, False: 0]
737
0
      RecordBackgroundError(status);
738
0
    }
739
0
    VersionSet::LevelSummaryStorage tmp;
740
0
    Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n",
741
0
        static_cast<unsigned long long>(f->number), c->level() + 1,
742
0
        static_cast<unsigned long long>(f->file_size),
743
0
        status.ToString().c_str(), versions_->LevelSummary(&tmp));
744
0
  } else {
745
0
    CompactionState* compact = new CompactionState(c);
746
0
    status = DoCompactionWork(compact);
747
0
    if (!status.ok()) {
  Branch (747:9): [True: 0, False: 0]
748
0
      RecordBackgroundError(status);
749
0
    }
750
0
    CleanupCompaction(compact);
751
0
    c->ReleaseInputs();
752
0
    DeleteObsoleteFiles();
753
0
  }
754
0
  delete c;
755
756
0
  if (status.ok()) {
  Branch (756:7): [True: 0, False: 0]
757
    // Done
758
0
  } else if (shutting_down_.load(std::memory_order_acquire)) {
  Branch (758:14): [True: 0, False: 0]
759
    // Ignore compaction errors found during shutting down
760
0
  } else {
761
0
    Log(options_.info_log, "Compaction error: %s", status.ToString().c_str());
762
0
  }
763
764
0
  if (is_manual) {
  Branch (764:7): [True: 0, False: 0]
765
0
    ManualCompaction* m = manual_compaction_;
766
0
    if (!status.ok()) {
  Branch (766:9): [True: 0, False: 0]
767
0
      m->done = true;
768
0
    }
769
0
    if (!m->done) {
  Branch (769:9): [True: 0, False: 0]
770
      // We only compacted part of the requested range.  Update *m
771
      // to the range that is left to be compacted.
772
0
      m->tmp_storage = manual_end;
773
0
      m->begin = &m->tmp_storage;
774
0
    }
775
0
    manual_compaction_ = nullptr;
776
0
  }
777
0
}
778
779
0
void DBImpl::CleanupCompaction(CompactionState* compact) {
780
0
  mutex_.AssertHeld();
781
0
  if (compact->builder != nullptr) {
  Branch (781:7): [True: 0, False: 0]
782
    // May happen if we get a shutdown call in the middle of compaction
783
0
    compact->builder->Abandon();
784
0
    delete compact->builder;
785
0
  } else {
786
0
    assert(compact->outfile == nullptr);
  Branch (786:5): [True: 0, False: 0]
787
0
  }
788
0
  delete compact->outfile;
789
0
  for (size_t i = 0; i < compact->outputs.size(); i++) {
  Branch (789:22): [True: 0, False: 0]
790
0
    const CompactionState::Output& out = compact->outputs[i];
791
0
    pending_outputs_.erase(out.number);
792
0
  }
793
0
  delete compact;
794
0
}
795
796
0
Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
797
0
  assert(compact != nullptr);
  Branch (797:3): [True: 0, False: 0]
798
0
  assert(compact->builder == nullptr);
  Branch (798:3): [True: 0, False: 0]
799
0
  uint64_t file_number;
800
0
  {
801
0
    mutex_.Lock();
802
0
    file_number = versions_->NewFileNumber();
803
0
    pending_outputs_.insert(file_number);
804
0
    CompactionState::Output out;
805
0
    out.number = file_number;
806
0
    out.smallest.Clear();
807
0
    out.largest.Clear();
808
0
    compact->outputs.push_back(out);
809
0
    mutex_.Unlock();
810
0
  }
811
812
  // Make the output file
813
0
  std::string fname = TableFileName(dbname_, file_number);
814
0
  Status s = env_->NewWritableFile(fname, &compact->outfile);
815
0
  if (s.ok()) {
  Branch (815:7): [True: 0, False: 0]
816
0
    compact->builder = new TableBuilder(options_, compact->outfile);
817
0
  }
818
0
  return s;
819
0
}
820
821
Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
822
0
                                          Iterator* input) {
823
0
  assert(compact != nullptr);
  Branch (823:3): [True: 0, False: 0]
824
0
  assert(compact->outfile != nullptr);
  Branch (824:3): [True: 0, False: 0]
825
0
  assert(compact->builder != nullptr);
  Branch (825:3): [True: 0, False: 0]
826
827
0
  const uint64_t output_number = compact->current_output()->number;
828
0
  assert(output_number != 0);
  Branch (828:3): [True: 0, False: 0]
829
830
  // Check for iterator errors
831
0
  Status s = input->status();
832
0
  const uint64_t current_entries = compact->builder->NumEntries();
833
0
  if (s.ok()) {
  Branch (833:7): [True: 0, False: 0]
834
0
    s = compact->builder->Finish();
835
0
  } else {
836
0
    compact->builder->Abandon();
837
0
  }
838
0
  const uint64_t current_bytes = compact->builder->FileSize();
839
0
  compact->current_output()->file_size = current_bytes;
840
0
  compact->total_bytes += current_bytes;
841
0
  delete compact->builder;
842
0
  compact->builder = nullptr;
843
844
  // Finish and check for file errors
845
0
  if (s.ok()) {
  Branch (845:7): [True: 0, False: 0]
846
0
    s = compact->outfile->Sync();
847
0
  }
848
0
  if (s.ok()) {
  Branch (848:7): [True: 0, False: 0]
849
0
    s = compact->outfile->Close();
850
0
  }
851
0
  delete compact->outfile;
852
0
  compact->outfile = nullptr;
853
854
0
  if (s.ok() && current_entries > 0) {
  Branch (854:7): [True: 0, False: 0]
  Branch (854:17): [True: 0, False: 0]
855
    // Verify that the table is usable
856
0
    Iterator* iter =
857
0
        table_cache_->NewIterator(ReadOptions(), output_number, current_bytes);
858
0
    s = iter->status();
859
0
    delete iter;
860
0
    if (s.ok()) {
  Branch (860:9): [True: 0, False: 0]
861
0
      Log(options_.info_log, "Generated table #%llu@%d: %lld keys, %lld bytes",
862
0
          (unsigned long long)output_number, compact->compaction->level(),
863
0
          (unsigned long long)current_entries,
864
0
          (unsigned long long)current_bytes);
865
0
    }
866
0
  }
867
0
  return s;
868
0
}
869
870
0
Status DBImpl::InstallCompactionResults(CompactionState* compact) {
871
0
  mutex_.AssertHeld();
872
0
  Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
873
0
      compact->compaction->num_input_files(0), compact->compaction->level(),
874
0
      compact->compaction->num_input_files(1), compact->compaction->level() + 1,
875
0
      static_cast<long long>(compact->total_bytes));
876
877
  // Add compaction outputs
878
0
  compact->compaction->AddInputDeletions(compact->compaction->edit());
879
0
  const int level = compact->compaction->level();
880
0
  for (size_t i = 0; i < compact->outputs.size(); i++) {
  Branch (880:22): [True: 0, False: 0]
881
0
    const CompactionState::Output& out = compact->outputs[i];
882
0
    compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size,
883
0
                                         out.smallest, out.largest);
884
0
  }
885
0
  return versions_->LogAndApply(compact->compaction->edit(), &mutex_);
886
0
}
887
888
0
Status DBImpl::DoCompactionWork(CompactionState* compact) {
889
0
  const uint64_t start_micros = env_->NowMicros();
890
0
  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
891
892
0
  Log(options_.info_log, "Compacting %d@%d + %d@%d files",
893
0
      compact->compaction->num_input_files(0), compact->compaction->level(),
894
0
      compact->compaction->num_input_files(1),
895
0
      compact->compaction->level() + 1);
896
897
0
  assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
  Branch (897:3): [True: 0, False: 0]
898
0
  assert(compact->builder == nullptr);
  Branch (898:3): [True: 0, False: 0]
899
0
  assert(compact->outfile == nullptr);
  Branch (899:3): [True: 0, False: 0]
900
0
  if (snapshots_.empty()) {
  Branch (900:7): [True: 0, False: 0]
901
0
    compact->smallest_snapshot = versions_->LastSequence();
902
0
  } else {
903
0
    compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
904
0
  }
905
906
0
  Iterator* input = versions_->MakeInputIterator(compact->compaction);
907
908
  // Release mutex while we're actually doing the compaction work
909
0
  mutex_.Unlock();
910
911
0
  input->SeekToFirst();
912
0
  Status status;
913
0
  ParsedInternalKey ikey;
914
0
  std::string current_user_key;
915
0
  bool has_current_user_key = false;
916
0
  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
917
0
  while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
  Branch (917:10): [True: 0, False: 0]
  Branch (917:28): [True: 0, False: 0]
918
    // Prioritize immutable compaction work
919
0
    if (has_imm_.load(std::memory_order_relaxed)) {
  Branch (919:9): [True: 0, False: 0]
920
0
      const uint64_t imm_start = env_->NowMicros();
921
0
      mutex_.Lock();
922
0
      if (imm_ != nullptr) {
  Branch (922:11): [True: 0, False: 0]
923
0
        CompactMemTable();
924
        // Wake up MakeRoomForWrite() if necessary.
925
0
        background_work_finished_signal_.SignalAll();
926
0
      }
927
0
      mutex_.Unlock();
928
0
      imm_micros += (env_->NowMicros() - imm_start);
929
0
    }
930
931
0
    Slice key = input->key();
932
0
    if (compact->compaction->ShouldStopBefore(key) &&
  Branch (932:9): [True: 0, False: 0]
933
0
        compact->builder != nullptr) {
  Branch (933:9): [True: 0, False: 0]
934
0
      status = FinishCompactionOutputFile(compact, input);
935
0
      if (!status.ok()) {
  Branch (935:11): [True: 0, False: 0]
936
0
        break;
937
0
      }
938
0
    }
939
940
    // Handle key/value, add to state, etc.
941
0
    bool drop = false;
942
0
    if (!ParseInternalKey(key, &ikey)) {
  Branch (942:9): [True: 0, False: 0]
943
      // Do not hide error keys
944
0
      current_user_key.clear();
945
0
      has_current_user_key = false;
946
0
      last_sequence_for_key = kMaxSequenceNumber;
947
0
    } else {
948
0
      if (!has_current_user_key ||
  Branch (948:11): [True: 0, False: 0]
  Branch (948:11): [True: 0, False: 0]
949
0
          user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
  Branch (949:11): [True: 0, False: 0]
950
0
              0) {
951
        // First occurrence of this user key
952
0
        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
953
0
        has_current_user_key = true;
954
0
        last_sequence_for_key = kMaxSequenceNumber;
955
0
      }
956
957
0
      if (last_sequence_for_key <= compact->smallest_snapshot) {
  Branch (957:11): [True: 0, False: 0]
958
        // Hidden by a newer entry for same user key
959
0
        drop = true;  // (A)
960
0
      } else if (ikey.type == kTypeDeletion &&
  Branch (960:18): [True: 0, False: 0]
961
0
                 ikey.sequence <= compact->smallest_snapshot &&
  Branch (961:18): [True: 0, False: 0]
962
0
                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
  Branch (962:18): [True: 0, False: 0]
963
        // For this user key:
964
        // (1) there is no data in higher levels
965
        // (2) data in lower levels will have larger sequence numbers
966
        // (3) data in layers that are being compacted here and have
967
        //     smaller sequence numbers will be dropped in the next
968
        //     few iterations of this loop (by rule (A) above).
969
        // Therefore this deletion marker is obsolete and can be dropped.
970
0
        drop = true;
971
0
      }
972
973
0
      last_sequence_for_key = ikey.sequence;
974
0
    }
975
#if 0
976
    Log(options_.info_log,
977
        "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
978
        "%d smallest_snapshot: %d",
979
        ikey.user_key.ToString().c_str(),
980
        (int)ikey.sequence, ikey.type, kTypeValue, drop,
981
        compact->compaction->IsBaseLevelForKey(ikey.user_key),
982
        (int)last_sequence_for_key, (int)compact->smallest_snapshot);
983
#endif
984
985
0
    if (!drop) {
  Branch (985:9): [True: 0, False: 0]
986
      // Open output file if necessary
987
0
      if (compact->builder == nullptr) {
  Branch (987:11): [True: 0, False: 0]
988
0
        status = OpenCompactionOutputFile(compact);
989
0
        if (!status.ok()) {
  Branch (989:13): [True: 0, False: 0]
990
0
          break;
991
0
        }
992
0
      }
993
0
      if (compact->builder->NumEntries() == 0) {
  Branch (993:11): [True: 0, False: 0]
994
0
        compact->current_output()->smallest.DecodeFrom(key);
995
0
      }
996
0
      compact->current_output()->largest.DecodeFrom(key);
997
0
      compact->builder->Add(key, input->value());
998
999
      // Close output file if it is big enough
1000
0
      if (compact->builder->FileSize() >=
  Branch (1000:11): [True: 0, False: 0]
1001
0
          compact->compaction->MaxOutputFileSize()) {
1002
0
        status = FinishCompactionOutputFile(compact, input);
1003
0
        if (!status.ok()) {
  Branch (1003:13): [True: 0, False: 0]
1004
0
          break;
1005
0
        }
1006
0
      }
1007
0
    }
1008
1009
0
    input->Next();
1010
0
  }
1011
1012
0
  if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
  Branch (1012:7): [True: 0, False: 0]
  Branch (1012:22): [True: 0, False: 0]
1013
0
    status = Status::IOError("Deleting DB during compaction");
1014
0
  }
1015
0
  if (status.ok() && compact->builder != nullptr) {
  Branch (1015:7): [True: 0, False: 0]
  Branch (1015:22): [True: 0, False: 0]
1016
0
    status = FinishCompactionOutputFile(compact, input);
1017
0
  }
1018
0
  if (status.ok()) {
  Branch (1018:7): [True: 0, False: 0]
1019
0
    status = input->status();
1020
0
  }
1021
0
  delete input;
1022
0
  input = nullptr;
1023
1024
0
  CompactionStats stats;
1025
0
  stats.micros = env_->NowMicros() - start_micros - imm_micros;
1026
0
  for (int which = 0; which < 2; which++) {
  Branch (1026:23): [True: 0, False: 0]
1027
0
    for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
  Branch (1027:21): [True: 0, False: 0]
1028
0
      stats.bytes_read += compact->compaction->input(which, i)->file_size;
1029
0
    }
1030
0
  }
1031
0
  for (size_t i = 0; i < compact->outputs.size(); i++) {
  Branch (1031:22): [True: 0, False: 0]
1032
0
    stats.bytes_written += compact->outputs[i].file_size;
1033
0
  }
1034
1035
0
  mutex_.Lock();
1036
0
  stats_[compact->compaction->level() + 1].Add(stats);
1037
1038
0
  if (status.ok()) {
  Branch (1038:7): [True: 0, False: 0]
1039
0
    status = InstallCompactionResults(compact);
1040
0
  }
1041
0
  if (!status.ok()) {
  Branch (1041:7): [True: 0, False: 0]
1042
0
    RecordBackgroundError(status);
1043
0
  }
1044
0
  VersionSet::LevelSummaryStorage tmp;
1045
0
  Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
1046
0
  return status;
1047
0
}
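
The drop logic in the loop above applies two rules: (A) an entry shadowed by a newer entry for the same key at or below smallest_snapshot is dead, and a deletion marker is itself droppable once its sequence is below every snapshot and no higher level can still hold data for its key. Restated standalone (assumed types, mirroring only the branch structure):

#include <cstdint>

enum ValueType { kTypeDeletion, kTypeValue };
using SequenceNumber = uint64_t;

static bool ShouldDrop(SequenceNumber last_seq_for_key,  // prior entry, same key
                       SequenceNumber seq, ValueType type,
                       SequenceNumber smallest_snapshot,
                       bool is_base_level_for_key) {
  if (last_seq_for_key <= smallest_snapshot) return true;  // rule (A)
  if (type == kTypeDeletion && seq <= smallest_snapshot &&
      is_base_level_for_key)
    return true;  // obsolete tombstone: nothing older survives anywhere
  return false;
}
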
1048
1049
namespace {
1050
1051
struct IterState {
1052
  port::Mutex* const mu;
1053
  Version* const version GUARDED_BY(mu);
1054
  MemTable* const mem GUARDED_BY(mu);
1055
  MemTable* const imm GUARDED_BY(mu);
1056
1057
  IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version)
1058
35.8k
      : mu(mutex), version(version), mem(mem), imm(imm) {}
1059
};
1060
1061
35.8k
static void CleanupIteratorState(void* arg1, void* arg2) {
1062
35.8k
  IterState* state = reinterpret_cast<IterState*>(arg1);
1063
35.8k
  state->mu->Lock();
1064
35.8k
  state->mem->Unref();
1065
35.8k
  if (state->imm != nullptr) state->imm->Unref();
  Branch (1065:7): [True: 0, False: 35.8k]
1066
35.8k
  state->version->Unref();
1067
35.8k
  state->mu->Unlock();
1068
35.8k
  delete state;
1069
35.8k
}
1070
1071
}  // anonymous namespace
1072
1073
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
1074
                                      SequenceNumber* latest_snapshot,
1075
35.8k
                                      uint32_t* seed) {
1076
35.8k
  mutex_.Lock();
1077
35.8k
  *latest_snapshot = versions_->LastSequence();
1078
1079
  // Collect together all needed child iterators
1080
35.8k
  std::vector<Iterator*> list;
1081
35.8k
  list.push_back(mem_->NewIterator());
1082
35.8k
  mem_->Ref();
1083
35.8k
  if (imm_ != nullptr) {
  Branch (1083:7): [True: 0, False: 35.8k]
1084
0
    list.push_back(imm_->NewIterator());
1085
0
    imm_->Ref();
1086
0
  }
1087
35.8k
  versions_->current()->AddIterators(options, &list);
1088
35.8k
  Iterator* internal_iter =
1089
35.8k
      NewMergingIterator(&internal_comparator_, &list[0], list.size());
1090
35.8k
  versions_->current()->Ref();
1091
1092
35.8k
  IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
1093
35.8k
  internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);
1094
1095
35.8k
  *seed = ++seed_;
1096
35.8k
  mutex_.Unlock();
1097
35.8k
  return internal_iter;
1098
35.8k
}
1099
1100
0
Iterator* DBImpl::TEST_NewInternalIterator() {
1101
0
  SequenceNumber ignored;
1102
0
  uint32_t ignored_seed;
1103
0
  return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
1104
0
}
1105
1106
0
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
1107
0
  MutexLock l(&mutex_);
1108
0
  return versions_->MaxNextLevelOverlappingBytes();
1109
0
}
1110
1111
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
1112
5.07M
                   std::string* value) {
1113
5.07M
  Status s;
1114
5.07M
  MutexLock l(&mutex_);
1115
5.07M
  SequenceNumber snapshot;
1116
5.07M
  if (options.snapshot != nullptr) {
  Branch (1116:7): [True: 0, False: 5.07M]
1117
0
    snapshot =
1118
0
        static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
1119
5.07M
  } else {
1120
5.07M
    snapshot = versions_->LastSequence();
1121
5.07M
  }
1122
1123
5.07M
  MemTable* mem = mem_;
1124
5.07M
  MemTable* imm = imm_;
1125
5.07M
  Version* current = versions_->current();
1126
5.07M
  mem->Ref();
1127
5.07M
  if (imm != nullptr) imm->Ref();
  Branch (1127:7): [True: 0, False: 5.07M]
1128
5.07M
  current->Ref();
1129
1130
5.07M
  bool have_stat_update = false;
1131
5.07M
  Version::GetStats stats;
1132
1133
  // Unlock while reading from files and memtables
1134
5.07M
  {
1135
5.07M
    mutex_.Unlock();
1136
    // First look in the memtable, then in the immutable memtable (if any).
1137
5.07M
    LookupKey lkey(key, snapshot);
1138
5.07M
    if (mem->Get(lkey, value, &s)) {
  Branch (1138:9): [True: 21.2k, False: 5.05M]
1139
      // Done
1140
5.05M
    } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
  Branch (1140:16): [True: 0, False: 5.05M]
  Branch (1140:34): [True: 0, False: 0]
1141
      // Done
1142
5.05M
    } else {
1143
5.05M
      s = current->Get(options, lkey, value, &stats);
1144
5.05M
      have_stat_update = true;
1145
5.05M
    }
1146
5.07M
    mutex_.Lock();
1147
5.07M
  }
1148
1149
5.07M
  if (have_stat_update && current->UpdateStats(stats)) {
  Branch (1149:7): [True: 5.05M, False: 21.2k]
  Branch (1149:27): [True: 0, False: 5.05M]
1150
0
    MaybeScheduleCompaction();
1151
0
  }
1152
5.07M
  mem->Unref();
1153
5.07M
  if (imm != nullptr) imm->Unref();
  Branch (1153:7): [True: 0, False: 5.07M]
1154
5.07M
  current->Unref();
1155
5.07M
  return s;
1156
5.07M
}
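
From the caller's side, the snapshot selection at the top of Get() means a read sees either the latest sequence number or the one captured by GetSnapshot(). A usage sketch against the public leveldb API (assumes an already-open database):

#include <string>

#include "leveldb/db.h"

void SnapshotReadExample(leveldb::DB* db) {
  const leveldb::Snapshot* snap = db->GetSnapshot();
  db->Put(leveldb::WriteOptions(), "k", "v2");  // newer than `snap`
  leveldb::ReadOptions opts;
  opts.snapshot = snap;
  std::string value;
  leveldb::Status s = db->Get(opts, "k", &value);  // does not see the Put above
  (void)s;  // real code would check s.ok() / s.IsNotFound()
  db->ReleaseSnapshot(snap);
}
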
1157
1158
35.8k
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
1159
35.8k
  SequenceNumber latest_snapshot;
1160
35.8k
  uint32_t seed;
1161
35.8k
  Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
1162
35.8k
  return NewDBIterator(this, user_comparator(), iter,
1163
35.8k
                       (options.snapshot != nullptr
  Branch (1163:25): [True: 0, False: 35.8k]
1164
35.8k
                            ? static_cast<const SnapshotImpl*>(options.snapshot)
1165
0
                                  ->sequence_number()
1166
35.8k
                            : latest_snapshot),
1167
35.8k
                       seed);
1168
35.8k
}
1169
1170
0
void DBImpl::RecordReadSample(Slice key) {
1171
0
  MutexLock l(&mutex_);
1172
0
  if (versions_->current()->RecordReadSample(key)) {
  Branch (1172:7): [True: 0, False: 0]
1173
0
    MaybeScheduleCompaction();
1174
0
  }
1175
0
}
1176
1177
0
const Snapshot* DBImpl::GetSnapshot() {
1178
0
  MutexLock l(&mutex_);
1179
0
  return snapshots_.New(versions_->LastSequence());
1180
0
}
1181
1182
0
void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) {
1183
0
  MutexLock l(&mutex_);
1184
0
  snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot));
1185
0
}
1186
1187
// Convenience methods
1188
0
Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) {
1189
0
  return DB::Put(o, key, val);
1190
0
}
1191
1192
0
Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
1193
0
  return DB::Delete(options, key);
1194
0
}
1195
1196
2.32M
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
1197
2.32M
  Writer w(&mutex_);
1198
2.32M
  w.batch = updates;
1199
2.32M
  w.sync = options.sync;
1200
2.32M
  w.done = false;
1201
1202
2.32M
  MutexLock l(&mutex_);
1203
2.32M
  writers_.push_back(&w);
1204
2.32M
  while (!w.done && &w != writers_.front()) {
  Branch (1204:10): [True: 2.32M, False: 0]
  Branch (1204:21): [True: 0, False: 2.32M]
1205
0
    w.cv.Wait();
1206
0
  }
1207
2.32M
  if (w.done) {
  Branch (1207:7): [True: 0, False: 2.32M]
1208
0
    return w.status;
1209
0
  }
1210
1211
  // May temporarily unlock and wait.
1212
2.32M
  Status status = MakeRoomForWrite(updates == nullptr);
1213
2.32M
  uint64_t last_sequence = versions_->LastSequence();
1214
2.32M
  Writer* last_writer = &w;
1215
2.32M
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
  Branch (1215:7): [True: 2.32M, False: 0]
  Branch (1215:22): [True: 2.32M, False: 0]
1216
2.32M
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
1217
2.32M
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
1218
2.32M
    last_sequence += WriteBatchInternal::Count(write_batch);
1219
1220
    // Add to log and apply to memtable.  We can release the lock
1221
    // during this phase since &w is currently responsible for logging
1222
    // and protects against concurrent loggers and concurrent writes
1223
    // into mem_.
1224
2.32M
    {
1225
2.32M
      mutex_.Unlock();
1226
2.32M
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
1227
2.32M
      bool sync_error = false;
1228
2.32M
      if (status.ok() && options.sync) {
  Branch (1228:11): [True: 2.32M, False: 0]
  Branch (1228:26): [True: 29.2k, False: 2.30M]
1229
29.2k
        status = logfile_->Sync();
1230
29.2k
        if (!status.ok()) {
  Branch (1230:13): [True: 0, False: 29.2k]
1231
0
          sync_error = true;
1232
0
        }
1233
29.2k
      }
1234
2.32M
      if (status.ok()) {
  Branch (1234:11): [True: 2.32M, False: 0]
1235
2.32M
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
1236
2.32M
      }
1237
2.32M
      mutex_.Lock();
1238
2.32M
      if (sync_error) {
  Branch (1238:11): [True: 0, False: 2.32M]
1239
        // The state of the log file is indeterminate: the log record we
1240
        // just added may or may not show up when the DB is re-opened.
1241
        // So we force the DB into a mode where all future writes fail.
1242
0
        RecordBackgroundError(status);
1243
0
      }
1244
2.32M
    }
1245
2.32M
    if (write_batch == tmp_batch_) tmp_batch_->Clear();
  Branch (1245:9): [True: 0, False: 2.32M]
1246
1247
2.32M
    versions_->SetLastSequence(last_sequence);
1248
2.32M
  }
1249
1250
2.32M
  while (true) {
  Branch (1250:10): [Folded - Ignored]
1251
2.32M
    Writer* ready = writers_.front();
1252
2.32M
    writers_.pop_front();
1253
2.32M
    if (ready != &w) {
  Branch (1253:9): [True: 0, False: 2.32M]
1254
0
      ready->status = status;
1255
0
      ready->done = true;
1256
0
      ready->cv.Signal();
1257
0
    }
1258
2.32M
    if (ready == last_writer) break;
  Branch (1258:9): [True: 2.32M, False: 0]
1259
2.32M
  }
1260
1261
  // Notify new head of write queue
1262
2.32M
  if (!writers_.empty()) {
  Branch (1262:7): [True: 0, False: 2.32M]
1263
0
    writers_.front()->cv.Signal();
1264
0
  }
1265
1266
2.32M
  return status;
1267
2.32M
}
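
Write() implements group commit: each writer parks on a per-writer condition variable; only the head of writers_ logs and applies a combined batch, then marks the writers it covered as done and wakes the next head. A generic skeleton of that handoff (illustrative, not leveldb's types):

#include <condition_variable>
#include <deque>
#include <mutex>

struct Waiter {
  bool done = false;
  std::condition_variable cv;
};

class GroupCommit {
 public:
  void Submit() {
    std::unique_lock<std::mutex> lock(mu_);
    Waiter w;
    queue_.push_back(&w);
    while (!w.done && &w != queue_.front()) w.cv.wait(lock);
    if (w.done) return;  // an earlier head already performed our write
    // ...the head would build and apply a batch covering a queue prefix...
    while (!queue_.empty()) {  // here the group is just ourselves
      Waiter* ready = queue_.front();
      queue_.pop_front();
      if (ready != &w) {
        ready->done = true;
        ready->cv.notify_one();
      }
      if (ready == &w) break;
    }
    if (!queue_.empty()) queue_.front()->cv.notify_one();  // wake next head
  }

 private:
  std::mutex mu_;
  std::deque<Waiter*> queue_;
};
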
1268
1269
// REQUIRES: Writer list must be non-empty
1270
// REQUIRES: First writer must have a non-null batch
1271
2.32M
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
1272
2.32M
  mutex_.AssertHeld();
1273
2.32M
  assert(!writers_.empty());
  Branch (1273:3): [True: 2.32M, False: 0]
1274
2.32M
  Writer* first = writers_.front();
1275
2.32M
  WriteBatch* result = first->batch;
1276
2.32M
  assert(result != nullptr);
  Branch (1276:3): [True: 2.32M, False: 0]
1277
1278
2.32M
  size_t size = WriteBatchInternal::ByteSize(first->batch);
1279
1280
  // Allow the group to grow up to a maximum size, but if the
1281
  // original write is small, limit the growth so we do not slow
1282
  // down the small write too much.
1283
2.32M
  size_t max_size = 1 << 20;
1284
2.32M
  if (size <= (128 << 10)) {
  Branch (1284:7): [True: 2.32M, False: 0]
1285
2.32M
    max_size = size + (128 << 10);
1286
2.32M
  }
1287
1288
2.32M
  *last_writer = first;
1289
2.32M
  std::deque<Writer*>::iterator iter = writers_.begin();
1290
2.32M
  ++iter;  // Advance past "first"
1291
2.32M
  for (; iter != writers_.end(); ++iter) {
  Branch (1291:10): [True: 0, False: 2.32M]
1292
0
    Writer* w = *iter;
1293
0
    if (w->sync && !first->sync) {
  Branch (1293:9): [True: 0, False: 0]
  Branch (1293:20): [True: 0, False: 0]
1294
      // Do not include a sync write into a batch handled by a non-sync write.
1295
0
      break;
1296
0
    }
1297
1298
0
    if (w->batch != nullptr) {
  Branch (1298:9): [True: 0, False: 0]
1299
0
      size += WriteBatchInternal::ByteSize(w->batch);
1300
0
      if (size > max_size) {
  Branch (1300:11): [True: 0, False: 0]
1301
        // Do not make batch too big
1302
0
        break;
1303
0
      }
1304
1305
      // Append to *result
1306
0
      if (result == first->batch) {
  Branch (1306:11): [True: 0, False: 0]
1307
        // Switch to temporary batch instead of disturbing caller's batch
1308
0
        result = tmp_batch_;
1309
0
        assert(WriteBatchInternal::Count(result) == 0);
  Branch (1309:9): [True: 0, False: 0]
1310
0
        WriteBatchInternal::Append(result, first->batch);
1311
0
      }
1312
0
      WriteBatchInternal::Append(result, w->batch);
1313
0
    }
1314
0
    *last_writer = w;
1315
0
  }
1316
2.32M
  return result;
1317
2.32M
}
1318
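The size cap computed at the top of BuildBatchGroup above can be restated as standalone arithmetic; this is a sketch with an invented function name, mirroring the 1 MiB hard cap and the 128 KiB growth allowance for small leader batches.

#include <cstddef>
#include <cstdio>

// Mirrors: max_size = 1 << 20; if (size <= 128 KiB) max_size = size + 128 KiB.
static size_t GroupMaxSize(size_t first_batch_size) {
  size_t max_size = 1 << 20;              // 1 MiB hard cap on the whole group
  if (first_batch_size <= (128 << 10)) {  // leader batch is "small"
    max_size = first_batch_size + (128 << 10);  // limit growth to +128 KiB
  }
  return max_size;
}

int main() {
  std::printf("%zu\n", GroupMaxSize(1 << 10));    // 1 KiB leader -> 132096 (129 KiB)
  std::printf("%zu\n", GroupMaxSize(512 << 10));  // 512 KiB leader -> 1048576 (1 MiB)
  return 0;
}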
1319
// REQUIRES: mutex_ is held
1320
// REQUIRES: this thread is currently at the front of the writer queue
1321
2.32M
Status DBImpl::MakeRoomForWrite(bool force) {
1322
2.32M
  mutex_.AssertHeld();
1323
2.32M
  assert(!writers_.empty());
  Branch (1323:3): [True: 2.32M, False: 0]
1324
2.32M
  bool allow_delay = !force;
1325
2.32M
  Status s;
1326
2.32M
  while (true) {
  Branch (1326:10): [Folded - Ignored]
1327
2.32M
    if (!bg_error_.ok()) {
  Branch (1327:9): [True: 0, False: 2.32M]
1328
      // Yield previous error
1329
0
      s = bg_error_;
1330
0
      break;
1331
2.32M
    } else if (allow_delay && versions_->NumLevelFiles(0) >=
  Branch (1331:16): [True: 2.32M, False: 0]
  Branch (1331:31): [True: 0, False: 2.32M]
1332
2.32M
                                  config::kL0_SlowdownWritesTrigger) {
1333
      // We are getting close to hitting a hard limit on the number of
1334
      // L0 files.  Rather than delaying a single write by several
1335
      // seconds when we hit the hard limit, start delaying each
1336
      // individual write by 1ms to reduce latency variance.  Also,
1337
      // this delay hands over some CPU to the compaction thread in
1338
      // case it is sharing the same core as the writer.
1339
0
      mutex_.Unlock();
1340
0
      env_->SleepForMicroseconds(1000);
1341
0
      allow_delay = false;  // Do not delay a single write more than once
1342
0
      mutex_.Lock();
1343
2.32M
    } else if (!force &&
  Branch (1343:16): [True: 2.32M, False: 0]
1344
2.32M
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
  Branch (1344:16): [True: 2.32M, False: 24]
1345
      // There is room in current memtable
1346
2.32M
      break;
1347
2.32M
    } else if (imm_ != nullptr) {
  Branch (1347:16): [True: 0, False: 24]
1348
      // We have filled up the current memtable, but the previous
1349
      // one is still being compacted, so we wait.
1350
0
      Log(options_.info_log, "Current memtable full; waiting...\n");
1351
0
      background_work_finished_signal_.Wait();
1352
24
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
  Branch (1352:16): [True: 0, False: 24]
1353
      // There are too many level-0 files.
1354
0
      Log(options_.info_log, "Too many L0 files; waiting...\n");
1355
0
      background_work_finished_signal_.Wait();
1356
24
    } else {
1357
      // Attempt to switch to a new memtable and trigger compaction of old
1358
24
      assert(versions_->PrevLogNumber() == 0);
  Branch (1358:7): [True: 24, False: 0]
1359
24
      uint64_t new_log_number = versions_->NewFileNumber();
1360
24
      WritableFile* lfile = nullptr;
1361
24
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
1362
24
      if (!s.ok()) {
  Branch (1362:11): [True: 0, False: 24]
1363
        // Avoid chewing through file number space in a tight loop.
1364
0
        versions_->ReuseFileNumber(new_log_number);
1365
0
        break;
1366
0
      }
1367
24
      delete log_;
1368
24
      delete logfile_;
1369
24
      logfile_ = lfile;
1370
24
      logfile_number_ = new_log_number;
1371
24
      log_ = new log::Writer(lfile);
1372
24
      imm_ = mem_;
1373
24
      has_imm_.store(true, std::memory_order_release);
1374
24
      mem_ = new MemTable(internal_comparator_);
1375
24
      mem_->Ref();
1376
24
      force = false;  // Do not force another compaction if have room
1377
24
      MaybeScheduleCompaction();
1378
24
    }
1379
2.32M
  }
1380
2.32M
  return s;
1381
2.32M
}
1382
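The ladder inside MakeRoomForWrite() above can be summarized as a pure decision function; the struct, enum, and function names here are invented for the sketch, and the thresholds mirror config::kL0_SlowdownWritesTrigger (8) and config::kL0_StopWritesTrigger (12) from dbformat.h. Only the common force == false path is modeled.

#include <cstddef>

enum class WriteAction { kProceed, kDelay1ms, kWaitForCompaction, kSwitchMemtable };

struct WriteState {
  int l0_files;               // versions_->NumLevelFiles(0)
  std::size_t mem_usage;      // mem_->ApproximateMemoryUsage()
  std::size_t write_buffer;   // options_.write_buffer_size
  bool imm_exists;            // imm_ != nullptr (previous memtable still compacting)
  bool already_delayed;       // the one-time 1ms delay was already taken
};

static WriteAction Decide(const WriteState& s) {
  const int kSlowdown = 8, kStop = 12;
  if (!s.already_delayed && s.l0_files >= kSlowdown)
    return WriteAction::kDelay1ms;           // soften the approach to the hard limit
  if (s.mem_usage <= s.write_buffer)
    return WriteAction::kProceed;            // room in the current memtable
  if (s.imm_exists || s.l0_files >= kStop)
    return WriteAction::kWaitForCompaction;  // block on background work
  return WriteAction::kSwitchMemtable;       // new log + memtable, compact the old
}

int main() {
  WriteState s{/*l0_files=*/2, /*mem_usage=*/1024, /*write_buffer=*/4 << 20,
               /*imm_exists=*/false, /*already_delayed=*/false};
  return Decide(s) == WriteAction::kProceed ? 0 : 1;  // common case: proceed
}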
1383
0
bool DBImpl::GetProperty(const Slice& property, std::string* value) {
1384
0
  value->clear();
1385
1386
0
  MutexLock l(&mutex_);
1387
0
  Slice in = property;
1388
0
  Slice prefix("leveldb.");
1389
0
  if (!in.starts_with(prefix)) return false;
  Branch (1389:7): [True: 0, False: 0]
1390
0
  in.remove_prefix(prefix.size());
1391
1392
0
  if (in.starts_with("num-files-at-level")) {
  Branch (1392:7): [True: 0, False: 0]
1393
0
    in.remove_prefix(strlen("num-files-at-level"));
1394
0
    uint64_t level;
1395
0
    bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
  Branch (1395:15): [True: 0, False: 0]
  Branch (1395:52): [True: 0, False: 0]
1396
0
    if (!ok || level >= config::kNumLevels) {
  Branch (1396:9): [True: 0, False: 0]
  Branch (1396:16): [True: 0, False: 0]
1397
0
      return false;
1398
0
    } else {
1399
0
      char buf[100];
1400
0
      snprintf(buf, sizeof(buf), "%d",
1401
0
               versions_->NumLevelFiles(static_cast<int>(level)));
1402
0
      *value = buf;
1403
0
      return true;
1404
0
    }
1405
0
  } else if (in == "stats") {
  Branch (1405:14): [True: 0, False: 0]
1406
0
    char buf[200];
1407
0
    snprintf(buf, sizeof(buf),
1408
0
             "                               Compactions\n"
1409
0
             "Level  Files Size(MB) Time(sec) Read(MB) Write(MB)\n"
1410
0
             "--------------------------------------------------\n");
1411
0
    value->append(buf);
1412
0
    for (int level = 0; level < config::kNumLevels; level++) {
  Branch (1412:25): [True: 0, False: 0]
1413
0
      int files = versions_->NumLevelFiles(level);
1414
0
      if (stats_[level].micros > 0 || files > 0) {
  Branch (1414:11): [True: 0, False: 0]
  Branch (1414:39): [True: 0, False: 0]
1415
0
        snprintf(buf, sizeof(buf), "%3d %8d %8.0f %9.0f %8.0f %9.0f\n", level,
1416
0
                 files, versions_->NumLevelBytes(level) / 1048576.0,
1417
0
                 stats_[level].micros / 1e6,
1418
0
                 stats_[level].bytes_read / 1048576.0,
1419
0
                 stats_[level].bytes_written / 1048576.0);
1420
0
        value->append(buf);
1421
0
      }
1422
0
    }
1423
0
    return true;
1424
0
  } else if (in == "sstables") {
  Branch (1424:14): [True: 0, False: 0]
1425
0
    *value = versions_->current()->DebugString();
1426
0
    return true;
1427
0
  } else if (in == "approximate-memory-usage") {
  Branch (1427:14): [True: 0, False: 0]
1428
0
    size_t total_usage = options_.block_cache->TotalCharge();
1429
0
    if (mem_) {
  Branch (1429:9): [True: 0, False: 0]
1430
0
      total_usage += mem_->ApproximateMemoryUsage();
1431
0
    }
1432
0
    if (imm_) {
  Branch (1432:9): [True: 0, False: 0]
1433
0
      total_usage += imm_->ApproximateMemoryUsage();
1434
0
    }
1435
0
    char buf[50];
1436
0
    snprintf(buf, sizeof(buf), "%llu",
1437
0
             static_cast<unsigned long long>(total_usage));
1438
0
    value->append(buf);
1439
0
    return true;
1440
0
  }
1441
1442
0
  return false;
1443
0
}
1444
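The property names dispatched above are reachable through the public API; a short sketch, assuming an already-open leveldb::DB*:

#include <cstdio>
#include <string>
#include "leveldb/db.h"

void DumpProperties(leveldb::DB* db) {
  std::string v;
  if (db->GetProperty("leveldb.num-files-at-level0", &v))
    std::printf("L0 files: %s\n", v.c_str());
  if (db->GetProperty("leveldb.stats", &v))
    std::printf("%s\n", v.c_str());
  if (db->GetProperty("leveldb.sstables", &v))
    std::printf("%s\n", v.c_str());
  if (db->GetProperty("leveldb.approximate-memory-usage", &v))
    std::printf("memtables + block cache: %s bytes\n", v.c_str());
  // Anything without the "leveldb." prefix, or with an unknown suffix,
  // falls through to the `return false` at the end of GetProperty.
  std::printf("unknown: %d\n", db->GetProperty("leveldb.bogus", &v));
}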
1445
0
void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) {
1446
  // TODO(opt): better implementation
1447
0
  MutexLock l(&mutex_);
1448
0
  Version* v = versions_->current();
1449
0
  v->Ref();
1450
1451
0
  for (int i = 0; i < n; i++) {
  Branch (1451:19): [True: 0, False: 0]
1452
    // Convert user_key into a corresponding internal key.
1453
0
    InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek);
1454
0
    InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek);
1455
0
    uint64_t start = versions_->ApproximateOffsetOf(v, k1);
1456
0
    uint64_t limit = versions_->ApproximateOffsetOf(v, k2);
1457
0
    sizes[i] = (limit >= start ? limit - start : 0);
  Branch (1457:17): [True: 0, False: 0]
1458
0
  }
1459
1460
0
  v->Unref();
1461
0
}
1462
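GetApproximateSizes() above is driven as in the sketch below; note that, per the header's documentation, the result is derived from on-disk table offsets, so recently written data still in the memtable may not be counted. Names here are placeholders.

#include <cstdint>
#include <cstdio>
#include "leveldb/db.h"

void PrintRangeSize(leveldb::DB* db) {
  // One half-open key range [start, limit).
  leveldb::Range ranges[1] = {leveldb::Range("a", "z")};
  uint64_t sizes[1];
  db->GetApproximateSizes(ranges, 1, sizes);
  std::printf("approx bytes in [\"a\", \"z\"): %llu\n",
              static_cast<unsigned long long>(sizes[0]));
}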
1463
// Default implementations of convenience methods that subclasses of DB
1464
// can call if they wish
1465
0
Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
1466
0
  WriteBatch batch;
1467
0
  batch.Put(key, value);
1468
0
  return Write(opt, &batch);
1469
0
}
1470
1471
0
Status DB::Delete(const WriteOptions& opt, const Slice& key) {
1472
0
  WriteBatch batch;
1473
0
  batch.Delete(key);
1474
0
  return Write(opt, &batch);
1475
0
}
1476
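DB::Put and DB::Delete above are thin convenience wrappers: each builds a one-entry WriteBatch and forwards to Write(), so the two forms in this sketch have identical effect.

#include "leveldb/db.h"
#include "leveldb/write_batch.h"

void EquivalentWrites(leveldb::DB* db) {
  // Convenience form.
  db->Put(leveldb::WriteOptions(), "k", "v");

  // Expanded form; this is what the wrapper does internally.
  leveldb::WriteBatch batch;
  batch.Put("k", "v");
  db->Write(leveldb::WriteOptions(), &batch);
}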
1477
33.2k
DB::~DB() = default;
1478
1479
33.2k
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
1480
33.2k
  *dbptr = nullptr;
1481
1482
33.2k
  DBImpl* impl = new DBImpl(options, dbname);
1483
33.2k
  impl->mutex_.Lock();
1484
33.2k
  VersionEdit edit;
1485
  // Recover handles create_if_missing, error_if_exists
1486
33.2k
  bool save_manifest = false;
1487
33.2k
  Status s = impl->Recover(&edit, &save_manifest);
1488
33.2k
  if (s.ok() && impl->mem_ == nullptr) {
  Branch (1488:7): [True: 33.2k, False: 0]
  Branch (1488:17): [True: 33.2k, False: 0]
1489
    // Create new log and a corresponding memtable.
1490
33.2k
    uint64_t new_log_number = impl->versions_->NewFileNumber();
1491
33.2k
    WritableFile* lfile;
1492
33.2k
    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
1493
33.2k
                                     &lfile);
1494
33.2k
    if (s.ok()) {
  Branch (1494:9): [True: 33.2k, False: 0]
1495
33.2k
      edit.SetLogNumber(new_log_number);
1496
33.2k
      impl->logfile_ = lfile;
1497
33.2k
      impl->logfile_number_ = new_log_number;
1498
33.2k
      impl->log_ = new log::Writer(lfile);
1499
33.2k
      impl->mem_ = new MemTable(impl->internal_comparator_);
1500
33.2k
      impl->mem_->Ref();
1501
33.2k
    }
1502
33.2k
  }
1503
33.2k
  if (s.ok() && save_manifest) {
  Branch (1503:7): [True: 33.2k, False: 0]
  Branch (1503:17): [True: 33.2k, False: 0]
1504
33.2k
    edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
1505
33.2k
    edit.SetLogNumber(impl->logfile_number_);
1506
33.2k
    s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
1507
33.2k
  }
1508
33.2k
  if (s.ok()) {
  Branch (1508:7): [True: 33.2k, False: 0]
1509
33.2k
    impl->DeleteObsoleteFiles();
1510
33.2k
    impl->MaybeScheduleCompaction();
1511
33.2k
  }
1512
33.2k
  impl->mutex_.Unlock();
1513
33.2k
  if (s.ok()) {
  Branch (1513:7): [True: 33.2k, False: 0]
1514
33.2k
    assert(impl->mem_ != nullptr);
  Branch (1514:5): [True: 33.2k, False: 0]
1515
33.2k
    *dbptr = impl;
1516
33.2k
  } else {
1517
0
    delete impl;
1518
0
  }
1519
33.2k
  return s;
1520
33.2k
}
1521
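A sketch of the open/destroy lifecycle covered by DB::Open() above and DestroyDB() below; the database path is a placeholder.

#include <cassert>
#include <cstdio>
#include "leveldb/db.h"

int main() {
  leveldb::Options options;
  options.create_if_missing = true;  // honored inside impl->Recover()
  leveldb::DB* db = nullptr;
  leveldb::Status s = leveldb::DB::Open(options, "/tmp/open_demo", &db);
  if (!s.ok()) {
    std::fprintf(stderr, "open failed: %s\n", s.ToString().c_str());
    return 1;
  }
  delete db;  // release the LOCK file before destroying

  // Removes the log, table, MANIFEST, and LOCK files, then the directory.
  s = leveldb::DestroyDB("/tmp/open_demo", leveldb::Options());
  assert(s.ok());
  return 0;
}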
1522
33.2k
Snapshot::~Snapshot() = default;
1523
1524
0
Status DestroyDB(const std::string& dbname, const Options& options) {
1525
0
  Env* env = options.env;
1526
0
  std::vector<std::string> filenames;
1527
0
  Status result = env->GetChildren(dbname, &filenames);
1528
0
  if (!result.ok()) {
  Branch (1528:7): [True: 0, False: 0]
1529
    // Ignore error in case directory does not exist
1530
0
    return Status::OK();
1531
0
  }
1532
1533
0
  FileLock* lock;
1534
0
  const std::string lockname = LockFileName(dbname);
1535
0
  result = env->LockFile(lockname, &lock);
1536
0
  if (result.ok()) {
  Branch (1536:7): [True: 0, False: 0]
1537
0
    uint64_t number;
1538
0
    FileType type;
1539
0
    for (size_t i = 0; i < filenames.size(); i++) {
  Branch (1539:24): [True: 0, False: 0]
1540
0
      if (ParseFileName(filenames[i], &number, &type) &&
  Branch (1540:11): [True: 0, False: 0]
1541
0
          type != kDBLockFile) {  // Lock file will be deleted at end
  Branch (1541:11): [True: 0, False: 0]
1542
0
        Status del = env->DeleteFile(dbname + "/" + filenames[i]);
1543
0
        if (result.ok() && !del.ok()) {
  Branch (1543:13): [True: 0, False: 0]
  Branch (1543:28): [True: 0, False: 0]
1544
0
          result = del;
1545
0
        }
1546
0
      }
1547
0
    }
1548
0
    env->UnlockFile(lock);  // Ignore error since state is already gone
1549
0
    env->DeleteFile(lockname);
1550
0
    env->DeleteDir(dbname);  // Ignore error in case dir contains other files
1551
0
  }
1552
0
  return result;
1553
0
}
1554
1555
}  // namespace leveldb