diff --git a/sourced/ml/cmd/repos2bow.py b/sourced/ml/cmd/repos2bow.py index 76d671c1..01a7d931 100644 --- a/sourced/ml/cmd/repos2bow.py +++ b/sourced/ml/cmd/repos2bow.py @@ -96,7 +96,7 @@ def keymap(r): bags_writer \ .link(Repartitioner.maybe( args.partitions, keymap=lambda x: x[Uast2BagFeatures.Columns.document])) \ - .link(BOWWriter(document_indexer, df_model, bow, args.batch)) \ + .link(BOWWriter(document_indexer, df_model, bow, "repos2bow", args.batch)) \ .execute() bags.unpersist() pipeline_graph(args, log, root) diff --git a/sourced/ml/transformers/bow_writer.py b/sourced/ml/transformers/bow_writer.py index 4fc3ea18..2d116239 100644 --- a/sourced/ml/transformers/bow_writer.py +++ b/sourced/ml/transformers/bow_writer.py @@ -16,11 +16,12 @@ class BOWWriter(Transformer): DEFAULT_CHUNK_SIZE = 2 * 1000 * 1000 * 1000 def __init__(self, document_indexer: Indexer, df: OrderedDocumentFrequencies, - filename: str, chunk_size: int = DEFAULT_CHUNK_SIZE, **kwargs): + filename: str, series: str, chunk_size: int = DEFAULT_CHUNK_SIZE, **kwargs): super().__init__(**kwargs) self.document_indexer = document_indexer self.df = df self.filename = filename + self.series = series self.chunk_size = chunk_size def __getstate__(self): @@ -79,7 +80,7 @@ def __call__(self, head: RDD): filename = self.get_bow_file_name(self.filename, i) BOW() \ .construct(docs, tokens, matrix) \ - .save(filename, deps=(self.df,)) + .save(filename, series=self.series, deps=(self.df,)) self._log.info("%d -> %s with %d documents, %d nnz (%s)", i + 1, filename, len(docs), size, humanize.naturalsize(os.path.getsize(filename)))