|
| 1 | +use std::path::PathBuf; |
| 2 | + |
| 3 | +use chainhook_event_observer::{rocksdb::Options, rocksdb::DB, utils::Context}; |
| 4 | +use chainhook_types::{BlockIdentifier, StacksBlockData}; |
| 5 | + |
| 6 | +fn get_db_default_options() -> Options { |
| 7 | + let mut opts = Options::default(); |
| 8 | + opts.create_if_missing(true); |
| 9 | + // opts.prepare_for_bulk_load(); |
| 10 | + // opts.set_compression_type(rocksdb::DBCompressionType::Lz4); |
| 11 | + // opts.set_blob_compression_type(rocksdb::DBCompressionType::Lz4); |
| 12 | + // opts.increase_parallelism(parallelism) |
| 13 | + // Per rocksdb's documentation: |
| 14 | + // If cache_index_and_filter_blocks is false (which is default), |
| 15 | + // the number of index/filter blocks is controlled by option max_open_files. |
| 16 | + // If you are certain that your ulimit will always be bigger than number of files in the database, |
| 17 | + // we recommend setting max_open_files to -1, which means infinity. |
| 18 | + // This option will preload all filter and index blocks and will not need to maintain LRU of files. |
| 19 | + // Setting max_open_files to -1 will get you the best possible performance. |
| 20 | + opts.set_max_open_files(2048); |
| 21 | + opts |
| 22 | +} |
| 23 | + |
| 24 | +fn get_default_stacks_db_file_path(base_dir: &PathBuf) -> PathBuf { |
| 25 | + let mut destination_path = base_dir.clone(); |
| 26 | + destination_path.push("stacks.rocksdb"); |
| 27 | + destination_path |
| 28 | +} |
| 29 | + |
| 30 | +pub fn open_readonly_stacks_db_conn(base_dir: &PathBuf, _ctx: &Context) -> Result<DB, String> { |
| 31 | + let path = get_default_stacks_db_file_path(&base_dir); |
| 32 | + let opts = get_db_default_options(); |
| 33 | + let db = DB::open_for_read_only(&opts, path, false) |
| 34 | + .map_err(|e| format!("unable to open stacks.rocksdb: {}", e.to_string()))?; |
| 35 | + Ok(db) |
| 36 | +} |
| 37 | + |
| 38 | +pub fn open_readwrite_stacks_db_conn(base_dir: &PathBuf, _ctx: &Context) -> Result<DB, String> { |
| 39 | + let path = get_default_stacks_db_file_path(&base_dir); |
| 40 | + let opts = get_db_default_options(); |
| 41 | + let db = DB::open(&opts, path) |
| 42 | + .map_err(|e| format!("unable to open stacks.rocksdb: {}", e.to_string()))?; |
| 43 | + Ok(db) |
| 44 | +} |
| 45 | + |
| 46 | +fn get_default_bitcoin_db_file_path(base_dir: &PathBuf) -> PathBuf { |
| 47 | + let mut destination_path = base_dir.clone(); |
| 48 | + destination_path.push("bitcoin.rocksdb"); |
| 49 | + destination_path |
| 50 | +} |
| 51 | + |
| 52 | +pub fn open_readonly_bitcoin_db_conn(base_dir: &PathBuf, _ctx: &Context) -> Result<DB, String> { |
| 53 | + let path = get_default_bitcoin_db_file_path(&base_dir); |
| 54 | + let opts = get_db_default_options(); |
| 55 | + let db = DB::open_for_read_only(&opts, path, false) |
| 56 | + .map_err(|e| format!("unable to open bitcoin.rocksdb: {}", e.to_string()))?; |
| 57 | + Ok(db) |
| 58 | +} |
| 59 | + |
| 60 | +pub fn open_readwrite_bitcoin_db_conn(base_dir: &PathBuf, _ctx: &Context) -> Result<DB, String> { |
| 61 | + let path = get_default_bitcoin_db_file_path(&base_dir); |
| 62 | + let opts = get_db_default_options(); |
| 63 | + let db = DB::open(&opts, path) |
| 64 | + .map_err(|e| format!("unable to open bitcoin.rocksdb: {}", e.to_string()))?; |
| 65 | + Ok(db) |
| 66 | +} |
| 67 | + |
| 68 | +fn get_block_key(block_identifier: &BlockIdentifier) -> [u8; 12] { |
| 69 | + let mut key = [0u8; 12]; |
| 70 | + key[..2].copy_from_slice(b"b:"); |
| 71 | + key[2..10].copy_from_slice(&block_identifier.index.to_be_bytes()); |
| 72 | + key[10..].copy_from_slice(b":d"); |
| 73 | + key |
| 74 | +} |
| 75 | + |
| 76 | +fn get_last_insert_key() -> [u8; 3] { |
| 77 | + *b"m:t" |
| 78 | +} |
| 79 | + |
| 80 | +pub fn insert_entry_in_stacks_blocks(block: &StacksBlockData, stacks_db_rw: &DB, _ctx: &Context) { |
| 81 | + let key = get_block_key(&block.block_identifier); |
| 82 | + let block_bytes = json!(block); |
| 83 | + stacks_db_rw |
| 84 | + .put(&key, &block_bytes.to_string().as_bytes()) |
| 85 | + .expect("unable to insert blocks"); |
| 86 | + stacks_db_rw |
| 87 | + .put( |
| 88 | + get_last_insert_key(), |
| 89 | + block.block_identifier.index.to_be_bytes(), |
| 90 | + ) |
| 91 | + .expect("unable to insert metadata"); |
| 92 | +} |
| 93 | + |
| 94 | +pub fn get_last_block_height_inserted(stacks_db: &DB, _ctx: &Context) -> Option<u64> { |
| 95 | + stacks_db |
| 96 | + .get(get_last_insert_key()) |
| 97 | + .unwrap_or(None) |
| 98 | + .and_then(|bytes| Some(u64::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3], bytes[4], bytes[5], bytes[6], bytes[7]]))) |
| 99 | +} |
| 100 | + |
| 101 | +pub fn insert_entries_in_stacks_blocks( |
| 102 | + blocks: &Vec<StacksBlockData>, |
| 103 | + stacks_db_rw: &DB, |
| 104 | + ctx: &Context, |
| 105 | +) { |
| 106 | + for block in blocks.iter() { |
| 107 | + insert_entry_in_stacks_blocks(block, stacks_db_rw, ctx); |
| 108 | + } |
| 109 | +} |
| 110 | + |
| 111 | +pub fn get_stacks_block_at_block_height( |
| 112 | + block_height: u64, |
| 113 | + retry: u8, |
| 114 | + stacks_db: &DB, |
| 115 | +) -> Result<Option<StacksBlockData>, String> { |
| 116 | + let mut attempt = 0; |
| 117 | + loop { |
| 118 | + match stacks_db.get(get_block_key(&BlockIdentifier { hash: "".to_string(), index: block_height })) { |
| 119 | + Ok(Some(entry)) => return Ok(Some({ |
| 120 | + let spec: StacksBlockData = serde_json::from_slice(&entry[..]) |
| 121 | + .map_err(|e| format!("unable to deserialize Stacks chainhook {}", e.to_string()))?; |
| 122 | + spec |
| 123 | + })), |
| 124 | + Ok(None) => return Ok(None), |
| 125 | + _ => { |
| 126 | + attempt += 1; |
| 127 | + std::thread::sleep(std::time::Duration::from_secs(2)); |
| 128 | + if attempt > retry { |
| 129 | + return Ok(None); // TODO |
| 130 | + } |
| 131 | + } |
| 132 | + } |
| 133 | + } |
| 134 | +} |
| 135 | + |
| 136 | +pub fn is_stacks_block_present( |
| 137 | + block_identifier: &BlockIdentifier, |
| 138 | + retry: u8, |
| 139 | + stacks_db: &DB, |
| 140 | +) -> bool { |
| 141 | + let mut attempt = 0; |
| 142 | + loop { |
| 143 | + match stacks_db.get(get_block_key(block_identifier)) { |
| 144 | + Ok(Some(_)) => return true, |
| 145 | + Ok(None) => return false, |
| 146 | + _ => { |
| 147 | + attempt += 1; |
| 148 | + std::thread::sleep(std::time::Duration::from_secs(2)); |
| 149 | + if attempt > retry { |
| 150 | + return false; |
| 151 | + } |
| 152 | + } |
| 153 | + } |
| 154 | + } |
| 155 | +} |
0 commit comments