This document presents an analysis of i2p-Tahoe-LAFS version 1.10 with the goal of implementing three new features:
Quota management
Connection to multiple Helpers
Automatic spreading of Introducer & Helper furls.
The resulting storage is secure, resilient, encrypted and private, and it works with GPG encryption keys.
You can recover your data, even if your cube breaks down, from a backup of your private GPG keys.
Our Tahoe version is an i2p+tahoe build with CommunityCube improvements.
I2p-tahoe is a regular Tahoe-LAFS with the multiple-introducers support patch from:
https://tahoe-lafs.org/trac/
Where it is written: "Foolscap & Tahoe-LAFS patched for use on I2P (with support for multiple introducers)"
We begin with a short introduction to Tahoe-LAFS, and then proceed to analyse the requirements for the 3 proposed features. The analysis includes a review of how related functionality is now implemented in Tahoe-LAFS, which files should be modified and what modifications should be included for each of those files.
As a short reminder, Tahoe-LAFS grid is composed of several types of nodes:
Introducer: keeps track of StorageServer nodes connected to the grid and publishes them so that StorageClients know which nodes they can connect to.
StorageServer: form the distributed data store.
HelperServer: an intermediate server which can be used to minimize upload time. Due to the redundancy introduced by erasure coding, uploading a file to the grid can be an order of magnitude slower than reading from it. The HelperServer acts as a proxy which receives the encrypted data from the StorageClient (encrypted, but with no redundancy), performs the erasure encoding and distributes the resulting shares to StorageServers in the grid.
StorageClient: once they get the list of StorageServers in the grid from one introducer, they can connect to read and write data on the grid. Read operations are performed connecting directly to StorageServer nodes. Write operations can be performed connecting directly or using a HelperServer (only for immutable files as of Tahoe-LAFS 1.10.0).
For a full introduction to Tahoe-LAFS, see the docs folder in the source tree. You can also check the tutorial published on the Nilestore project's wiki.
Diagram showing tahoe-lafs network topology (from tahoe-lafs official documentation). (Notice that Introducers and Helpers are not shown in it)
Tahoe-LAFS is developed in Python (2.6.6 – 2.x) and has good test coverage (around 92% for 1.10). In this section we give a short description of the Tahoe-LAFS source code.
We start by looking at Tahoe-LAFS source folder structure:
allmydata
├── frontends
├── immutable
│   └── downloader
├── introducer
├── mutable
├── scripts
├── storage
├── test
├── util
├── web
│   └── static
│       ├── css
│       └── img
└── windows
As a general rule, code specific to each feature's Client and Server is placed under that feature's folder, as client.py and server.py. All test files are placed under the test folder.
Some files relevant to the rest of the document:
allmydata/client.py: the main file for the Tahoe-LAFS client; it contains the Client class, which initializes most of the services (StorageFarmBroker, StorageServer, web/FTP/SFTP frontends, Helper...)
allmydata/introducer/server.py: the server side of the Introducer.
allmydata/introducer/client.py: the client side of the Introducer.
allmydata/storage/server.py: the server side of the storage.
allmydata/immutable/upload.py: manages connections to the Helper from the client side.
allmydata/immutable/offloaded.py: the Helper, server side.
allmydata/storage_client.py: functions related to the storage client.
All of the user's CommunityCube data is backed up on Tahoe-LAFS + I2P.
Included (configurable in the Control Panel): mail, OwnCloud data, Friendica data, YaCy data.
Data is encrypted and split into pieces distributed across the whole CommunityCube decentralized filesystem.
A cron script needs to be created to back up the selected data each day.
Files are replicated, split and distributed over the network, so they remain available if some nodes fail.
Each file is split into X parts.
In Tahoe-LAFS terms this is a K-of-X encoding: the file is split into X parts, but only K pieces are needed to recover the file.
You can calculate the real available space, depending on your replication factor, with:
X = Total parts
K = Minimum parts to restore the file
TS = Total space
Real Available Space = TS / (X/K)
For example, if you are sharing 100 GB and using the default replication factor (3 of 10), you really have only 30 GB available.
If we use the Symform RAID-96™ specification, with K=64 and X=96, about 66 GB would be available (100 × 64 / 96).
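A minimal calculation helper illustrating the formula above (the 3-of-10 and 64-of-96 numbers are just the examples given in the text):

def real_available_space(total_space, k, x):
    """Return the usable space given K-of-X erasure coding.

    total_space: raw space contributed to the grid
    k: minimum number of shares needed to restore a file
    x: total number of shares produced per file
    """
    return total_space * float(k) / x

print(real_available_space(100, 3, 10))   # Tahoe-LAFS default 3-of-10 -> 30.0 GB
print(real_available_space(100, 64, 96))  # Symform RAID-96 style 64-of-96 -> ~66.7 GB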
CommunityCube resilience factor will be determined and fixed in our Tahoe-LAFS version.
When you are not using the internet (e.g. at night), your DDSA filesystem client should check that X parts of each of your files still exist.
If they do not, your DDSA filesystem client should ask the surviving nodes that hold your file to replicate the missing parts.
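One way to drive this periodic check from a cron job is to reuse the existing tahoe CLI. The sketch below is only an illustration: it assumes the node runs locally and that a rootcap alias named backup: exists.

#!/usr/bin/env python
# Hedged sketch: nightly integrity check/repair driven by cron.
# Assumes a locally running node and a rootcap alias named "backup:".
import subprocess
import sys

ALIAS = "backup:"

def nightly_repair():
    # deep-check walks every file/directory reachable from the alias;
    # --repair regenerates missing shares, --add-lease renews the leases.
    cmd = ["tahoe", "deep-check", "--repair", "--add-lease", ALIAS]
    return subprocess.call(cmd)

if __name__ == "__main__":
    sys.exit(nightly_repair())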
Access to your Tahoe-LAFS folder is granted by a pair of GPG keys.
These keys are backed up by a script into the same folder, in an encrypted file. The user only has to remember the encryption password and the public URL to access it.
If your CommunityCube server is destroyed or stolen, when you start the new CommunityCube server you enter the public URL and password. The recovery script then downloads your private keys from the old folder, decrypts the file, deletes the active keys and restores your old keys.
In this way you can recover all your data from the grid, including files, emails and databases.
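A rough sketch of the recovery step described above. The gateway address, the folder read-cap and the keys.gpg file name are placeholders, not part of the current implementation.

# Hedged sketch of the key-recovery step. GATEWAY, FOLDER_CAP and "keys.gpg"
# are illustrative placeholders only.
import subprocess
import urllib2  # Tahoe-LAFS 1.10 targets Python 2.x

GATEWAY = "http://127.0.0.1:3456"   # local Tahoe web gateway
FOLDER_CAP = "URI:DIR2-RO:..."      # the public URL / read-cap the user remembers

def recover_keys(passphrase_file):
    # download the encrypted key backup from the old Tahoe folder
    url = "%s/uri/%s/keys.gpg" % (GATEWAY, FOLDER_CAP)
    data = urllib2.urlopen(url).read()
    with open("/tmp/keys.gpg", "wb") as f:
        f.write(data)
    # decrypt it with the passphrase the user remembers
    subprocess.check_call(["gpg", "--batch", "--passphrase-file", passphrase_file,
                           "--output", "/root/recovered-keys.asc",
                           "--decrypt", "/tmp/keys.gpg"])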
The following data from CommunityCube services will be backed up to the I2P+Tahoe-LAFS grid storage (a minimal cron-driven backup sketch follows the list):
- YaCy: configuration
- Friendica: mysql, configuration & data
- Mailpile: configuration, emails & GPG keys
- WebRTC: nothing to backup
- OwnCloud: mysql, configuration & data
- TahoeLAFS: tahoe keys & config
- CommunityCube Panel: service states & panel configuration
- System service configurations: dhcp, squid, spamassassin, iptables, etc.
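A minimal sketch of the daily backup cron job. The directory paths and the backup: alias are assumptions for illustration; "tahoe backup" creates incremental snapshots of a local directory on the grid.

#!/usr/bin/env python
# Hedged sketch of the daily cron backup. Paths and the "backup:" alias are
# assumptions; database dumps would have to be produced before this runs.
import subprocess

# local directory -> destination below the rootcap alias (illustrative only)
TARGETS = {
    "/var/lib/yacy":      "backup:yacy",
    "/var/lib/friendica": "backup:friendica",
    "/var/lib/mailpile":  "backup:mailpile",
    "/var/lib/owncloud":  "backup:owncloud",
    "/etc":               "backup:system-config",
}

def run_daily_backup():
    for localdir, remote in sorted(TARGETS.items()):
        # incremental snapshot of localdir under the alias path
        subprocess.check_call(["tahoe", "backup", localdir, remote])

if __name__ == "__main__":
    run_daily_backup()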
Support for quota management ('accounting') in Tahoe-LAFS has been an ongoing development for several years. The schema being used is based on the use of accounts, which could be managed by a central AuthorityServer or independently by each of the StorageServers (this option being suited only for smaller grids). A detailed description of the intended accounting architecture and development roadmap can be found in the project's documentation.
The objective of quota management in CommunityCube is to ensure that a user who contributes a given amount of space to the grid can use the equivalent of that amount in it.
User accounts pose obvious risks regarding privacy/anonymity concerns. We have thus investigated a different approach to the problem: control quota management from the StorageClient itself.
This implementation comes, however, with its own set of drawbacks: it can be easily defeated by using a modified StorageClient, and it requires keeping a local record of files stored in the grid (something Tahoe-LAFS does not otherwise require, as long as you keep a copy of the capabilities you are interested in), which is also a significant threat from the privacy point of view. As an alternative to keeping a record of every uploaded file, users can be forced to use a single directory as the root for all the files they upload (known as a rootcap). The content under that directory can be accounted for with a call to 'tahoe deep-check --add-lease ALIAS:', where ALIAS stands for the alias of the rootcap directory.
This approach seems to be the most compatible with CommunityCube's objectives, and its adoption relies on the belief that CommunityCube's users will be 'fair-play' to the rest of the community members.
The proposed system can be easily bypassed by malicious actors, but it will however ensure that the grid is not abused due to user mistakes or lack of knowledge on the grid's working principles and capacity.
Quota management will be handled by the StorageClient, which imposes the limits on what can be uploaded to the grid. When a file is to be uploaded, the StorageClient:
Checks that the storage server is running and writable
Calculates the space it is sharing in the associated storage server.
Available disk space
Reserved disk space (minimum free space to be reserved)
Size of stored shares
The size of leases it holds on files stored on grid (requires a catalog of uploaded files and lease expiration/renewal tracking).
Estimates the assigned space as 'Sharing space (available + stored shares)'
Checks that Used space (i.e. sum of leases) is smaller than 'Sharing space'.
Retrieves the grid's "K out of X" parameters used in erasure encoding.
Verifies the predicted used space after the upload and reports an error if the available quota would be exceeded (a minimal sketch of this check follows the list).
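A hedged sketch of the client-side check described above. The stats keys match those returned by StorageServer.get_stats (shown later in this section); the function itself, its arguments and QuotaExceededError are assumptions, since the exact accounting of used space (sum of lease sizes) is part of the proposed work.

# Hedged sketch of the client-side quota check; names are assumptions.
class QuotaExceededError(Exception):
    pass

def quota_check(storage_stats, encoding_params, used_space, new_file_size):
    # space we are effectively contributing to the grid: what is still free
    # plus what is already allocated to shares (a rough approximation)
    sharing = (storage_stats["storage_server.disk_avail"]
               + storage_stats["storage_server.allocated"])
    # erasure coding expands every uploaded byte by a factor of n/k
    k = encoding_params["k"]
    n = encoding_params["n"]
    predicted = used_space + new_file_size * n / float(k)
    if predicted > sharing:
        raise QuotaExceededError("upload would exceed the local quota")
    return predicted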
We will have a look at how the following functionality is implemented in Tahoe-LAFS:
The upload of a file (to the Helper or directly to other StorageServers via the StorageFarmBroker).
Check if the StorageServer is running.
The statistics associated with the space used and available on the StorageServer.
The moment the leases are renewed in remote StorageServers.
In the next paragraphs we show how the system works with the corresponding code.
The upload of a file
The upload takes place at different classes depending on the type of data being uploaded. For immutable files, it is the Uploader service, which is defined in allmydata/immutable/upload.py. For mutable files, it is defined in allmydata/mutable/filenode.py.
These functions can be accessed from the main client, using an intermediate call to a NodeMaker instance, or directly calling the uploader:
File: allmydata/client.py

class Client(node.Node, pollmixin.PollMixin):
    (…)
    # these four methods are the primitives for creating filenodes and
    # dirnodes. The first takes a URI and produces a filenode or (new-style)
    # dirnode. The other three create brand-new filenodes/dirnodes.

    def create_node_from_uri(self, write_uri, read_uri=None,
                             deep_immutable=False, name="<unknown name>"):
        # This returns synchronously.
        # Note that it does *not* validate the write_uri and read_uri; instead we
        # may get an opaque node if there were any problems.
        return self.nodemaker.create_from_cap(write_uri, read_uri,
                                              deep_immutable=deep_immutable,
                                              name=name)

    def create_dirnode(self, initial_children={}, version=None):
        d = self.nodemaker.create_new_mutable_directory(initial_children,
                                                        version=version)
        return d

    def create_immutable_dirnode(self, children, convergence=None):
        return self.nodemaker.create_immutable_directory(children, convergence)

    def create_mutable_file(self, contents=None, keysize=None, version=None):
        return self.nodemaker.create_mutable_file(contents, keysize,
                                                  version=version)

    def upload(self, uploadable):
        uploader = self.getServiceNamed("uploader")
        return uploader.upload(uploadable)
File: allmydata/nodemaker.py class NodeMaker: implements(INodeMaker) (…) def create_mutable_file(self, contents= None , keysize= None , version= None ): if version is None : version = self.mutable_file_default n = MutableFileNode(self.storage_broker, self.secret_holder, self.default_encoding_parameters,self.history) d = self.key_generator.generate(keysize) d.addCallback(n. create_with_keys , contents, version=version) d.addCallback( lambda res: n) return d def create_new_mutable_directory (self, initial_children={}, version= None : # initial_children must have metadata (i.e. {} instead of None) for (name,(node, metadata)) in initial_children.iteritems(): precondition(isinstance(metadata, dict), "create_new_mutable_directory requires metadata to be a dict, not None" , metadata) node.raise_error() d = self.create_mutable_file( lambda n: MutableData(pack_children(initial_children, n.get_writekey())), version=version) d.addCallback(self._create_dirnode) return d def create_immutable_directory(self, children, convergence= None ): if convergence is None : convergence = self.secret_holder.get_convergence_secret() packed = pack_children(children, None , deep_immutable= True ) uploadable = Data(packed, convergence) d = self.uploader.upload(uploadable) d.addCallback( lambda results: self .create_from_cap( None , results.get_uri())) d.addCallback(self._create_dirnode) return d |
File: allmydata/immutable/upload.py class Uploader (service.MultiService, log.PrefixingLogMixin): """I am a service that allows file uploading. I am a service-child of the Client. """ (...) def upload(self, uploadable): """ Returns a Deferred that will fire with the UploadResults instance. """ assert self.parent assert self.running uploadable = IUploadable(uploadable) d = uploadable.get_size() def _got_size(size): default_params = self.parent.get_encoding_parameters() precondition(isinstance(default_params, dict), default_params) precondition( "max_segment_size" in default_params, default_params) uploadable.set_default_encoding_parameters(default_params) if self.stats_provider: self.stats_provider.count( 'uploader.files_uploaded' , 1) self.stats_provider.count( 'uploader.bytes_uploaded' , size) if size <= self .URI_LIT_SIZE_THRESHOLD: uploader = LiteralUploader() return uploader.start(uploadable) else : eu = EncryptAnUploadable(uploadable, self._parentmsgid) d2 = defer.succeed( None ) storage_broker = self.parent.get_storage_broker() if self ._helper: uploader = AssistedUploader(self._helper, storage_broker) d2.addCallback( lambda x: eu.get_storage_index()) d2.addCallback( lambda si: uploader.start(eu, si)) else : storage_broker = self.parent.get_storage_broker() secret_holder = self.parent._secret_holder uploader = CHKUploader(storage_broker, secret_holder) d2.addCallback( lambda x: uploader.start(eu)) self._all_uploads[uploader] = None if self._history: self._history.add_upload(uploader.get_upload_status()) def turn_verifycap_into_read_cap (uploadresults): # Generate the uri from the verifycap plus the key. d3 = uploadable.get_encryption_key() def put_readcap_into_results(key): v = uri.from_string(uploadresults.get_verifycapstr()) r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size) uploadresults.set_uri(r.to_string()) return uploadresults d3.addCallback(put_readcap_into_results) return d3 d2.addCallback(turn_verifycap_into_read_cap) return d2 d.addCallback(_got_size) def _done (res): uploadable.close() return res d.addBoth(_done) return d |
We have highlighted the callback that starts the upload (_got_size inside Uploader.upload) and the three available ways to upload immutable content: with a LiteralUploader for small files, with a Helper, or directly through the StorageFarmBroker.
In the case of mutable files, we have to check the moment when we upload a new file and when we want to modify it (or fully overwrite it via MutableFileNode.overwrite or MutableFileNode.update):
File: allmydata/mutable/filenode.py class MutableFileNode: implements(IMutableFileNode, ICheckable) def __init__(self, storage_broker, secret_holder, default_encoding_parameters, history): self._storage_broker = storage_broker self._secret_holder = secret_holder self._default_encoding_parameters = default_encoding_parameters self._history = history self._pubkey = None # filled in upon first read self._privkey = None # filled in if we're mutable # we keep track of the last encoding parameters that we use. These # are updated upon retrieve, and used by publish. If we publish # without ever reading (i.e. overwrite()), then we use these values. self._required_shares = default_encoding_parameters[ "k" ] self._total_shares = default_encoding_parameters[ "n" ] self._sharemap = {} # known shares, shnum-to-[nodeids] self._most_recent_size = None (...) def create_with_keys(self,(pubkey, privkey), contents, version=SDMF_VERSION): """Call this to create a brand-new mutable file. It will create the shares, find homes for them, and upload the initial contents (created with the same rules as IClient.create_mutable_file() ). Returns a Deferred that fires (with the MutableFileNode instance you should use)when it completes. """ self._pubkey,nself._privkey = pubkey, privkey pubkey_s = self._pubkey.serialize() privkey_s = self._privkey.serialize() self._writekey = hashutil.ssk_writekey_hash(privkey_s) self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s) self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s) if version == MDMF_VERSION: self._uri = WriteableMDMFFileURI(self._writekey, self._fingerprint) self._protocol_version = version elif version == SDMF_VERSION: self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint) self._protocol_version = version self._readkey = self._uri.readkey self._storage_index = self._uri.storage_index initial_contents = self._get_initial_contents(contents) return self._upload(initial_contents, None ) (…) def overwrite(self, new_contents): """ I overwrite the contents of the best recoverable version of this mutable file with new_contents. This is equivalent to calling overwrite on the result of get_best_mutable_version with new_contents as an argument. I return a Deferred that eventually fires with the results of my replacement process. """ # TODO: Update downloader hints. return self._do_serialized(self._overwrite, new_contents) (…) def upload(self, new_contents, servermap): """ I overwrite the contents of the best recoverable version of this mutable file with new_contents, using servermap instead of creating/updating our own servermap. I return a Deferred that fires with the results of my upload. """ # TODO: Update downloader hints return self._do_serialized(self._upload, new_contents, servermap) def modify(self, modifier, backoffer= None ): """ I modify the contents of the best recoverable version of this mutablenfile with the modifier. This is equivalent to calling modify on the result of get_best_mutable_version. I return a Deferred that eventually fires with an UploadResults instance describing this process. """ # TODO: Update downloader hints. return self._do_serialized(self._modify, modifier, backoffer) |
In addition to the relevant functions, we have also highlighted the values of k and n, which are required to estimate how much disk space a new file will take.
Check if the StorageServer is running.
The StorageServer is initialized in allmydata/client.py, in function Client.init_storage, according to configuration values.
File: allmydata/client.py class Client(node.Node, pollmixin.PollMixin): (…) def init_storage(self): # should we run a storage server (and publish it for others to use)? if not self.get_config("storage", "enabled" , True , boolean= True ): return readonly = self.get_config( "storage" , "readonly" , False , boolean= True ) storedir = os.path.join(self.basedir, self.STOREDIR) data = self.get_config( "storage" , "reserved_space" , None ) try : reserved = parse_abbreviated_size(data) except ValueError: log.msg( "[storage]reserved_space= contains unparseable value %s" % data) raise if reserved is None : reserved = 0 (…) ss = StorageServer(storedir, self.nodeid, reserved_space=reserved, discard_storage=discard, readonly_storage=readonly, stats_provider=self.stats_provider, expiration_enabled=expire, expiration_mode=mode, expiration_override_lease_duration=o_l_d, expiration_cutoff_date=cutoff_date, expiration_sharetypes=expiration_sharetypes) self.add_service(ss) d = self.when_tub_ready() # we can't do registerReference until the Tub is ready def _publish(res): furl_file = os.path.join(self.basedir, "private" , "storage.furl" ).encode(get_filesystem_encoding()) furl = self.tub.registerReference(ss, furlFile=furl_file) ann = { "anonymous-storage-FURL" : furl, "permutation-seed-base32" : self._init_permutation_seed(ss), } current_seqnum, current_nonce = self._sequencer() for ic in self .introducer_clients: ic.publish( "storage" , ann, current_seqnum, current_nonce, self._node_key) d.addCallback(_publish) d.addErrback(log.err, facility= "tahoe.init" , level=log.BAD, umid= "aLGBKw" ) |
To find out if the StorageServer is running we have to recover the parent of the service we are at (i.e. the Uploader). We will be working with services which are 'children' of the main Client instance, and we can check if the client is running a given service (i.e. the storage service) as is done in allmydata/web/root.py:
File: allmydata/web/root.py

class Root(rend.Page):
    (...)
    def __init__(self, client, clock=None, now=None):
        (...)
        try:
            s = client.getServiceNamed("storage")
        except KeyError:
            s = None
        (...)
The statistics associated with the space used and available on the StorageServer.
From the StorageServer service we get access to the StorageServer.get_stats function:
class StorageServer(service.MultiService, Referenceable):
    (…)
    def get_stats(self):
        # remember: RIStatsProvider requires that our return dict
        # contains numeric values.
        stats = { 'storage_server.allocated': self.allocated_size(), }
        stats['storage_server.reserved_space'] = self.reserved_space
        for category, ld in self.get_latencies().items():
            for name, v in ld.items():
                stats['storage_server.latencies.%s.%s' % (category, name)] = v
        try:
            disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
            writeable = disk['avail'] > 0
            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
            stats['storage_server.disk_total'] = disk['total']
            stats['storage_server.disk_used'] = disk['used']
            stats['storage_server.disk_free_for_root'] = disk['free_for_root']
            stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
            stats['storage_server.disk_avail'] = disk['avail']
        except AttributeError:
            writeable = True
        except EnvironmentError:
            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
            writeable = False
        if self.readonly_storage:
            stats['storage_server.disk_avail'] = 0
            writeable = False
        stats['storage_server.accepting_immutable_shares'] = int(writeable)
        s = self.bucket_counter.get_state()
        bucket_count = s.get("last-complete-bucket-count")
        if bucket_count:
            stats['storage_server.total_bucket_count'] = bucket_count
        return stats
The leases held by the StorageClient, and their equivalent size on disk (i.e. the amount of storage we have spent).
Leases are created whenever we upload a new file, and they are renewed from the client at three points: in immutable/checker.py (lease renewal for immutable files), in mutable/servermap.py (called from mutable/checker.py, lease renewal for mutable files) and in scripts/tahoe_check.py (CLI interface).
File: allmydata/immutable/checker.py

class Checker(log.PrefixingLogMixin):
    """I query all servers to see if M uniquely-numbered shares are available."""
    (…)
    def _get_buckets(self, s, storageindex):
        """Return a deferred that eventually fires with ({sharenum: bucket},
        serverid, success). In case the server is disconnected or returns a
        Failure then it fires with ({}, serverid, False) (A server disconnecting
        or returning a Failure when we ask it for buckets is the same, for our
        purposes, as a server that says it has none, except that we want to
        track and report whether or not each server responded.)"""
        rref = s.get_rref()
        lease_seed = s.get_lease_seed()
        if self._add_lease:
            renew_secret = self._get_renewal_secret(lease_seed)
            cancel_secret = self._get_cancel_secret(lease_seed)
            d2 = rref.callRemote("add_lease", storageindex,
                                 renew_secret, cancel_secret)
            d2.addErrback(self._add_lease_failed, s.get_name(), storageindex)
        (...)
File: allmydata/mutable/servermap.py

class ServermapUpdater:
    def __init__(self, filenode, storage_broker, monitor, servermap,
                 mode=MODE_READ, add_lease=False, update_range=None):
        """I update a servermap, locating a sufficient number of useful shares
        and remembering where they are located."""
        (…)
    def _do_read(self, server, storage_index, shnums, readv):
        ss = server.get_rref()
        if self._add_lease:
            # send an add-lease message in parallel. The results are handled
            # separately. This is sent before the slot_readv() so that we can
            # be sure the add_lease is retired by the time slot_readv comes
            # back (this relies upon our knowledge that the server code for
            # add_lease is synchronous).
            renew_secret = self._node.get_renewal_secret(server)
            cancel_secret = self._node.get_cancel_secret(server)
            d2 = ss.callRemote("add_lease", storage_index,
                               renew_secret, cancel_secret)
            # we ignore success
            d2.addErrback(self._add_lease_failed, server, storage_index)
        d = ss.callRemote("slot_readv", storage_index, shnums, readv)
        return d
    (...)
File: allmydata/scripts/tahoe_check.py def check_location (options, where): stdout = options.stdout stderr = options.stderr nodeurl = options[ 'node-url' ] if not nodeurl.endswith( "/" ): nodeurl += "/" try : rootcap, path = get_alias(options.aliases, where, DEFAULT_ALIAS) except UnknownAliasError, e: e.display(stderr) return 1 if path == '/' : path = '' url = nodeurl + "uri/%s" % urllib.quote(rootcap) if path: url += "/" + escape_path(path) # todo: should it end with a slash? url += "?t=check&output=JSON" if options[ "verify" ]: url += "&verify=true" if options[ "repair" ]: url += "&repair=true" if options[ "add-lease" ]: url += "&add-lease=true" resp = do_http( "POST" , url) if resp.status != 200 : print >>stderr, format_http_error( "ERROR" , resp) return 1 jdata = resp.read() if options.get( "raw" ): stdout.write(jdata) stdout.write( "\n" ) return 0 data = simplejson.loads(jdata) |
File: allmydata/client.py
Introduce code in functions used to create new nodes to keep track of files uploaded to the grid. It may be required to move this accounting code down to the immutable/Uploader.upload function or the mutable/MutableFileNode.update/overwrite functions if they are called directly from other parts of Tahoe-LAFS (not exclusively from the client). Alternatively, if we are using the single rootcap strategy, force any new file to lie under the rootcap.
Create a new function in the client that recovers the StorageServer service, accesses its usage statistics, the erasure encoding parameters and the statistics for uploaded files, and estimates the remaining quota.
Files: immutable/checker.py, mutable/servermap.py, scripts/tahoe_check.py
Introduce accounting of the times a lease is renewed against the database of uploaded files (only needed if we are creating a local database; not required if we are using the single rootcap directory).
File: allmydata/web/root.py
Add functionality to show the updated quota data.
File: allmydata/web/welcome.xhtml
Modify the template to show shared remaining/total quota information.
File: allmydata/test/test_client.py
Add tests to verify that new uploads are properly accounted in the uploads database (or that they lie under the rootcap dir)
File: allmydata/test/test_storage.py
Add tests to verify that new uploads are properly accounted in the uploads database (or that they lie under the rootcap dir)
File: docs/architecture.rst
Include a brief description of the quota management system implementation.
File: docs/quotas.rst
Create a new file under docs describing in detail the implemented quota system.
Helpers are used in Tahoe-LAFS to cope with the overhead factor imposed by erasure coding and with the asymmetric upload/download bandwidth of ADSL connections. Uploading a file requires X/K times more bandwidth than the file size (assuming a K out of X storage scheme) and than the corresponding download operation from the grid. Given these asymmetric bandwidth requirements and upload/download channel capacities, the upload operation can be orders of magnitude slower than its corresponding download.
To help ease this problem, Helper nodes (assumed to have an uplink with greater capacity than the user's) receive the ciphertext directly from the StorageClient (i.e. files that have already been encrypted, but have not yet been segmented and erasure-coded), erasure-code it and distribute the resulting shares to StorageServers. This way the amount of data to be uploaded by the StorageClient is limited to the size of the file being uploaded, with the overhead being handled by the Helper.
As of version 1.10, i2p-Tahoe-LAFS can only be configured to use a single helper server, which (if used) must be specified in tahoe.cfg. Allowing the StorageClient to choose among a list of available helpers will add flexibility to the network and allow the StorageClient to choose the least-loaded Helper at a given moment.
Instead of the single value now stored in tahoe.cfg, we need a list of Helpers and the possibility to select one of them from that list using a particular selection algorithm.
Allow for a variable number of helpers, statically listed in BASEDIR/helpers.
Before sending a file to the helper (a minimal selection sketch follows this list):
Check all the helpers to retrieve their statistics.
Choose the helper with the best statistics.
Send the ciphertext to the chosen Helper.
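The selection step could look like the sketch below. Helper statistics are not currently exposed to remote clients, so the stats dictionary and its active_uploads key are assumptions of this proposal, not existing Tahoe-LAFS behaviour.

# Hedged sketch of the helper-selection step; the stats content is assumed.
def choose_best_helper(helper_stats):
    """helper_stats: list of (helper, stats_dict) pairs for connected helpers.

    Picks the helper with the fewest active uploads; returns None if the list
    is empty, so the caller can fall back to the StorageFarmBroker.
    """
    best = None
    best_load = None
    for helper, stats in helper_stats:
        load = stats.get("active_uploads", 0)   # hypothetical stats key
        if best is None or load < best_load:
            best, best_load = helper, load
    return best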
When a new client is started, it recovers the helper.furl from section [client] in tahoe.cfg. Its value is then used to initialize the Uploader service, as seen below:
File: allmydata/client.py

class Client(node.Node, pollmixin.PollMixin):
    (…)
    def init_client(self):
        helper_furl = self.get_config("client", "helper.furl", None)
        if helper_furl in ("None", ""):
            helper_furl = None
        DEP = self.encoding_params
        DEP["k"] = int(self.get_config("client", "shares.needed", DEP["k"]))
        DEP["n"] = int(self.get_config("client", "shares.total", DEP["n"]))
        DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
        self.init_client_storage_broker()
        self.history = History(self.stats_provider)
        self.terminator = Terminator()
        self.terminator.setServiceParent(self)
        self.add_service(Uploader(helper_furl, self.stats_provider, self.history))
In the Uploader class we find the code that initializes the helper connection, handles the moments when the connection to the server is established or lost, and retrieves helper information:
File: allmydata/immutable/upload.py class Uploader (service.MultiService,log.PrefixingLogMixin): (...) def __init__(self, helper_furl= None, stats_provider=None, history = None): self._helper_furl = helper_furl self.stats_provider = stats_provider self._history = history self ._helper = None self ._all_uploads = weakref.WeakKeyDictionary() # for debugging log.PrefixingLogMixin.__init__( self, facility= "tahoe.immutable.upload" ) service.MultiService.__init__( self ) def startService ( self ): service.MultiService.startService( self ) if self ._helper_furl: self .parent.tub.connectTo( self ._helper_furl, self ._got_helper) def _got_helper ( self , helper): self .log( "got helper connection, getting versions" ) default = { "http://allmydata.org/tahoe/protocols/helper/v1" : {}, "application-version" : "unknown: no get_version()" , } d = add_version_to_remote_reference(helper, default) d.addCallback( self ._got_versioned_helper) def _got_versioned_helper ( self , helper): needed = "http://allmydata.org/tahoe/protocols/helper/v1" if needed not in helper.version: raise InsufficientVersionError(needed, helper.version) self ._helper = helper def _lost_helper ( self ): self ._helper = None def get_helper_info ( self ): # return a tuple of (helper_furl_or_None, connected_bool) return ( self ._helper_furl, bool( self ._helper)) |
Finally, in the upload function, the Helper connection is used if it is available, and the node's storage broker otherwise:
File: allmydata/immutable/upload.py class Uploader (service.MultiService, log.PrefixingLogMixin): (...) def upload(self, uploadable): """ Returns a Deferred that will fire with the UploadResults instance. """ assert self .parent assert self .running uploadable = IUploadable(uploadable) d = uploadable.get_size() def _got_size (size): default_params = self.parent.get_encoding_parameters() precondition(isinstance(default_params, dict), default_params) precondition( "max_segment_size" in default_params, default_params) uploadable.set_default_encoding_parameters(default_params) if self .stats_provider: self .stats_provider.count( 'uploader.files_uploaded' , 1 ) self.stats_provider.count( 'uploader.bytes_uploaded' , size) if size <= self .URI_LIT_SIZE_THRESHOLD: uploader = LiteralUploader() return uploader.start(uploadable) else : eu = EncryptAnUploadable(uploadable, self._parentmsgid) d2 = defer.succeed( None ) storage_broker = self.parent.get_storage_broker() if self ._helper: uploader = AssistedUploader(self._helper, storage_broker) d2.addCallback( lambda x: eu.get_storage_index()) d2.addCallback( lambda si: uploader.start(eu, si)) else : storage_broker = self.parent.get_storage_broker() secret_holder = self.parent._secret_holder uploader = CHKUploader(storage_broker, secret_holder) d2.addCallback( lambda x: uploader.start(eu)) self._all_uploads[uploader] = None if self._history: self._history.add_upload(uploader.get_upload_status() def turn_verifycap_into_read_cap (uploadresults): # Generate the uri from the verifycap plus the key. d3 = uploadable.get_encryption_key() def put_readcap_into_results (key): v = uri.from_string(uploadresults.get_verifycapstr()) r = uri.CHKFileURI(key, v.uri_extension_hash, v.needed_shares, v.total_shares, v.size) uploadresults.set_uri(r.to_string()) return uploadresults d3.addCallback(put_readcap_into_results) return d3 d2.addCallback(turn_verifycap_into_read_cap) return d2 d.addCallback(_got_size) def _done(res): uploadable.close() return res d.addBoth(_done) return d |
Rendering related to the uploader is done in the web interface:
File: allmydata/web/root.py

class Root(rend.Page):
    def data_helper_furl_prefix(self, ctx, data):
        try:
            uploader = self.client.getServiceNamed("uploader")
        except KeyError:
            return None
        furl, connected = uploader.get_helper_info()
        if not furl:
            return None
        # trim off the secret swissnum
        (prefix, _, swissnum) = furl.rpartition("/")
        return "%s/[censored]" % (prefix,)

    def data_helper_description(self, ctx, data):
        if self.data_connected_to_helper(ctx, data) == "no":
            return "Helper not connected"
        return "Helper"

    def data_connected_to_helper(self, ctx, data):
        try:
            uploader = self.client.getServiceNamed("uploader")
        except KeyError:
            return "no"  # we don't even have an Uploader
        furl, connected = uploader.get_helper_info()
        if furl is None:
            return "not-configured"
        if connected:
            return "yes"
        return "no"
These functions are accessed from the welcome page template, which is rendered by Nevow:
File: allmydata/web/welcome.xhtml

(…)
<div>
  <h3>
    <div><n:attr name="class">status-indicator connected-<n:invisible n:render="string" n:data="connected_to_helper" /></n:attr></div>
    <div n:render="string" n:data="helper_description" />
  </h3>
  <div class="furl" n:render="string" n:data="helper_furl_prefix" />
</div>
(…)
Tests are implemented in allmydata/test/test_helper.py
File: allmydata/test/test_helper.py class AssistedUpload (unittest.TestCase): (...) def setUpHelper ( self , basedir, helper_class=Helper_fake_upload): fileutil.make_dirs(basedir) self .helper = h = helper_class(basedir, self.s.storage_broker, self.s.secret_holder, None , None ) self.helper_furl = self.tub.registerReference(h) def test_one ( self ): self .basedir = "helper/AssistedUpload/test_one" self.setUpHelper(self.basedir) u = upload.Uploader(self.helper_furl) u.setServiceParent(self.s) d = wait_a_few_turns() def _ready (res): assert u._helper return upload_data(u, DATA, convergence = "some convergence string" ) d.addCallback(_ready) (…) def test_previous_upload_failed ( self ): (...) f = open(encfile, "wb" ) f.write(encryptor.process(DATA)) f.close() u = upload.Uploader(self.helper_furl) u.setServiceParent(self.s) d = wait_a_few_turns() def _ready (res): assert u._helper return upload_data(u, DATA, convergence = "test convergence string" ) d.addCallback(_ready) (…) def test_already_uploaded ( self ): self .basedir = "helper/AssistedUpload/test_already_uploaded" self.setUpHelper(self.basedir, helper_class=Helper_already_uploaded) u = upload.Uploader(self.helper_furl) u.setServiceParent(self.s) d = wait_a_few_turns() |
File: allmydata/client.py
Add a MULTI_HELPERS_CFG variable with the path to the helpers file.
Create an init_upload_helpers_list function to parse the file and return the list of furls (it must also take helper.furl in tahoe.cfg into account for compatibility).
Update init_client to call init_upload_helpers_list. Refactor the code that reads and writes the multiple-introducers list to obtain a generic 'list of furls' manager that can be shared by the multiple-introducers and multiple-helpers initialization code. This refactoring will also be useful for feature number 3, spreading servers, given that both lists will be updated with a similar mechanism.
Optionally rename init_helper to init_helper_server.
File: allmydata/immutable/upload.py
Refactor Uploader:
Create a wrapper class to handle connections with remote helper servers, using the functions _got_helper, _got_versioned_helper, _lost_helper and get_helper_info from the Uploader class (a rough sketch follows this list).
Create a list of available helpers from the helpers list passed during initialization.
Create a hook function to select which server to use for uploading:
Choose the best helper server to upload based on the availability of helper servers and their statistics.
Fall back to the standard storage broker if no helper is available.
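A rough shape for the per-helper connection wrapper factored out of Uploader. Class and attribute names are suggestions; the Foolscap calls mirror the existing startService/_got_helper/_lost_helper code shown above.

# Hedged sketch of the per-helper connection wrapper; names are suggestions.
class HelperConnection(object):
    def __init__(self, helper_furl):
        self.furl = helper_furl
        self.helper = None            # remote reference once connected

    def start(self, tub):
        # Foolscap reconnector: _got_helper is called on every (re)connection
        tub.connectTo(self.furl, self._got_helper)

    def _got_helper(self, helper):
        # ask Foolscap to tell us when the connection drops
        helper.notifyOnDisconnect(self._lost_helper)
        self.helper = helper

    def _lost_helper(self):
        self.helper = None

    def is_connected(self):
        return self.helper is not None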
File: allmydata/web/root.py / allmydata/web/welcome.xhtml
Modify the functions Root.data_helper_furl_prefix, Root.data_helper_description and Root.data_connected_to_helper, and the Nevow template, to accommodate a list of helpers instead of a single available helper. (See the patch for Tahoe-LAFS issue #1010 for those two files.)
File: allmydata/test/test_helper.py
Add several fake uploaders to the file and verify that the selection works correctly according to (fake) server statistics.
New file: allmydata/test/test_multi_helpers.py
New test file to check that the client parses properly the list of multiple helpers and that the Uploader is also properly initialized. (see allmydata/test/test_multi_introducers.py for reference).
Describe the changes implemented in the following files:
docs/architecture.rst.
docs/configuration.rst.
docs/helper.rst.
Patches for similar functionality have already been published into Tahoe-LAFS repository. They can be used as a guide for implementation details:
Support of multiple introducers: provides a sample of how to move from a single introducer to a list of introducers.
Hook in server selection when choosing a remote StorageServer: sample of how we can implement a programmable hook to choose the target server in a generic way.
Version 1.10 of Tahoe-LAFS allows specifying a list of multiple introducers. However, this list is static, specified per installation in the BASEDIR/introducers file (thanks to the multi-introducers patch used in i2p-Tahoe-LAFS), given that the introducer only publishes a list of available StorageServers and not of available Introducers. This will also apply to the list of Helpers once the multi-helpers modification is implemented.
The proposed feature consists of:
a) publishing a list of known Introducers that will be used to update the StorageClient's list of introducers.
b) publishing a list of known Helpers that will be used to update the StorageClient's list of helpers.
Configuration in tahoe.cfg will be used to indicate that:
In StorageClients:
Whether or not the list of Introducers should be updated automatically.
Whether or not the list of Helpers should be updated automatically.
In Helper nodes:
Whether the furl of the Helper node should be published via the introducer.
In Introducer nodes:
Whether the list of alternative introducers in BASEDIR/introducers should be published.
We will use existing Introducer infrastructure to publish the furls of Helpers and Introducers.
Required functionality:
A StorageClient can subscribe to notifications of 'introducer' and 'helper' services, in addition to the 'storage' service to which it subscribes now.
The StorageClient will update the BASEDIR/helpers or BASEDIR/introducers file according to the data received from the Introducer (a minimal sketch follows this list).
A Helper can publish its furl via an Introducer, which will distribute it to connected StorageClients.
An Introducer can publish a list of alternative Introducers to the StorageClients that are connected to it. The list distributed is that stored in the BASEDIR/introducers file.
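A hedged sketch of the client-side part. The subscribe_to call mirrors the existing StorageFarmBroker.use_introducer code shown below; the 'helper' and 'introducer' service names and the announcement keys are part of this proposal, not existing Tahoe-LAFS behaviour.

# Hedged sketch: subscribing to the proposed announcements and persisting
# the received furls. Service names and announcement keys are assumptions.
import os

def init_subscriptions(client, introducer_client):
    introducer_client.subscribe_to("helper",
        lambda key_s, ann: _remember_furl(client, "helpers",
                                          ann.get("anonymous-helper-FURL")))
    introducer_client.subscribe_to("introducer",
        lambda key_s, ann: _remember_furl(client, "introducers",
                                          ann.get("anonymous-introducer-FURL")))

def _remember_furl(client, filename, furl):
    if not furl:
        return
    path = os.path.join(client.basedir, filename)  # BASEDIR/helpers or BASEDIR/introducers
    known = []
    if os.path.exists(path):
        known = [line.strip() for line in open(path) if line.strip()]
    if furl not in known:
        with open(path, "a") as f:
            f.write(furl + "\n")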
We analyse functionality related to the modifications listed above:
The initialization of the introducers list from the configuration file
The connection of the StorageClient to the IntroducerServer (using its IntroducerClient), and how it publishes its furl and subscribes to receive the furls of other StorageServers.
The initialization of a Helper server.
The initialization of an Introducer server.
Below we can find the code that initializes the list of introducers in allmydata/client.py:
File: allmydata/client.py class Client(node.Node, pollmixin.PollMixin): (…) def __init__ ( self ,basedir= "." ): node.Node.__init__(self, basedir) self.started_timestamp = time.time() self.logSource= "Client" self.encoding_params = self.DEFAULT_ENCODING_PARAMETERS.copy() self.init_introducer_clients() self.init_stats_provider() self.init_secrets() self.init_node_key() self.init_storage() (…) def init_introducer_clients ( self ): self.introducer_furls = [] self.warn_flag = False # Try to load ""BASEDIR/introducers" cfg file cfg = os.path.join(self.basedir, MULTI_INTRODUCERS_CFG) if os.path.exists(cfg): f = open(cfg, 'r' ) for introducer_furl in f.read().split( '\n' ): introducers_furl = introducer_furl.strip() if introducers_furl.startswith( '#' ) or not introducers_furl: continue self.introducer_furls.append(introducer_furl) f.close() furl_count = len(self.introducer_furls) #print "@icfg: furls: %d" %furl_count # read furl from tahoe.cfg ifurl = self .get_config( "client" , "introducer.furl" , None ) if ifurl and ifurl not in self .introducer_furls: self.introducer_furls.append(ifurl) f = open(cfg, 'a' ) f.write(ifurl) f.write( '\n' ) f.close() if furl_count > 1 : self .warn_flag = True self.log( "introducers config file modified." ) print "Warning! introducers config file modified." # create a pool of introducer_clients self.introducer_clients = [] |
The first block highlighted in init_introducer_clients tries to read the BASEDIR/introducers file; the second adds introducer.furl from tahoe.cfg if it was not already contained in BASEDIR/introducers.
The second piece of functionality we are interested in is using the existing introducer infrastructure to update the lists of Introducers and Helpers. Below we find the relevant code used to subscribe the StorageFarmBroker (responsible for keeping in touch with the StorageServers in the grid) to the Introducer's 'storage' announcements, as an example of how we will have to publish the corresponding 'helper' and 'introducer' announcements:
File: allmydata/storage_client.py class StorageFarmBroker : implements(IStorageBroker) """I live on the client, and know about storage servers. For each server that is participating in a grid, I either maintain a connection to it or remember enough information to establish a connection to it on demand. I'm also responsible for subscribing to the IntroducerClient to find out about new servers as they are announced by the Introducer. """ (...) def use_introducer ( self , introducer_client): self.introducer_client = ic = introducer_client ic.subscribe_to( "storage" ,self._got_announcement) def _got_announcement (self , key_s, ann): if key_s is not None : precondition(isinstance(key_s, str), key_s) precondition(key_s.startswith( "v0-" ), key_s) assert ann[ "service-name" ] == "storage" s = NativeStorageServer(key_s, ann) serverid = s.get_serverid() old = self.servers.get(serverid) if old: if old.get_announcement() == ann: return # duplicate # replacement del self .servers[serverid] old.stop_connecting() # now we forget about them and start using the new one self.servers[serverid] = s s.start_connecting(self.tub, self._trigger_connections) # the descriptor will manage their own Reconnector, and each time we # need servers, we'll ask them if they're connected or not. def _trigger_connections ( self ): # when one connection is established, reset the timers on all others, # to trigger a reconnection attempt in one second. This is intended # to accelerate server connections when we've been offline for a # while. The goal is to avoid hanging out for a long time with # connections to only a subset of the servers, which would increase # the chances that we'll put shares in weird places (and not update (...) |
Function StorageFarmBroker.use_introducer subscribes to the 'storage' announcements with callback StorageFarmBroker._got_announcement, which tries to establish a connection with the new server whenever it receives the announcement.
During the StorageServer initialization, the announcement that this server is active is published when the connection with the introducer is ready (with the call to ic.publish):
File: allmydata/client.py class Client (node.Node, pollmixin.PollMixin): implements(IStatsProducer) (…) def init_storage ( self ): # should we run a storage server (and publish it for others to use)? if not self .get_config( "storage" , "enabled" , True , boolean = True ): return readonly = self .get_config( "storage" , "readonly" , False , boolean = True ) storedir = os.path.join(self.basedir, self.STOREDIR) (..) ss = StorageServer(storedir, self.nodeid, reserved_space=reserved, discard_storage=discard, readonly_storage=readonly, stats_provider=self.stats_provider, expiration_enabled=expire, expiration_mode=mode, expiration_override_lease_duration=o_l_d, expiration_cutoff_date=cutoff_date, expiration_sharetypes=expiration_sharetypes) self.add_service(ss) d = self.when_tub_ready() # we can't do registerReference until the Tub is ready def _publish (res): furl_file = os.path.join( self .basedir, "private" , "storage.furl" ).encode(get_filesystem_encoding()) furl = self.tub.registerReference(ss, furlFile=furl_file) ann = { "anonymous-storage-FURL" : furl, "permutation-seed-base32" : self ._init_permutation_seed(ss), } current_seqnum, current_nonce = self._sequencer() for ic in self.introducer_clients: ic.publish( "storage" , ann, current_seqnum, current_nonce, self ._node_key) d.addCallback(_publish) d.addErrback(log.err, facility = "tahoe.init" , level=log.BAD, umid = "aLGBKw" ) |
To publish the address of a Helper node, we will have to do it after its creation and registration in Client.init_helper (which is the function that initializes the Helper server):
File: allmydata/client.py

class Client(node.Node, pollmixin.PollMixin):
    implements(IStatsProducer)
    (…)
    def init_helper(self):
        d = self.when_tub_ready()
        def _publish(self):
            self.helper = Helper(os.path.join(self.basedir, "helper"),
                                 self.storage_broker, self._secret_holder,
                                 self.stats_provider, self.history)
            # TODO: this is confusing. BASEDIR/private/helper.furl is created
            # by the helper. BASEDIR/helper.furl is consumed by the client
            # who wants to use the helper. I like having the filename be the
            # same, since that makes 'cp' work smoothly, but the difference
            # between config inputs and generated outputs is hard to see.
            helper_furlfile = os.path.join(self.basedir, "private",
                                           "helper.furl").encode(get_filesystem_encoding())
            self.tub.registerReference(self.helper, furlFile=helper_furlfile)
        d.addCallback(_publish)
        d.addErrback(log.err, facility="tahoe.init", level=log.BAD, umid="K0mW5w")
A parameter in the config file of the helper server will tell whether or not we should publish the helper's address via the introducer.
Regarding the publication of the updated list of Introducers, an IntroducerServer is not connected to other Introducers; however, it can publish a list of introducers which is initially preloaded in BASEDIR/introducers (the same file that would be used by a standard node). We will only have to modify the initialization code of the Introducer in allmydata/introducer/server.py to parse the introducers file and publish the corresponding announcements with a call to IntroducerService.publish. (Notice that the highlighted _publish function means 'publish this furl to the corresponding tub', i.e. make this furl accessible from the outside. From there we have to issue a call to IntroducerService to publish the corresponding information. We may have to connect to every introducer on the list to verify they are up and to recover additional information about them.)
File: allmydata/introducer/server.py class IntroducerNode (node.Node): PORTNUMFILE = "introducer.port" NODETYPE = "introducer" GENERATED_FILES = [ 'introducer.furl' ] def __init__ ( self , basedir = "." ): node.Node.__init__(self, basedir) self.read_config() self.init_introducer() webport = self .get_config( "node" , "web.port" , None ) if webport: self .init_web(webport) # strports string def init_introducer ( self ): introducerservice = IntroducerService(self.basedir) self.add_service(introducerservice) old_public_fn = os.path.join( self .basedir, "introducer.furl" ).encode(get_filesystem_encoding()) private_fn = os.path.join( self .basedir, "private" , "introducer.furl" ).encode(get_filesystem_encoding()) (…) d = self .when_tub_ready() def _publish (res): furl = self.tub.registerReference(introducerservice, furlFile=private_fn) self .log( "introducer is at %s" % furl, umid= "qF2L9A" ) self .introducer_url = furl # for tests d.addCallback(_publish) d.addErrback(log.err, facility= "tahoe.init" , level=log.BAD, umid= "UaNs9A" ) (…) class IntroducerService (service.MultiService, Referenceable): implements(RIIntroducerPublisherAndSubscriberService_v2) (…) def publish ( self , ann_t, canary, lp): try : self._publish(ann_t, canary, lp) except : log.err(format= "Introducer.remote_publish failed on %(ann)s" , ann=ann_t, level=log.UNUSUAL, parent=lp, umid = "620rWA" ) raise (…) |
File: allmydata/client.py
StorageClient:
Subscribe to the Introducer's 'helper' and 'introducer' announcements, possibly within a new Client.init_subscriptions function.
Create the callback functions to handle each of these subscriptions and update BASEDIR/helpers and BASEDIR/introducers accordingly.
HelperServer
After the initialization of the server in Client.init_helper, publish the corresponding furl in the introducer with a 'helper' announcement, as sketched below.
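A hedged sketch of this extra publication step. It mirrors the _publish code of init_storage shown earlier; the 'helper' service name, the "anonymous-helper-FURL" key and the publish_helper_furl option are assumptions introduced by this proposal (the option name matches the documentation changes listed further below).

# Hedged sketch of the extra step in Client.init_helper; names are assumptions.
def _publish_helper(self, helper_furl):
    if not self.get_config("helper", "publish_helper_furl", False, boolean=True):
        return
    ann = {"anonymous-helper-FURL": helper_furl}
    current_seqnum, current_nonce = self._sequencer()
    for ic in self.introducer_clients:
        # same publish() call used for the existing "storage" announcements
        ic.publish("helper", ann, current_seqnum, current_nonce, self._node_key)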
File: allmydata/introducer/server.py
IntroducerServer
During initialization, read the list of alternative Introducers from BASEDIR/introducers.
Once the IntroducerService is active, publish the furl of every alternative introducer known to this Introducer instance.
No modifications are needed in the GUI.
File: allmydata/test/test_introducer.py
Class Client: add test cases to verify:
That the client properly processes the new 'helper' and 'introducer' announcements.
That the client updates BASEDIR/helpers and BASEDIR/introducers properly.
That the introducer publishes the alternative list of Introducers according to configuration in tahoe.cfg.
That when a client is configured as HelperServer it publishes its furl via the introducer according to configuration in tahoe.cfg.
Describe the changes implemented in the following files:
docs/architecture.rst: add a reference to the automatic update of BASEDIR/introducers and BASEDIR/helpers.
docs/configuration.rst: describe new options for StorageClients (auto_update_introducers, auto_update_helpers), for HelperServer (publish_helper_furl) and for IntroducerServer (publish_alternative_introducers)
docs/helper.rst: describe new configuration options.
An important development to keep the network clean from Sybil and other malicious attacks is a way to detect and blacklist malicious storage nodes.
A malicious storage node is defined as a node that claims to be storing data but is not.
The parts of files it receives are lost.
To protect against malicious storage nodes, each client will check file integrity to make sure its files remain accessible.
The client keeps a record of the parts uploaded to each node. When a storage node that holds part of a file does not return it, answers that it does not have the file, or is offline, the node is marked as malicious after a few retries on different days.
Each client will keep a list of the storage nodes it has detected as malicious (a minimal bookkeeping sketch follows below).
Such a list can also be kept by the introducer. In this way, the introducer would avoid announcing a malicious storage node to prospective clients.
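A hedged sketch of the client-side bookkeeping just described. The retry threshold, the persistence format and the names are all assumptions; only the idea (repeated failures on different days lead to blacklisting) comes from the text above.

# Hedged sketch of per-client tracking of suspected malicious storage nodes.
import json
import time

FAILURE_THRESHOLD = 3          # distinct days with failures before blacklisting
ONE_DAY = 24 * 3600

class MaliciousNodeTracker(object):
    def __init__(self, statefile):
        self.statefile = statefile
        try:
            self.failures = json.load(open(statefile))   # serverid -> [timestamps]
        except (IOError, ValueError):
            self.failures = {}

    def record_failure(self, serverid):
        now = time.time()
        days = self.failures.setdefault(serverid, [])
        # count at most one failure per day
        if not days or now - days[-1] > ONE_DAY:
            days.append(now)
            json.dump(self.failures, open(self.statefile, "w"))

    def is_malicious(self, serverid):
        return len(self.failures.get(serverid, [])) >= FAILURE_THRESHOLD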
A malicious client is defined as a computer that tries to upload more data than the space it is sharing.
To prevent this, there are mainly two approaches.
One approach is to create a public blockchain with all identities. In this way everyone would know how much storage each client has, and limits could be enforced easily. This can be quite hard to implement, but it is the best approach to stop an attack from clients.
The other approach is that storage nodes record which files are stored by each client ID. They then exchange this information to keep track of how much space each client has already used, and block the client if it exceeds its allowance.
The allowed space is always related to how much space the client is sharing.
CrashPlan & Symform (FileSystem) | I2P + Tahoe-LAFS
---|---
Distributed decentralized data | X
Encrypted before transmitting | X
No file size limits | X
Manage password & encryption keys |
Pause backups on low battery |
Pause backups over selected network interfaces |
Pause backups over selected wi-fi networks |
Sync on an inactivity period – configurable | bash scripting
Do not produce bandwidth bottlenecks |
Connection through Proxy |
Not enumerating IP | X
Resilience | X
Storage Balancing | X
Summarized volume |
Anonymous | X
Sybil Attack protection |
User Disk Quota |
Development | Hours | Cost
---|---|---
User quota | 160h | 6400€
Multiple Helpers | 60h | 2400€
Spreading servers | 40h | 1600€
Sybil attack: black/whitelist storage nodes | 60h | 2400€
Sybil attack: black/whitelist introducer nodes | 40h | 1600€
Tahoe Blockchain | 200h | 8000€