zuul-registry/zuul_registry/filesystem.py
Ian Wienand 7100c360b3 filesystem: add file syncs
The "does the size on disk match the manifest" size checks in
I70a7bb5f73d1dddc540e96529784bb8c9bb0b9e3 appear to having a
false-positive on some changes.

One example job I found failed with

 Manifest has invalid size for layer
 sha256:9815a275e5d0f93566302aeb58a49bf71b121debe1a291cf0f64278fe97ec9b5
 (size:203129434 actual:185016320)

but when I went to look at the file on disk (quite some time later),
it was correct:

 $ ls -l ./_local/blobs/sha256:9815a275e5d0f93566302aeb58a49bf71b121debe1a291cf0f64278fe97ec9b5
 -rw-r--r-- 1 root root 203129433 Sep  7 01:53 data

(well, off by one byte, but that's the problem worked around by the
original change).

I think what happened here is that when we're getting into
multi-hundred-megabyte layers there's plenty of chance for the writes
that join the upload chunks to not have been flushed by the time we
check the size when the manifest is uploaded.

Add some flush and sync points after upload and concatenation writes
to avoid this.
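
The pattern added after each write path (a minimal sketch; the full
change is in the file below) is:

 f.flush()             # push Python's userspace buffers to the kernel
 os.fsync(f.fileno())  # ask the kernel to commit the data to disk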

Change-Id: I46606287d41fa6745de7c8bfb31c6b0d28e32957
2021-09-07 14:12:34 +10:00

# Copyright 2019 Red Hat, Inc.
#
# This module is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
import os

from . import storageutils
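
# Size of the reads used when streaming objects back from disk.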
DISK_CHUNK_SIZE = 64 * 1024


class FilesystemDriver(storageutils.StorageDriver):
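    """Storage driver that keeps objects on the local filesystem.

    All object paths are interpreted relative to the configured
    root directory.
    """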

    def __init__(self, conf):
        self.root = conf['root']

    def list_objects(self, path):
        path = os.path.join(self.root, path)
        if not os.path.isdir(path):
            return []
        ret = []
        for f in os.listdir(path):
            obj_path = os.path.join(path, f)
            ret.append(storageutils.ObjectInfo(
                obj_path, f, os.stat(obj_path).st_ctime,
                os.path.isdir(obj_path)))
        return ret

    def get_object_size(self, path):
        path = os.path.join(self.root, path)
        if not os.path.exists(path):
            return None
        return os.stat(path).st_size

    def put_object(self, path, data, uuid=None):
        path = os.path.join(self.root, path)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'wb') as f:
            if isinstance(data, bytes):
                f.write(data)
            else:
                for chunk in data:
                    f.write(chunk)
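            # Flush Python's buffers and fsync so the full blob is on
            # disk before any subsequent size check (see the manifest
            # validation failure described in the commit message).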
            f.flush()
            os.fsync(f.fileno())

    def get_object(self, path):
        path = os.path.join(self.root, path)
        if not os.path.exists(path):
            return None
        with open(path, 'rb') as f:
            return f.read()

    def stream_object(self, path):
        path = os.path.join(self.root, path)
        if not os.path.exists(path):
            return None, None
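        # Buffered with DISK_CHUNK_SIZE so reads line up with the
        # chunks yielded below.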
        f = open(path, 'rb', buffering=DISK_CHUNK_SIZE)
        try:
            size = os.fstat(f.fileno()).st_size
        except OSError:
            f.close()
            raise

        def data_iter(f=f):
            with f:
                yield b''  # will get discarded; see note below
                yield from iter(lambda: f.read(DISK_CHUNK_SIZE), b'')

        ret = data_iter()
        # This looks a little funny, because it is. We're going to
        # discard the empty bytes added at the start, but that's not
        # the important part. We want to ensure that
        #
        # 1. the generator has started executing and
        # 2. it left off *inside the with block*
        #
        # This ensures that when the generator gets cleaned up (either
        # because everything went according to plan and the generator
        # exited cleanly *or* there was an error which eventually
        # raised a GeneratorExit), the file we opened will get closed.
        next(ret)
        return size, ret

    def delete_object(self, path):
        path = os.path.join(self.root, path)
        if os.path.exists(path):
            if os.path.isdir(path):
                os.rmdir(path)
            else:
                os.unlink(path)

    def move_object(self, src_path, dst_path, uuid=None):
        src_path = os.path.join(self.root, src_path)
        dst_path = os.path.join(self.root, dst_path)
        os.makedirs(os.path.dirname(dst_path), exist_ok=True)
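        # rename() is atomic when src and dst are on the same
        # filesystem, so the object appears at dst_path in one step.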
        os.rename(src_path, dst_path)

    def cat_objects(self, path, chunks, uuid=None):
        path = os.path.join(self.root, path)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'wb') as outf:
            for chunk in chunks:
                chunk_path = os.path.join(self.root, chunk['path'])
                with open(chunk_path, 'rb') as inf:
                    while True:
                        d = inf.read(4096)
                        if not d:
                            break
                        outf.write(d)
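            # As in put_object, flush and fsync so the concatenated
            # blob is fully on disk before the chunks are removed.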
            outf.flush()
            os.fsync(outf.fileno())
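        # The individual chunks are no longer needed once the combined
        # object is durably written.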
        for chunk in chunks:
            chunk_path = os.path.join(self.root, chunk['path'])
            os.unlink(chunk_path)


Driver = FilesystemDriver